x32 Emulator

Aperi'CTF 2019 - MISC (250 pts).

Challenge details

Event          | Challenge    | Category | Points | Solves
Aperi’CTF 2019 | x32 Emulator | MISC     | 250    | 6

Now that you are sufficiently familiar with this language, you will need a way to execute x32 bytecode. How else do you plan to analyze their microcontrollers?

The server will give you several x32 bytecode sequences, and you must send back the result of executing each of them. You can be sure that the server will only send you valid bytecode.

nc x32.aperictf.fr 32323

Note: Read the documentation of this language before you start.
Additional resource: the documentation - md5sum: 5ac1ef34b4641b319281e65d80e84411

Methodology

The service gives us x32 bytecode encoded in hexadecimal and requires us to supply the output produced by executing it. If our answer doesn’t match the expected one, the server helpfully shows the intended result. 300 checks must be passed, and because a new bytecode is generated each time, we can’t fool the server by remembering the expected outputs. We have to implement an emulator.

To make this tedious task a bit simpler, we know that the bytecode produced by the service is always valid, so there is no need to spend much time on error handling.

General idea

First, reading the documentation of the language is mandatory: it describes exactly what we need to implement, and it comes with examples. I chose to implement the emulator as a Python class.

An x32 emulator must have a stack, registers, stdin, stdout and a set of instructions to execute.

The stack

The stack is represented by a simple list of 256 bytes.
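
As a minimal sketch of that representation (assuming one byte per cell and a stack pointer that starts at the last cell, as in the class shown later), the stack could look like this. The `store` helper is hypothetical and only there for illustration.

```python
# Sketch: a 256-cell stack where every cell holds one byte (0..255).
STACK_SIZE = 256

stack = [0] * STACK_SIZE   # same contents as list(bytes(256))
sp = 0xFF                  # stack pointer starts at the last cell


def store(stack, address, value):
    """Write one byte; a Python list raises IndexError past the last cell."""
    stack[address] = value & 0xFF


store(stack, 0x10, 0x41)
print(stack[0x10])  # 65
```

Using a plain list keeps out-of-range writes past index 255 visible as an `IndexError` instead of silently growing the memory.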

Registers

Registers are attributes of the class, one per register. A table maps register IDs to register names, and two helper functions get and set a register value based on its name.
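
The class shown later spells out the getter and setter as `if`/`elif` chains. A shorter sketch of the same idea, assuming the register names match attribute names exactly, could rely on `getattr`/`setattr`. The `Registers` class and its method names are hypothetical; the ID table is the one used in the solution.

```python
# Register IDs as used by the emulator's registerID table.
REGISTER_ID = {0x1: "R1", 0x2: "R2", 0x3: "R3", 0x4: "R4", 0x5: "A1", 0x6: "SP"}


class Registers:
    """Hypothetical holder: one attribute per register."""

    def __init__(self):
        self.R1 = self.R2 = self.R3 = self.R4 = 0
        self.A1 = 0
        self.SP = 0xFF

    def get(self, name):
        # Look the register up by its name.
        return getattr(self, name)

    def set(self, name, value):
        # Refuse names that are not addressable registers.
        if name not in REGISTER_ID.values():
            raise KeyError(name)
        setattr(self, name, value)


regs = Registers()
regs.set(REGISTER_ID[0x3], 42)
print(regs.get("R3"))  # 42
```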

IO

STDIN and STDOUT are each represented by a byte string. This decouples the emulator from the real IO streams of the machine running it and lets us read the result directly from a property.
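
A small sketch of this idea, assuming (as the handlers in the solution do) that a length register gives the number of bytes to copy between the buffers and the stack. The variable names and the sample input are made up for illustration.

```python
# Sketch: IO as plain byte strings instead of real streams.
stdin = b"hello"       # hypothetical input buffer
stdout = b""
stack = [0] * 256

count = 5              # plays the role of the length register (A1)
address = 0x20         # where the data lands on the stack

# IN-like step: copy `count` bytes from the input buffer to the stack.
stack[address:address + count] = stdin[:count]

# OUT-like step: copy them back from the stack into the output buffer.
stdout = bytes(stack[address:address + count])
print(stdout)  # b'hello'
```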

Instructions

A table is used to map each bytecode to an instruction and a corresponding function that will handle the logic behind this instruction.
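
The dispatch-table pattern can be sketched in a few lines. This toy class wires up only two of the opcodes used in the solution (0x12 and 0x64), and its handlers just record their name; `TinyMachine` and its methods are hypothetical.

```python
class TinyMachine:
    """Toy dispatcher: maps an opcode byte to the method that handles it."""

    def __init__(self):
        self.trace = []
        # opcode byte -> bound method implementing the instruction
        self.handlers = {
            0x12: self._handle_out,
            0x64: self._handle_goto,
        }

    def _handle_out(self, op):
        self.trace.append("OUT")

    def _handle_goto(self, op):
        self.trace.append("GOTO")

    def dispatch(self, op):
        handler = self.handlers.get(op)
        if handler is None:
            raise ValueError("unknown opcode 0x{:02x}".format(op))
        handler(op)


machine = TinyMachine()
for opcode in (0x64, 0x12):
    machine.dispatch(opcode)
print(machine.trace)  # ['GOTO', 'OUT']
```

Adding an instruction then only means writing one method and adding one entry to the table.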

Execution flow

Execution starts at IP=0. The opcode found at IP selects the handler function to call. Each handler advances IP past the bytes it consumes, so when it returns, IP points at the next instruction (or at the jump target).
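
To illustrate the loop itself, here is a sketch on a made-up two-instruction machine. The opcodes 0x01 (increment a counter) and 0x02 (jump to a 2-byte big-endian address) are hypothetical and are not x32 opcodes; only the fetch, advance, execute structure is the point.

```python
def run(bytecode):
    """Fetch/execute loop for a toy machine (not the real x32 instruction set)."""
    ip = 0
    counter = 0
    while ip < len(bytecode):
        op = bytecode[ip]          # fetch the opcode
        ip += 1                    # advance past it
        if op == 0x01:             # toy INC: no operand
            counter += 1
        elif op == 0x02:           # toy JMP: 2-byte big-endian target
            ip = int.from_bytes(bytecode[ip:ip + 2], "big")
        else:
            raise ValueError("unknown opcode 0x{:02x}".format(op))
    return counter


# INC, JMP 0x0005, INC (skipped by the jump), INC
program = bytes([0x01, 0x02, 0x00, 0x05, 0x01, 0x01])
print(run(program))  # 2
```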

Solution

The complete class can be found in emulator.py.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author: ENOENT

class Emulator:
    STDIN = bytes()
    STDOUT = bytes()
    stack = list(bytes(256))
    IP = 0
    SP = 0xFF
    R1, R2, R3, R4 = 0, 0, 0, 0
    A1 = 0
    ZF, GF = 0, 0
    bytecode = bytes()
    opcodeNames = {
    0x11 : "IN",
    0x12 : "OUT",
    0x31 : "SET",
    0x22 : "LOAD",
    0x23 : "STORE",
    0x32 : "ADD",
    0x33 : "SUB",
    0x34 : "XOR",
    0x41 : "PUSH",
    0x42 : "POP",
    0x51 : "CMP",
    0x61 : "JG",
    0x62 : "JL",
    0x63 : "JE",
    0x64 : "GOTO"
    }
    registerID = {
    0x1 : "R1",
    0x2 : "R2",
    0x3 : "R3",
    0x4 : "R4",
    0x5 : "A1",
    0x6 : "SP"
    }

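    def __init__(self, bytecode, debug=False):
        self.bytecode = bytecode
        self.debug = debug
        # Per-instance stack: the class-level list above would otherwise be
        # shared (and mutated) by every Emulator instance.
        self.stack = list(bytes(256))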
        self.opcodeID = {
        0x11 : self._handleIN,
        0x12 : self._handleOUT,
        0x31 : self._handleSET,
        0x22 : self._handleLOAD,
        0x23 : self._handleSTORE,
        0x32 : self._handleADD,
        0x33 : self._handleSUB,
        0x34 : self._handleXOR,
        0x41 : self._handlePUSH,
        0x42 : self._handlePOP,
        0x51 : self._handleCMP,
        0x61 : self._handleJG,
        0x62 : self._handleJL,
        0x63 : self._handleJE,
        0x64 : self._handleGOTO
        }

    def _getRegisterValue(self, regname):
        if regname == "R1":
            return self.R1
        elif regname == "R2":
            return self.R2
        elif regname == "R3":
            return self.R3
        elif regname == "R4":
            return self.R4
        elif regname == "A1":
            return self.A1
        elif regname == "SP":
            return self.SP
        elif regname == "ZF":
            return self.ZF
        elif regname == "GF":
            return self.GF
        else:
            return regname

    def _setRegisterValue(self, regname, value):
        if regname == "R1":
            self.R1 = value
        elif regname == "R2":
            self.R2 = value
        elif regname == "R3":
            self.R3 = value
        elif regname == "R4":
            self.R4 = value
        elif regname == "A1":
            self.A1 = value
        elif regname == "SP":
            self.SP = value
        elif regname == "ZF":
            self.ZF = value
        elif regname == "GF":
            self.GF = value

    def _getJumpOffset(self):
        b = self.bytecode[self.IP:self.IP+2]
        offset = int.from_bytes(b, 'big')
        self.IP += 2
        return offset

    def _getRegisters(self):
        b = self.bytecode[self.IP]
        a1IsReg = bool(b & 0x80)
        a2IsReg = bool(b & 0x40)
        reg1 = (b & 0b111000) >> 3
        reg2 = b & 0b111
        t = [None, None]
        if a1IsReg and self.registerID.get(reg1):
            t[0] = self.registerID[reg1]
        if a2IsReg and self.registerID.get(reg2):
            t[1] = self.registerID[reg2]
        if (a1IsReg and not self.registerID.get(reg1)) or (a2IsReg and not self.registerID.get(reg2)):
            raise Exception("IP = {} : Invalid bytecode for registers".format(self.IP), b)
        self.IP += 1
        return t

    def _getNextInstruction(self):
        op = self.bytecode[self.IP]
        if not self.opcodeID.get(op):
            raise Exception("IP = {} : Invalid bytecode for instruction".format(self.IP), op)
        self.IP += 1
        return op

    def _getArguments(self, op, nArgs, a1CanBeImm, a2CanBeImm=False):
        reg1, reg2 = self._getRegisters()
        if reg2 and nArgs < 2:
            raise Exception("IP = {} : Invalid number of registers for instruction {}".format(self.IP, self.opcodeNames[op]), [reg1, reg2])
        arguments = []
        # first argument
        a1 = 0
        if reg1:
            a1 = reg1
        elif a1CanBeImm:
            a1 = self.bytecode[self.IP]
            self.IP += 1
        else:
            raise Exception("IP = {} : Invalid first argument for instruction {}".format(self.IP, self.opcodeNames[op]), a1)
        arguments.append(a1)
        # second argument
        if nArgs > 1:
            a2 = 0
            if reg2:
                a2 = reg2
            elif a2CanBeImm:
                a2 = self.bytecode[self.IP]
                self.IP += 1
            else:
                raise Exception("IP = {} : Invalid second argument for instruction {}".format(self.IP, self.opcodeNames[op]), a2)
            arguments.append(a2)
        return arguments

    def _handleIN(self, op):
        dst = self._getArguments(op, 1, True)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], dst))
        dst = self._getRegisterValue(dst)
        if self.A1 > len(self.STDIN):
            raise Exception("STDIN too small", self.A1)
        data = self.STDIN[:self.A1]
        try:
            for i in range(dst, dst+self.A1):
                self.stack[i] = data[i-dst]
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), i)

    def _handleOUT(self, op):
        src = self._getArguments(op, 1, True)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], src))
        src = self._getRegisterValue(src)
        try:
            # could do that but wouldn't crash if attempting to read outside stack
            # data = self.stack[src: src+self.A1]
            data = []
            for i in range(src, src+self.A1):
                data.append(self.stack[i])
            self.STDOUT = bytes(data)
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), i)

    def _handleSET(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        self._setRegisterValue(dst, src)

    def _handleADD(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = (a+src) % 0x100
        self.ZF = r == 0
        self._setRegisterValue(dst, r)

    def _handleSUB(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = (a-src) % 0x100
        self.ZF = r == 0
        self._setRegisterValue(dst, r)

    def _handleXOR(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = a^src
        self.ZF = r == 0
        self._setRegisterValue(dst, r)

    def _handleLOAD(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)

        try:
            self._setRegisterValue(dst, self.stack[src])
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), src)

    def _handleSTORE(self, op):
        dst, src = self._getArguments(op, 2, True, False)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        dst = self._getRegisterValue(dst)
        try:
            self.stack[dst] = self._getRegisterValue(src)
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), dst)

    def _handlePUSH(self, op):
        src = self._getArguments(op, 1, True)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], src))
        src = self._getRegisterValue(src)
        self.SP -= 1
        try:
            self.stack[self.SP] = src
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), self.SP)

    def _handlePOP(self, op):
        dst = self._getArguments(op, 1, False)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], dst))
        try:
            src = self.stack[self.SP]
            self._setRegisterValue(dst, src)
            self.SP -= 1
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), dst)

    def _handleCMP(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        dst = self._getRegisterValue(dst)
        self.GF = dst > src
        self.ZF = dst == src

    def _handleJE(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        if self.ZF:
            self.IP = offset

    def _handleJG(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        if self.GF:
            self.IP = offset

    def _handleJL(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        if not self.GF and not self.ZF:
            self.IP = offset

    def _handleGOTO(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        self.IP = offset

    def run(self):
        N = len(self.bytecode) - 1
        while self.IP < N:
            if self.debug : print("{} : ".format(self.IP), end="")
            op = self._getNextInstruction()
            self.opcodeID[op](op)
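
The least obvious part of the class is the argument byte parsed by `_getRegisters`. As a worked example of the bit layout that method assumes (bit 7: first argument is a register, bit 6: second argument is a register, bits 5-3 and 2-0: the two register IDs), here is a standalone sketch. The function name `decode_argument_byte` is hypothetical.

```python
REGISTER_ID = {0x1: "R1", 0x2: "R2", 0x3: "R3", 0x4: "R4", 0x5: "A1", 0x6: "SP"}


def decode_argument_byte(b):
    """Return (first, second): a register name, or None for a non-register argument."""
    first_is_reg = bool(b & 0x80)      # bit 7
    second_is_reg = bool(b & 0x40)     # bit 6
    reg1 = (b >> 3) & 0b111            # bits 5..3
    reg2 = b & 0b111                   # bits 2..0
    first = REGISTER_ID[reg1] if first_is_reg else None
    second = REGISTER_ID[reg2] if second_is_reg else None
    return first, second


print(decode_argument_byte(0b11001010))  # ('R1', 'R2')
print(decode_argument_byte(0b10011000))  # ('R3', None)
```

When an argument comes back as `None`, the emulator reads the next bytecode byte as an immediate value instead, provided the instruction allows one in that position.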

We can now use it to talk to the service and pass all the checks (solve.py).


#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import socket # Because pwntools doesn't work with python3 :(
from emulator import Emulator
import binascii

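def recvuntil(msg, drop=False):
    # Read byte by byte until `msg` has been received.
    buffer = b""
    while msg not in buffer:
        chunk = conn.recv(1)
        if not chunk:
            # recv() returns b"" once the server has closed the connection
            raise ConnectionError("Connection closed by the server")
        buffer += chunk
    if drop:
        buffer = buffer[:-len(msg)]
    return buffer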

def solve():
    recvuntil(b"Byte code in hex : ")
    code = recvuntil(b"\n", drop=True).decode("utf-8")
    code = binascii.unhexlify(code)

    emu = Emulator(code)
    emu.run()
    expected = emu.STDOUT
    recvuntil(b"What is the outut of this programm ?\n")
    conn.send(expected)
    print(conn.recv(1024).decode('utf-8').strip())


conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect(("x32.aperictf.fr", 32323))
for i in range(300):
    solve()
print(conn.recv(1024).decode('utf-8').strip())
conn.close()
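
The read-until-marker helper can be tried without the challenge server. The sketch below is a variant that takes the socket as a parameter and is exercised over a local `socket.socketpair()`; the prompt string is the one the service prints, while the hex payload is made up for illustration.

```python
import socket


def recv_until(conn, marker, drop=False):
    """Read byte by byte until the data ends with `marker`."""
    buffer = b""
    while not buffer.endswith(marker):
        chunk = conn.recv(1)
        if not chunk:
            raise ConnectionError("connection closed before marker was seen")
        buffer += chunk
    return buffer[:-len(marker)] if drop else buffer


# Local stand-in for the remote service.
server, client = socket.socketpair()
server.sendall(b"Byte code in hex : 3188ff\n")

recv_until(client, b"Byte code in hex : ")
line = recv_until(client, b"\n", drop=True)
print(line)  # b'3188ff'

server.close()
client.close()
```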

Flag

python3 solve.py
Test 1/300 : SUCCESS
Test 2/300 : SUCCESS
Test 3/300 : SUCCESS
...
Test 298/300 : SUCCESS
Test 299/300 : SUCCESS
Test 300/300 : SUCCESS
You passed all the tests, here you go :
APRK{Th4ts_S0m3_c00l_3mul4t10n_y0u_G0t_Th3r3!}

APRK{Th4ts_S0m3_c00l_3mul4t10n_y0u_G0t_Th3r3!}

ENOENT