Aperi’CTF 2019: x32 Emulator
Challenge details
| Event | Challenge | Category | Points | Solves | 
|---|---|---|---|---|
| Aperi’CTF 2019 | x32 Emulator | MISC | 250 | 6 | 
Now that you are sufficiently familiar with this language, you will need a way to execute x32 bytecode. How else do you plan to analyze their microcontrollers?
The server will give you several sequences of x32 bytecode, and you must send back the result of executing each one. You can be sure that the server will only send you valid bytecode.
nc x32.aperictf.fr 32323
Note: Read the documentation of this language before you start.
Additional resource: the documentation - md5sum: 5ac1ef34b4641b319281e65d80e84411
Methodology
The service gives us x32 bytecode encoded in hexadecimal and requires us to supply the output produced by executing it. If our answer doesn't match the expected one, the server gracefully shows us the intended result. 300 checks must be passed, and because new bytecode is generated each time, we can't fool the server by remembering the expected outputs. We have to implement an emulator.
To simplify this tedious task a bit, we know that the bytecode produced by the service is always valid, so there is no need to spend much time on error handling.
General idea
First, it’s mandatory to read the documentation of the language, because it describes what we need to implement and gives examples. To implement the emulator, I decided to write a Python class.
An x32 emulator must have a stack, registers, stdin, stdout and a set of instructions to execute.
The stack
To represent the stack, a simple list of 256 byte-sized cells is used.
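As a minimal sketch of that representation, the stack can be created as a zero-filled list. The names `STACK_SIZE` and `new_stack` are hypothetical helpers introduced here for illustration; the listing in the Solution section builds the list inline.

```python
STACK_SIZE = 256  # number of one-byte cells


def new_stack():
    """Return a fresh, zero-filled stack of STACK_SIZE cells."""
    return list(bytes(STACK_SIZE))


stack = new_stack()
stack[0xFE] = 0x41  # write a single cell
print(len(stack), stack[0xFE])  # 256 65
```

Building the list inside a function gives every emulator its own cells, which matters when several programs are run one after another.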
Registers
Registers are properties of the class, one per register. A table is used to map register IDs to register names. Functions to get and set a register value based on its name must be implemented.
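One compact way to sketch this lookup is with `getattr` and `setattr`. This is an alternative to the explicit `if`/`elif` chains used in the Solution listing, not the author's code; the `Registers` class and its method names are hypothetical, while the ID table and the initial `SP` value are copied from the listing.

```python
# Register IDs as they appear in the writeup's table
REGISTER_ID = {0x1: "R1", 0x2: "R2", 0x3: "R3", 0x4: "R4", 0x5: "A1", 0x6: "SP"}


class Registers:
    def __init__(self):
        self.R1 = self.R2 = self.R3 = self.R4 = 0
        self.A1 = 0
        self.SP = 0xFF

    def get(self, operand):
        # An integer operand is an immediate value and is returned unchanged
        if isinstance(operand, int):
            return operand
        return getattr(self, operand)

    def set(self, name, value):
        if name not in REGISTER_ID.values():
            raise KeyError(name)
        setattr(self, name, value)


regs = Registers()
regs.set(REGISTER_ID[0x2], 7)
print(regs.get("R2"), regs.get(42))  # 7 42
```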
IO
STDIN and STDOUT are both represented by a byte string. This decouples the emulator from the real IO streams of the machine running it, and the result can be read directly from a property.
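A small sketch of this idea, assuming the same behavior as the Solution listing (input is read from the start of the buffer, and each output replaces the previous one). The `IOBuffers` class is a hypothetical wrapper; the listing stores the two byte strings directly on the emulator.

```python
class IOBuffers:
    """In-memory stand-ins for the emulated STDIN and STDOUT."""

    def __init__(self, stdin=b""):
        self.stdin = bytes(stdin)
        self.stdout = b""

    def read(self, count):
        # Mirrors the listing: reading fails when fewer than `count` bytes exist
        if count > len(self.stdin):
            raise ValueError("STDIN too small")
        return self.stdin[:count]

    def write(self, data):
        # Mirrors the listing: each write replaces the previous output
        self.stdout = bytes(data)


io = IOBuffers(stdin=b"hello")
io.write(io.read(4))
print(io.stdout)  # b'hell'
```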
Instructions
A table is used to map each opcode to an instruction name and to the function that handles the logic of that instruction.
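The dispatch table can be sketched as a dictionary keyed by opcode. The opcode values `0x31` (SET) and `0x32` (ADD) come from the Solution listing; the handler signatures and the dictionary-based register file are simplifications assumed here for illustration.

```python
def handle_set(registers, dst, value):
    registers[dst] = value


def handle_add(registers, dst, value):
    # Registers hold one byte, so the sum wraps around at 0x100
    registers[dst] = (registers[dst] + value) % 0x100


# Opcode values from the writeup; handler signatures are hypothetical
HANDLERS = {
    0x31: ("SET", handle_set),
    0x32: ("ADD", handle_add),
}


def dispatch(opcode, registers, dst, value):
    name, handler = HANDLERS[opcode]
    handler(registers, dst, value)
    return name


registers = {"R1": 0}
dispatch(0x31, registers, "R1", 250)
dispatch(0x32, registers, "R1", 10)
print(registers["R1"])  # 4
```

Adding an instruction then only means writing one handler and registering it under its opcode.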
Execution flow
To execute the entire bytecode, the emulator starts at IP=0. Depending on the opcode encountered, the appropriate handler is called. Each handler advances the IP register past its own operands, so IP always points at the next instruction unless a jump overwrites it.
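The loop can be illustrated with a deliberately reduced sketch that knows only two instructions, PUSH with an immediate operand and GOTO. The encodings are assumptions read off the Solution listing rather than the documentation: an opcode byte, then for PUSH an operand byte followed by the immediate, and for GOTO a two-byte big-endian target. The sample program is made up for this example.

```python
OP_PUSH, OP_GOTO = 0x41, 0x64  # opcode values from the listing


def run(bytecode):
    stack = [0] * 256
    ip, sp = 0, 0xFF
    while ip < len(bytecode):
        opcode = bytecode[ip]
        ip += 1  # fetch: move past the opcode
        if opcode == OP_PUSH:
            ip += 1  # skip the operand byte (assumed 0x00: immediate follows)
            sp -= 1
            stack[sp] = bytecode[ip]
            ip += 1  # move past the immediate
        elif opcode == OP_GOTO:
            # A jump replaces IP instead of advancing it
            ip = int.from_bytes(bytecode[ip:ip + 2], "big")
        else:
            raise ValueError("unsupported opcode {:#x}".format(opcode))
    return stack, sp


# PUSH 7; GOTO 9; PUSH 8 (skipped by the jump); PUSH 9
program = bytes.fromhex("410007640009410008410009")
stack, sp = run(program)
print(stack[sp:0xFF])  # [9, 7]
```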
Solution
The complete class can be found in emulator.py.
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author: ENOENT
class Emulator:
    STDIN = bytes()
    STDOUT = bytes()
    stack = list(bytes(256))
    IP = 0
    SP = 0xFF
    R1, R2, R3, R4 = 0, 0, 0, 0
    A1 = 0
    ZF, GF = 0, 0
    bytecode = bytes()
    opcodeNames = {
    0x11 : "IN",
    0x12 : "OUT",
    0x31 : "SET",
    0x22 : "LOAD",
    0x23 : "STORE",
    0x32 : "ADD",
    0x33 : "SUB",
    0x34 : "XOR",
    0x41 : "PUSH",
    0x42 : "POP",
    0x51 : "CMP",
    0x61 : "JG",
    0x62 : "JL",
    0x63 : "JE",
    0x64 : "GOTO"
    }
    registerID = {
    0x1 : "R1",
    0x2 : "R2",
    0x3 : "R3",
    0x4 : "R4",
    0x5 : "A1",
    0x6 : "SP"
    }
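    def __init__(self, bytecode, debug=False):
        self.bytecode = bytecode
        self.debug = debug
        # Give each instance its own stack; the class-level list is otherwise shared between emulators
        self.stack = list(bytes(256))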
        self.opcodeID = {
        0x11 : self._handleIN,
        0x12 : self._handleOUT,
        0x31 : self._handleSET,
        0x22 : self._handleLOAD,
        0x23 : self._handleSTORE,
        0x32 : self._handleADD,
        0x33 : self._handleSUB,
        0x34 : self._handleXOR,
        0x41 : self._handlePUSH,
        0x42 : self._handlePOP,
        0x51 : self._handleCMP,
        0x61 : self._handleJG,
        0x62 : self._handleJL,
        0x63 : self._handleJE,
        0x64 : self._handleGOTO
        }
    def _getRegisterValue(self, regname):
        if regname == "R1":
            return self.R1
        elif regname == "R2":
            return self.R2
        elif regname == "R3":
            return self.R3
        elif regname == "R4":
            return self.R4
        elif regname == "A1":
            return self.A1
        elif regname == "SP":
            return self.SP
        elif regname == "ZF":
            return self.ZF
        elif regname == "GF":
            return self.GF
        else:
            return regname
    def _setRegisterValue(self, regname, value):
        if regname == "R1":
            self.R1 = value
        elif regname == "R2":
            self.R2 = value
        elif regname == "R3":
            self.R3 = value
        elif regname == "R4":
            self.R4 = value
        elif regname == "A1":
            self.A1 = value
        elif regname == "SP":
            self.SP = value
        elif regname == "ZF":
            self.ZF = value
        elif regname == "GF":
            self.GF = value
    def _getJumpOffset(self):
        b = self.bytecode[self.IP:self.IP+2]
        offset = int.from_bytes(b, 'big')
        self.IP += 2
        return offset
    def _getRegisters(self):
        b = self.bytecode[self.IP]
        a1IsReg = bool(b & 0x80)
        a2IsReg = bool(b & 0x40)
        reg1 = (b & 0b111000) >> 3
        reg2 = b & 0b111
        t = [None, None]
        if a1IsReg and self.registerID.get(reg1):
            t[0] = self.registerID[reg1]
        if a2IsReg and self.registerID.get(reg2):
            t[1] = self.registerID[reg2]
        if (a1IsReg and not self.registerID.get(reg1)) or (a2IsReg and not self.registerID.get(reg2)):
            raise Exception("IP = {} : Invalid bytecode for registers".format(self.IP), b)
        self.IP += 1
        return t
    def _getNextInstruction(self):
        op = self.bytecode[self.IP]
        if not self.opcodeID.get(op):
            raise Exception("IP = {} : Invalid bytecode for instruction".format(self.IP), op)
        self.IP += 1
        return op
    def _getArguments(self, op, nArgs, a1CanBeImm, a2CanBeImm=False):
        reg1, reg2 = self._getRegisters()
        if reg2 and nArgs < 2:
            raise Exception("IP = {} : Invalid number of registers for instruction {}".format(self.IP, self.opcodeNames[op]), [reg1, reg2])
        arguments = []
        # first argument
        a1 = 0
        if reg1:
            a1 = reg1
        elif a1CanBeImm:
            a1 = self.bytecode[self.IP]
            self.IP += 1
        else:
            raise Exception("IP = {} : Invalid first argument for instruction {}".format(self.IP, self.opcodeNames[op]), a1)
        arguments.append(a1)
        # second argument
        if nArgs > 1:
            a2 = 0
            if reg2:
                a2 = reg2
            elif a2CanBeImm:
                a2 = self.bytecode[self.IP]
                self.IP += 1
            else:
                raise Exception("IP = {} : Invalid second argument for instruction {}".format(self.IP, self.opcodeNames[op]), a2)
            arguments.append(a2)
        return arguments
    def _handleIN(self, op):
        dst = self._getArguments(op, 1, True)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], dst))
        dst = self._getRegisterValue(dst)
        if self.A1 > len(self.STDIN):
            raise Exception("STDIN too small", self.A1)
        data = self.STDIN[:self.A1]
        try:
            for i in range(dst, dst+self.A1):
                self.stack[i] = data[i-dst]
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), i)
    def _handleOUT(self, op):
        src = self._getArguments(op, 1, True)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], src))
        src = self._getRegisterValue(src)
        try:
            # could do that but wouldn't crash if attempting to read outside stack
            # data = self.stack[src: src+self.A1]
            data = []
            for i in range(src, src+self.A1):
                data.append(self.stack[i])
            self.STDOUT = bytes(data)
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), i)
    def _handleSET(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        self._setRegisterValue(dst, src)
    def _handleADD(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = (a+src) % 0x100
        self.ZF = r == 0
        self._setRegisterValue(dst, r)
    def _handleSUB(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = (a-src) % 0x100
        self.ZF = r == 0
        self._setRegisterValue(dst, r)
    def _handleXOR(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = a^src
        self.ZF = r == 0
        self._setRegisterValue(dst, r)
    def _handleLOAD(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        try:
            self._setRegisterValue(dst, self.stack[src])
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), src)
    def _handleSTORE(self, op):
        dst, src = self._getArguments(op, 2, True, False)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        dst = self._getRegisterValue(dst)
        try:
            self.stack[dst] = self._getRegisterValue(src)
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), dst)
    def _handlePUSH(self, op):
        src = self._getArguments(op, 1, True)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], src))
        src = self._getRegisterValue(src)
        self.SP -= 1
        try:
            self.stack[self.SP] = src
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), self.SP)
    def _handlePOP(self, op):
        dst = self._getArguments(op, 1, False)[0]
        if self.debug : print("{} {}".format(self.opcodeNames[op], dst))
        try:
            src = self.stack[self.SP]
            self._setRegisterValue(dst, src)
            self.SP -= 1
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), dst)
    def _handleCMP(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug : print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        dst = self._getRegisterValue(dst)
        self.GF = dst > src
        self.ZF = dst == src
    def _handleJE(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        if self.ZF:
            self.IP = offset
    def _handleJG(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        if self.GF:
            self.IP = offset
    def _handleJL(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        if not self.GF and not self.ZF:
            self.IP = offset
    def _handleGOTO(self, op):
        offset = self._getJumpOffset()
        if self.debug : print("{} {}".format(self.opcodeNames[op], offset))
        self.IP = offset
    def run(self):
        N = len(self.bytecode) - 1
        while self.IP < N:
            if self.debug : print("{} : ".format(self.IP), end="")
            op = self._getNextInstruction()
            self.opcodeID[op](op)
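The operand byte handled by `_getRegisters` is the least obvious part of the listing, so here is a standalone sketch of the same bit layout: bit 7 flags the first operand as a register, bit 6 flags the second, bits 5-3 hold the first register ID and bits 2-0 the second. The function name `decode_operand_byte` is hypothetical, and the layout is taken from the code above rather than from the documentation.

```python
REGISTER_ID = {0x1: "R1", 0x2: "R2", 0x3: "R3", 0x4: "R4", 0x5: "A1", 0x6: "SP"}


def decode_operand_byte(b):
    """Return (first, second) register names, or None where the operand is not a register."""
    first = REGISTER_ID[(b >> 3) & 0b111] if b & 0x80 else None
    second = REGISTER_ID[b & 0b111] if b & 0x40 else None
    return first, second


print(decode_operand_byte(0b11001010))  # ('R1', 'R2')
print(decode_operand_byte(0b10101000))  # ('A1', None)
print(decode_operand_byte(0b00000000))  # (None, None)
```

An operand that decodes to `None` is then read as an immediate byte from the bytecode, provided the instruction allows one in that position.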
We can now use it to talk to the service and pass all the checks (solve.py).
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import socket # Because pwntools doesn't work with python3 :(
from emulator import Emulator
import binascii
def recvuntil(msg, drop=False):
    buffer = b""
    while msg not in buffer:
        buffer += conn.recv(1)
    if drop:
        buffer = buffer[:-len(msg)]
    return buffer
def solve():
    recvuntil(b"Byte code in hex : ")
    code = recvuntil(b"\n", drop=True).decode("utf-8")
    code = binascii.unhexlify(code)
    emu = Emulator(code)
    emu.run()
    expected = emu.STDOUT
    recvuntil(b"What is the outut of this programm ?\n")
    conn.send(expected)
    print(conn.recv(1024).decode('utf-8').strip())
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect(("x32.aperictf.fr", 32323))
for i in range(300):
    solve()
print(conn.recv(1024).decode('utf-8').strip())
conn.close()
Flag
python3 solve.py
Test 1/300 : SUCCESS
Test 2/300 : SUCCESS
Test 3/300 : SUCCESS
...
Test 298/300 : SUCCESS
Test 299/300 : SUCCESS
Test 300/300 : SUCCESS
You passed all the tests, here you go :
APRK{Th4ts_S0m3_c00l_3mul4t10n_y0u_G0t_Th3r3!}
ENOENT
