N1CTF 2021 | n1ogin

#n1ctf2021

このほか n1ogin.pub の他、adminがログインしたときのpcapファイルが渡されている

import os
import json
import time

from Crypto.PublicKey.RSA import import_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes, hmac
from pwn import *


PUB_KEY = import_key(open("n1ogin.pub", "r").read())


def seal(content):
    iv = os.urandom(16)
    aes_key = os.urandom(24)
    hmac_key = os.urandom(24)

    mm = int.from_bytes(PKCS1_pad(aes_key+hmac_key), 'big')
    rsa_data = pow(mm, PUB_KEY.e, PUB_KEY.n).to_bytes(2048//8, 'big')

    aes = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
    encryptor = aes.encryptor()
    cipher = encryptor.update(PKCS7_pad(content)) + encryptor.finalize()

    mac = iv + cipher
    for _ in range(7777):
        h = hmac.HMAC(hmac_key, hashes.MD5())
        h.update(mac)
        mac = h.finalize()
    aes_data = iv + cipher + mac

    res = {
        "rsa_data": rsa_data.hex(),
        "aes_data": aes_data.hex()
    }
    return res

def PKCS1_pad(payload):
    assert len(payload) == 48
    return b"\x00\x02" + b"\x77"*(2048//8-2-1-48) + b"\x00" + payload

def PKCS7_pad(payload):
    pad_length = 16 - len(payload)%16
    payload += bytes([pad_length]) * pad_length
    return payload

def login(conn):
    username = input("username: ")
    password = input("password: ")
    content = json.dumps({
        "choice": "login",
        "timestamp": int(time.time()),
        "nonce": os.urandom(8).hex(),
        "username": username,
        "password": password
    })
    envelope = json.dumps(seal(content.encode()))
    conn.sendlineafter(b"> ", envelope.encode())
    print(conn.recvline().decode())
    conn.interactive()

def register(conn):
    username = input("username: ")
    password = input("password: ")
    content = json.dumps({
        "choice": "register",
        "timestamp": int(time.time()),
        "nonce": os.urandom(8).hex(),
        "username": username,
        "password": password
    })
    envelope = json.dumps(seal(content.encode()))
    conn.sendlineafter(b"> ", envelope.encode())
    print(conn.recvline().decode())


def main():
    HOST = "127.0.0.1"
    PORT = 7777
    conn = remote(HOST, PORT)
    while True:
        choice = input("login/register: ")
        if choice == "login":
            login(conn)
        elif choice == "register":
            register(conn)
        else:
            break
    conn.close()

if __name__ == "__main__":
    main()
import os
import json
import time

from Crypto.PublicKey.RSA import import_key
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes, hmac

from secret import FLAG, SALT


# generated by `openssl genrsa -out n1ogin.pem 2048`
PRIV_KEY = import_key(open("n1ogin.pem", "r").read())

# nonce for replay attack
Nonces = set()


def cal_password_hash(password):
    hash = password.encode() + SALT
    for _ in range(7777):    # enhanced secure
        digest = hashes.Hash(hashes.MD5())
        digest.update(hash)
        hash = digest.finalize()
    return hash

def RSA_decrypt(rsa_data):
    cc = int.from_bytes(rsa_data, 'big')
    mm = pow(cc, PRIV_KEY.d, PRIV_KEY.n)
    message = mm.to_bytes(2048//8, 'big')

    if check_PKCS1(message):
        payload = message[-48:]
    else:
        # To prevent Bleichenbacher's attack, we continue with random bytes
        # when the PKCS1 check is not passed
        payload = os.urandom(48)
    return payload

def check_PKCS1(message):
    # message: 0x00 || 0x02 || padding string || 0x00 || (48 bytes) payload
    ok = all([
        message[0] == 0x00,
        message[1] == 0x02,
        all(byte != 0x00 for byte in message[2:-49]),
        message[-49] == 0x00
    ])
    return ok

def check_time(timestamp):
    return abs(int(time.time()) - timestamp) < 30

def check_nonce(nonce):
    if nonce in Nonces:
        return False
    Nonces.add(nonce)
    return True

def AES_decrypt(key, enc_data):
    # key: aes_key || hmac_key
    aes_key = key[:24]
    hmac_key = key[24:]
    # enc_data: iv || cipher || mac
    iv, cipher, mac = enc_data[:16], enc_data[16:-16], enc_data[-16:]

    aes = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
    decryptor = aes.decryptor()
    data = decryptor.update(cipher) + decryptor.finalize()

    # check padding
    data = unpad(data)
    if not data:
        return None, "padding error"

    # check hmac
    cal_mac = iv + cipher
    for _ in range(7777):    # enhanced secure
        h = hmac.HMAC(hmac_key, hashes.MD5())
        h.update(cal_mac)
        cal_mac = h.finalize()
    if cal_mac != mac:
        return None, "hmac error"

    return data, None

def pad(pt):
    pad_length = 16 - len(pt)%16
    pt += bytes([pad_length]) * pad_length
    return pt

def unpad(ct):
    pad_length = ct[-1]
    if pad(ct[:-pad_length]) == ct:
        return ct[:-pad_length]
    else:
        return None

def login(username, password):
    if username not in Users or Users[username] != cal_password_hash(password):
        print("login failed...")
        return
    print(f"{username} login ok!")
    echo_shell(username)

def register(username, password):
    if username in Users or len(username) > 20:
        print("register failed...")
    else:
        Users[username] = cal_password_hash(password)
        print(f"{username} register ok!")

def echo_shell(username):
    while True:
        command = input(f"{username}@local> ")
        if username == "admin" and command == "flag":
            print(FLAG)
        elif command == "exit":
            exit(0)
        else:
            print(command)

def handle(envelope):
    try:
        envelope_json = json.loads(envelope)

        key = RSA_decrypt(bytes.fromhex(envelope_json["rsa_data"]))
        content, err = AES_decrypt(key, bytes.fromhex(envelope_json["aes_data"]))
        if err:
            print("Error!")
            return

        content = json.loads(content)
        # check nonce
        if not check_nonce(content["nonce"]):
            print("Error!")
            return
        # check time
        if not check_time(content["timestamp"]):
            print("Error!")
            return
        # handle login/register
        choice = content["choice"]
        if choice == "login":
            login(content["username"], content["password"])
        elif choice == "register":
            register(content["username"], content["password"])
        else:
            print("Error!")

    except Exception as e:
        print("Error!")


Users = {
    # username:password_hash
    "admin": "REACTED",  # admin password obeys the strong password policy
    "guest": cal_password_hash("guest")
}


def main():
    print("Welcome to the n1ogin system!")
    while True:
        envelope = input("> ")
        handle(envelope)

if __name__ == "__main__":
    main()

概要

  • login/registerができるシステム

    • admin としてログインすることが目的

    • adminとしてログインしているpcapfileはあるが、RSAは復号不可能なので aes_datablackboxかつ、nonceは一度きりしか使えないのでそのままログインはできない

  • コマンドは PKCS#7padding された後 AES - CBC で暗号化され、 HMAC署名 される

    • ! ただしこのHMACの処理が多少特殊で、通常のHMACを7777回ネストしたものになっている

    • PKCS#7パディングエラーHMAC の検証エラーは区別できる

      • 出力はどちらも単に Error! だが、HMACが7777回ループしているので Timing Attack が可能
  • また、AES, HMACで用いられる鍵は結合されたあと PKCS#1padding し、 RSA で暗号化して渡される

    • PKCS#1 のチェックでエラーがあった場合にはランダムデータとして処理されるので Bleichenbacher's Attack はできないと主張されている
  • ログインのためには usernamepassword を渡す必要がある

  • パスワードは SALT をつけられた上で7777回の HMAC によってハッシュ化され、ハッシュ化したもの同士を比較する

解法

  • aes_data には iv + username/password + macが含まれている。今回は Padding Oracle Attack を使って aes_data の中身を解読してadminのパスワードを得る

  • rsa_data はそのまま送り、 aes_data のデータ部を変化させながらクエリする。正しいパディングになったタイミングで HMAC の処理が走るため、レスポンスまでの時間を測ることでオラクルが得られる

writeups