Module refinery.lib.emulator

This module implements an emulator abstraction layer.

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module implements an emulator abstraction layer.
"""
from __future__ import annotations
from typing import Dict, List, Any, Generic, TypeVar, Optional, Iterator, Union
from typing import TYPE_CHECKING

from abc import ABC, abstractmethod
from enum import IntFlag
from functools import lru_cache, cached_property, partial

from refinery.lib.executable import align, Arch, ET, BO, Executable, ExecutableCodeBlob
from refinery.lib.tools import NoLogging
from refinery.lib.vfs import VirtualFileSystem
from refinery.units import RefineryImportMissing

if TYPE_CHECKING:
    from capstone import Cs
    from speakeasy import Speakeasy as Se
    from speakeasy.memmgr import MemMap
    from unicorn import Uc
    from icicle import Icicle as Ic
    from intervaltree import Interval
else:
    Cs = TypeVar('Cs')
    Uc = TypeVar('Uc')
    Ic = TypeVar('Ic')
    Se = TypeVar('Se')


_T = TypeVar('_T')
_E = TypeVar('_E')
_R = TypeVar('_R')


class MissingModule:
    """
    A stand-in object for a module that could not be imported. Reading any regular attribute
    from it raises `refinery.units.RefineryImportMissing`; lookups of dunder names fail with
    an ordinary `AttributeError` instead.
    """
    def __init__(self, name, dist=None):
        self.name = name
        # A missing (or otherwise falsy) distribution name falls back to the module name.
        self.dist = dist if dist else name

    def __getattr__(self, key: str):
        is_dunder = key.startswith('__') and key.endswith('__')
        if not is_dunder:
            raise RefineryImportMissing(self.name, self.dist)
        # Dunder lookups get a plain AttributeError rather than the import error.
        raise AttributeError(key)


try:
    with NoLogging():
        import unicorn as uc
    # import unicorn.x86_const
    # import unicorn.arm64_const
    # import unicorn.mips_const
    # import unicorn.sparc_const
    # try:
    #     import unicorn.ppc_const
    # except ImportError:
    #     pass
except ImportError:
    uc = MissingModule('unicorn')
try:
    import speakeasy as se
except ImportError:
    se = MissingModule('speakeasy-emulator')
try:
    import icicle as ic
except ImportError:
    ic = MissingModule('icicle-emu')
try:
    import capstone as cs
except ImportError:
    cs = MissingModule('capstone')
try:
    import intervaltree
except ImportError:
    intervaltree = MissingModule('intervaltree')


class EmulationError(Exception):
    """
    The common base class of every exception that an emulator raises.
    """


class Register(Generic[_R]):
    """
    Represents an arbitrary CPU register. Two registers compare equal when both their code and
    their size agree; the name does not take part in comparisons or hashing.
    """
    __slots__ = (
        'name',
        'code',
        'size',
    )
    name: str
    """
    This is the common name of the register, like "eax" on x86.
    """
    code: _R
    """
    The code of a register is any emulator-specific internal identifier for the register.
    """
    size: Optional[int]
    """
    If not `None`, this property contains the size of the register in bytes.
    """

    def __init__(self, name: str, code: _R, size: Optional[int] = 0):
        self.name = name
        self.code = code
        self.size = size

    def __eq__(self, other: object):
        # Returning NotImplemented for foreign types lets Python fall back to its default
        # comparison instead of failing with an AttributeError on the missing attributes.
        if not isinstance(other, Register):
            return NotImplemented
        return self.code == other.code and self.size == other.size

    def __hash__(self):
        # Must stay consistent with __eq__: only code and size are relevant.
        return hash((self.code, self.size))


class Hook(IntFlag):
    """
    Bit flags that select which kinds of hooks are requested from an emulator. The first five
    members are the individual hook types, the remaining ones are combined masks.
    """
    CodeExecute  = 1 << 0  # noqa
    CodeError    = 1 << 1  # noqa
    MemoryRead   = 1 << 2  # noqa
    MemoryWrite  = 1 << 3  # noqa
    MemoryError  = 1 << 4  # noqa

    OnlyErrors   = CodeError | MemoryError     # noqa
    Everything   = 0xFF                        # noqa
    Nothing      = 0                           # noqa
    MemoryAccess = MemoryRead | MemoryWrite    # noqa
    Memory       = MemoryAccess | MemoryError  # noqa
    NoErrors     = CodeExecute | MemoryAccess  # noqa


class Emulator(ABC, Generic[_E, _R, _T]):
    """
    The emulator base class. It wraps an executable (or a raw code blob) and offers a uniform
    interface to registers, memory, the stack, and hooks. Concrete subclasses implement the
    abstract methods on top of a specific emulation engine.
    """

    def __init__(
        self,
        data: Union[Executable, bytes, bytearray, memoryview],
        base: Optional[int] = None,
        arch: Optional[Arch] = None,
        hooks: Hook = Hook.OnlyErrors,
        align_size: int = 0x1000,
        alloc_size: int = 0x1000,
    ):
        """
        The `data` argument is either an already parsed executable or a buffer. Buffers are
        parsed with `Executable.Load`; when that fails with a `ValueError`, the buffer is
        wrapped as an `ExecutableCodeBlob` using the given `base` and `arch`. The `hooks` mask
        selects the hooks to install, `align_size` is used by `align`, and `alloc_size` is the
        granularity for memory that is mapped on demand.
        """
        if isinstance(data, Executable):
            # Already parsed: use it as-is and do not attempt to load it a second time.
            exe = data
            raw = isinstance(exe, ExecutableCodeBlob)
        else:
            try:
                exe = Executable.Load(data)
            except ValueError:
                exe = ExecutableCodeBlob(data, base, arch)
                raw = True
            else:
                raw = False

        self.exe = exe
        self.raw = raw

        self.hooks = hooks
        self.base = exe.base

        self.align_size = align_size
        self.alloc_size = alloc_size
        self._resetonce = False

        # Names of the stack pointer, instruction pointer, and return value register. On ARM,
        # the register named "ip" is r12 and not the program counter, hence "pc" is used.
        # NOTE(review): 'r0' is kept as the ARM64 return value register - confirm.
        self._sp, self._ip, self._rv = {
            Arch.PPC32   : ('1',   'pc',  '3'  ), # noqa
            Arch.PPC64   : ('1',   'pc',  '3'  ), # noqa
            Arch.X32     : ('esp', 'eip', 'eax'), # noqa
            Arch.X64     : ('rsp', 'rip', 'rax'), # noqa
            Arch.ARM32   : ('sp',  'pc',  'r0' ), # noqa
            Arch.ARM64   : ('sp',  'pc',  'r0' ), # noqa
            Arch.MIPS16  : ('sp',  'pc',  '0'  ), # noqa
            Arch.MIPS32  : ('sp',  'pc',  'v0' ), # noqa
            Arch.MIPS64  : ('sp',  'pc',  'v0' ), # noqa
            Arch.SPARC32 : ('sp',  'pc',  'o0' ), # noqa
            Arch.SPARC64 : ('sp',  'pc',  'o0' ), # noqa
        }[exe.arch()]

        self._init()

    @cached_property
    def _reg_sp(self):
        # The engine-specific code of the stack pointer register, looked up once.
        return self._lookup_register(self._sp).code

    @cached_property
    def _reg_ip(self):
        # The engine-specific code of the instruction pointer register, looked up once.
        return self._lookup_register(self._ip).code

    @cached_property
    def _reg_rv(self):
        # The engine-specific code of the return value register, looked up once.
        return self._lookup_register(self._rv).code

    def hooked(self, hook: Hook) -> bool:
        """
        Return whether the given hook is (supposed to be) set. For combined masks, all bits of
        the mask have to be set.
        """
        return self.hooks & hook == hook

    def reset(self, state: Optional[_T] = None):
        """
        This function resets the emulator to an initial state. This will create a new instance of
        the underlying emulator engine, map the input executable to memory, and install any of the
        requested hooks. The given `state` is stored on the emulator as `state`.
        """
        self._resetonce = True
        self._memorymap = intervaltree.IntervalTree()
        self.state = state
        self._reset()

    def emulate(self, start: int, end: Optional[int] = None):
        """
        Call this function to begin emulation. The `start` parameter is the address where execution
        should begin, the `end` parameter is an optional address to halt at. Both addresses are
        given relative to the base of the executable and are translated to the base address of
        the emulator. The emulator is reset first if it has never been reset before.
        """
        if not self._resetonce:
            self.reset()
        exe = self.exe
        start = start - exe.base + self.base
        if end is not None:
            end = end - exe.base + self.base
        return self._emulate(start, end)

    @abstractmethod
    def _reset(self):
        """
        Called as part of `refinery.lib.emulator.Emulator.reset`.
        """
        ...

    def _init(self):
        """
        Called at the very end of the object initializer. Can be overridden by child classes to
        initialize variables that do not depend on the emulator engine to be ready.
        """
        pass

    @abstractmethod
    def _emulate(self, start: int, end: Optional[int] = None):
        """
        This is the tail call of `refinery.lib.emulator.Emulator.emulate`.
        """
        ...

    @abstractmethod
    def halt(self):
        """
        Causes the emulation to halt, usually when called from a hook.
        """
        ...

    @abstractmethod
    def _set_register(self, register: _R, v: int):
        """
        Called as part of `refinery.lib.emulator.Emulator.set_register`.
        """
        ...

    @abstractmethod
    def _get_register(self, register: _R) -> int:
        """
        Called as part of `refinery.lib.emulator.Emulator.get_register`.
        """
        ...

    @abstractmethod
    def _lookup_register(self, var: Union[_R, int]) -> Register[_R]:
        """
        Called as part of `refinery.lib.emulator.Emulator.lookup_register`.
        """
        ...

    @abstractmethod
    def _map(self, address: int, size: int):
        """
        Called as part of `refinery.lib.emulator.Emulator.map`.
        """
        ...

    @abstractmethod
    def mem_write(self, address: int, data: bytes):
        """
        Write data to already mapped memory.
        """
        ...

    @abstractmethod
    def mem_read(self, address: int, size: int) -> bytes:
        """
        Read data from the emulator's mapped memory.
        """
        ...

    @abstractmethod
    def malloc(self, size: int) -> int:
        """
        Allocate (i.e. map) the given amount of memory in the emulator's memory space.
        """
        ...

    @abstractmethod
    def morestack(self):
        """
        Allocate more memory for the stack to grow into.
        """
        ...

    def lookup_register(self, var: Union[str, _R, Register[_R]]):
        """
        Return the `refinery.lib.emulator.Register` for the given name or code. `Register` type
        inputs are passed through unaltered.
        """
        if isinstance(var, Register):
            return var
        return self._lookup_register(var)

    def map(self, address: int, size: int):
        """
        Map memory of the given size at the given address. This function does not fail when part
        of the memory is already mapped; it will instead map only the missing pieces. Nothing
        is mapped when the requested range is empty or already covered entirely.
        """
        lower = address
        upper = address + size
        # Walk the overlapping intervals in ascending order and collect every gap between
        # them that lies inside the requested range.
        cursor = lower
        gaps = []
        overlaps = sorted(self._memorymap.overlap(lower, upper), key=lambda iv: iv.begin)
        for interval in overlaps:
            interval: Interval
            if interval.begin > cursor:
                gaps.append((cursor, interval.begin))
            if interval.end > cursor:
                cursor = interval.end
            if cursor >= upper:
                break
        if cursor < upper:
            gaps.append((cursor, upper))
        for begin, end in gaps:
            # Only record the range after the engine has mapped it successfully.
            self._map(begin, end - begin)
            self._memorymap.addi(begin, end)

    @property
    def sp(self):
        """
        The stack pointer.
        """
        return self.get_register(self._reg_sp)

    @sp.setter
    def sp(self, value: int):
        return self.set_register(self._reg_sp, value)

    @property
    def rv(self):
        """
        The return value.
        """
        return self.get_register(self._reg_rv)

    @rv.setter
    def rv(self, value: int):
        return self.set_register(self._reg_rv, value)

    @property
    def ip(self):
        """
        The instruction pointer.
        """
        return self.get_register(self._reg_ip)

    @ip.setter
    def ip(self, value: int):
        return self.set_register(self._reg_ip, value)

    def measure_register_size(self, reg: _R) -> int:
        """
        Measures the size of a register by writing a very large number to it with all bits set,
        subsequently reading the register value, and counting the number of bits in the
        measurement. Props for this one go to Matthieu Walter who originally proposed it as a
        joke; I have not found a better way to do this for uncooperative emulators. The
        previous register value is restored afterwards and the size is returned in bytes.
        """
        val = self._get_register(reg)
        self._set_register(reg, (1 << 512) - 1)
        q, r = divmod(self._get_register(reg).bit_length(), 8)
        assert r == 0
        self._set_register(reg, val)
        return q

    def push(self, val: int, size: Optional[int] = None):
        """
        Push the given integer value to the stack. If the `size` parameter is missing, the function
        will push a machine word sized value. When the write fails, more stack memory is
        requested via `morestack` and the write is attempted one more time. The stack pointer
        is decremented by `size` after the value was written.
        """
        if size is None:
            size = self.exe.pointer_size // 8
        tos = self.sp - size
        # Encoding errors are not a stack problem; they are raised before any write attempt.
        data = val.to_bytes(size, self.exe.byte_order().value)
        for already_retried_once in (False, True):
            try:
                self.mem_write(tos, data)
            except Exception:
                if already_retried_once:
                    raise
                self.morestack()
            else:
                break
        # Without this update, consecutive pushes would overwrite each other and a
        # subsequent pop would not return the pushed value.
        self.sp = tos

    def pop(self, size: Optional[int] = None):
        """
        Pop an integer value from the stack. If the `size` parameter is missing, the function will
        pop a machine word sized value. The stack pointer is incremented by `size`.
        """
        if size is None:
            size = self.exe.pointer_size // 8
        sp = self.sp
        sv = int.from_bytes(self.mem_read(sp, size), self.exe.byte_order().value)
        self.sp = sp + size
        return sv

    def push_register(self, reg: Union[int, str, Register[_R]]):
        """
        Push the contents of the given register to the stack. If the size of the register is
        not known (it is `None` or zero), the size is measured first.
        """
        reg = self.lookup_register(reg)
        val = self.get_register(reg.code)
        size = reg.size
        if not size:
            size = self.measure_register_size(reg.code)
        self.push(val, size)

    def align(self, value, down=False):
        """
        Align the given value according to the emulator's alignment setting. If the `down` parameter
        is set, it will return the nearest lower address instead of the nearest higher one.
        """
        return align(self.align_size, value, down=down)

    def set_register(self, register: Union[int, str, Register[_R]], value: int):
        """
        Write the given value to the given CPU register.
        """
        register = self.lookup_register(register)
        return self._set_register(register.code, value)

    def get_register(self, register: Union[int, str, Register[_R]]) -> int:
        """
        Read the contents of the given CPU register.
        """
        register = self.lookup_register(register)
        return self._get_register(register.code)

    def hook_code_execute(self, emu: _E, address: int, size: int, state: Optional[_T] = None) -> bool:
        """
        Called when code execution is hooked. The default implementation does nothing.
        """
        return True

    def hook_code_error(self, emu: _E, state: Optional[_T] = None) -> bool:
        """
        Called when code errors are hooked. The default implementation halts the emulation.
        """
        self.halt()
        return False

    def hook_mem_read(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
        """
        Called when memory reads are hooked. The default implementation does nothing.
        """
        return True

    def hook_mem_write(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
        """
        Called when memory writes are hooked. The default implementation does nothing.
        """
        return True

    def hook_mem_error(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
        """
        Called when memory errors are hooked. The default implementation attempts to map
        `alloc_size` bytes of memory at the aligned address of the fault.
        """
        try:
            self.map(self.align(address, down=True), self.alloc_size)
        except Exception:
            # Mapping is best-effort here; a failure is deliberately ignored.
            pass
        return True

    def disassembler(self) -> Cs:
        """
        Create a capstone disassembler that matches the emulator's architecture. The engine is
        created on the first call and cached on the emulator instance.
        """
        try:
            return self._disassembler
        except AttributeError:
            pass

        # NOTE(review): ARM64 is mapped to CS_ARCH_ARM in Thumb mode - confirm this is intended.
        cs_arch, cs_mode = {
            Arch.X32     : (cs.CS_ARCH_X86,   cs.CS_MODE_32),     # noqa
            Arch.X64     : (cs.CS_ARCH_X86,   cs.CS_MODE_64),     # noqa
            Arch.ARM32   : (cs.CS_ARCH_ARM,   cs.CS_MODE_ARM),    # noqa
            Arch.ARM64   : (cs.CS_ARCH_ARM,   cs.CS_MODE_THUMB),  # noqa
            Arch.MIPS16  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_16),     # noqa
            Arch.MIPS32  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_32),     # noqa
            Arch.MIPS64  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_64),     # noqa
            Arch.PPC32   : (cs.CS_ARCH_PPC,   cs.CS_MODE_32),     # noqa
            Arch.PPC64   : (cs.CS_ARCH_PPC,   cs.CS_MODE_64),     # noqa
            Arch.SPARC32 : (cs.CS_ARCH_SPARC, cs.CS_MODE_32),     # noqa
            Arch.SPARC64 : (cs.CS_ARCH_SPARC, cs.CS_MODE_V9),     # noqa
        }[self.exe.arch()]

        cs_mode |= {
            BO.BE: cs.CS_MODE_BIG_ENDIAN,
            BO.LE: cs.CS_MODE_LITTLE_ENDIAN,
        }[self.exe.byte_order()]

        # A per-instance cache does not keep the emulator alive the way a function-level
        # cache on the method would.
        self._disassembler = engine = cs.Cs(cs_arch, cs_mode)
        return engine

    def general_purpose_registers(self):
        """
        A generator that lists the general purpose registers for the current architecture. The
        implementation is currently incomplete and only has support for the Intel architectures.
        For other architectures, this is an empty generator.
        """
        arch = self.exe.arch()
        regs = []
        if arch is Arch.X32:
            regs = ('eax', 'ebx', 'ecx', 'edx', 'esi', 'edi', 'ebp')
        elif arch is Arch.X64:
            regs = ('rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi', 'rbp', 'r8', 'r9', 'r10', 'r11', 'r12', 'r13', 'r14', 'r15')
        for reg in regs:
            yield self._lookup_register(reg)


class RawMetalEmulator(Emulator[_E, _R, _T]):
    """
    Base class for emulators whose engine offers nothing beyond the bare CPU. It provides the
    helpers that map the segments of the executable into memory and it implements a simple
    stack and heap on top of `refinery.lib.emulator.Emulator.map`.
    """

    stack_base: int
    stack_size: int
    alloc_base: int

    def _map_stack_and_heap(self):
        """
        Choose base addresses for the stack and the heap outside of the address range defined
        by the image, map the stack, and point the stack pointer into it. The stack spans
        three allocation units. Raises a `RuntimeError` if no suitable placement is found.
        """
        unit = self.alloc_size
        # The highest usable address: one allocation unit below the end of the address space.
        ceiling = (1 << self.exe.pointer_size) - unit
        space = self.exe.image_defined_address_space()
        image_end = self.align(space.upper)
        image_start = self.align(space.lower, down=True)
        reserve = 3 * unit
        if image_end + 5 * unit < ceiling:
            # Enough room above the image: heap right after it, stack at the very top.
            stack, heap = ceiling - reserve, image_end
        elif image_start > 5 * unit:
            # Enough room below the image: heap at zero, stack right below the image.
            stack, heap = image_start - reserve, 0
        elif image_end + 3 * unit < ceiling and image_start > 2 * unit:
            # Split placement: heap at zero, stack at the very top.
            stack, heap = ceiling - reserve, 0
        else:
            raise RuntimeError(
                U'Unable to find sufficient space for heap and stack with '
                F'allocation size of 0x{unit:X}.')
        self.stack_base = stack
        self.alloc_base = heap
        self.stack_size = reserve
        self.map(stack, reserve)
        # The initial stack pointer sits one allocation unit below the top of the stack.
        self.sp = stack + reserve - unit

    def _map_segments(self):
        """
        Map the aligned virtual ranges of all segments of the executable, then copy the
        physical data of each segment to the lower bound of its virtual range.
        """
        exe = self.exe
        blob = exe.data
        ranges = intervaltree.IntervalTree()
        for seg in exe.segments():
            if seg.virtual:
                ranges.addi(
                    self.align(seg.virtual.lower, down=True),
                    self.align(seg.virtual.upper))
        # Aligned ranges of neighbouring segments may overlap; merge them before mapping.
        ranges.merge_overlaps()
        for span in ranges:
            span: Interval
            self.map(span.begin, span.end - span.begin)
        for seg in exe.segments():
            physical = seg.physical
            virtual = seg.virtual
            if len(physical) > 0:
                self.mem_write(virtual.lower, bytes(blob[physical.slice()]))

    def morestack(self):
        """
        Grow the stack downwards by one allocation unit.
        """
        unit = self.alloc_size
        base = self.stack_base - unit
        self.stack_base = base
        self.stack_size = self.stack_size + unit
        self.map(base, unit)

    def malloc(self, size: int) -> int:
        """
        Map the aligned amount of memory at the current heap position, advance the heap
        position past it, and return the address of the new allocation.
        """
        amount = self.align(size)
        address = self.alloc_base
        self.map(address, amount)
        self.alloc_base = address + amount
        return address


class UnicornEmulator(RawMetalEmulator[Uc, int, _T]):
    """
    A Unicorn-based emulator.
    """

    unicorn: Uc

    def _reset(self):
        """
        Create a fresh Unicorn engine for the architecture and byte order of the executable,
        map the segments, the stack, and the heap, and install the requested hooks.
        """
        # NOTE(review): ARM64 is mapped to UC_ARCH_ARM in Thumb mode - confirm this is intended.
        uc_arch, uc_mode = {
            Arch.X32     : (uc.UC_ARCH_X86,   uc.UC_MODE_32),     # noqa
            Arch.X64     : (uc.UC_ARCH_X86,   uc.UC_MODE_64),     # noqa
            Arch.ARM32   : (uc.UC_ARCH_ARM,   uc.UC_MODE_ARM),    # noqa
            Arch.ARM64   : (uc.UC_ARCH_ARM,   uc.UC_MODE_THUMB),  # noqa
            Arch.MIPS16  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_16),     # noqa
            Arch.MIPS32  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_32),     # noqa
            Arch.MIPS64  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_64),     # noqa
            Arch.PPC32   : (uc.UC_ARCH_PPC,   uc.UC_MODE_32),     # noqa
            Arch.PPC64   : (uc.UC_ARCH_PPC,   uc.UC_MODE_64),     # noqa
            Arch.SPARC32 : (uc.UC_ARCH_SPARC, uc.UC_MODE_32),     # noqa
            Arch.SPARC64 : (uc.UC_ARCH_SPARC, uc.UC_MODE_V9),     # noqa
        }[self.exe.arch()]

        uc_mode |= {
            BO.BE: uc.UC_MODE_BIG_ENDIAN,
            BO.LE: uc.UC_MODE_LITTLE_ENDIAN,
        }[self.exe.byte_order()]

        self.unicorn = uc.Uc(uc_arch, uc_mode)

        self._map_segments()
        self._map_stack_and_heap()

        for hook, flag, callback in [
            (uc.UC_HOOK_CODE,           Hook.CodeExecute, self.hook_code_execute ),  # noqa
            (uc.UC_HOOK_INSN_INVALID,   Hook.CodeError,   self.hook_code_error   ),  # noqa
            (uc.UC_HOOK_MEM_READ_AFTER, Hook.MemoryRead,  self.hook_mem_read     ),  # noqa
            (uc.UC_HOOK_MEM_WRITE,      Hook.MemoryWrite, self.hook_mem_write    ),  # noqa
            (uc.UC_HOOK_MEM_INVALID,    Hook.MemoryError, self.hook_mem_error    ),  # noqa
        ]:
            if self.hooked(flag):
                self.unicorn.hook_add(hook, callback, user_data=self.state)

    def _init(self):
        """
        Build the register lookup tables from the `UC_*_REG_*` constants of the Unicorn
        constant modules. Register names and codes collide between architectures (every one
        of these modules defines an "sp" register, for example), so the module that belongs
        to the architecture of the executable is processed last and wins every collision.
        """
        self._reg_by_name: Dict[str, Register[int]] = {}
        self._reg_by_code: Dict[int, Register[int]] = {}
        x86, arm, sparc, mips = modules = [
            uc.x86_const,
            uc.arm_const,
            uc.sparc_const,
            uc.mips_const,
        ]
        # ARM64 uses the ARM constants because _reset runs it on UC_ARCH_ARM.
        native = {
            Arch.X32     : x86,    # noqa
            Arch.X64     : x86,    # noqa
            Arch.ARM32   : arm,    # noqa
            Arch.ARM64   : arm,    # noqa
            Arch.SPARC32 : sparc,  # noqa
            Arch.SPARC64 : sparc,  # noqa
            Arch.MIPS16  : mips,   # noqa
            Arch.MIPS32  : mips,   # noqa
            Arch.MIPS64  : mips,   # noqa
        }.get(self.exe.arch())
        # Stable sort: the native module moves to the end, all others keep their order.
        modules.sort(key=lambda module: module is native)
        for module in modules:
            md: Dict[str, Any] = module.__dict__
            for key, code in md.items():
                try:
                    # Constants look like UC_<ARCH>_REG_<NAME>; anything shorter is skipped.
                    u, *_, kind, name = key.split('_')
                except ValueError:
                    continue
                if kind != 'REG' or u != 'UC':
                    continue
                name = name.casefold()
                reg = Register(name, code)
                self._reg_by_name[name] = reg
                self._reg_by_code[code] = reg

    def _emulate(self, start: int, end: Optional[int] = None):
        """
        Run the engine from `start` until `end`. When `end` is missing, the upper bound
        reported by the executable for the location of `start` is used instead. Unicorn
        errors are re-raised as `refinery.lib.emulator.EmulationError`.
        """
        if end is None:
            end = self.exe.location_from_address(start).virtual.box.upper
        try:
            self.unicorn.emu_start(start, end)
        except uc.UcError as E:
            raise EmulationError(*E.args) from E

    def halt(self):
        """
        Stop the running emulation.
        """
        self.unicorn.emu_stop()

    def _lookup_register(self, var: Union[str, int]) -> Register[int]:
        """
        Look up a register by its case-insensitive name or by its Unicorn register code.
        Unknown names and codes raise a `KeyError`, other input types raise a `TypeError`.
        """
        reg = None
        if isinstance(var, str):
            reg = self._reg_by_name[var.casefold()]
        if isinstance(var, int):
            reg = self._reg_by_code[var]
        if reg is None:
            raise TypeError(var)
        # Only registers whose size is explicitly None are measured. The registers created
        # in _init use the default size of the Register class.
        if reg.size is None:
            reg.size = self.measure_register_size(reg.code)
        return reg

    def _map(self, address: int, size: int):
        return self.unicorn.mem_map(address, size)

    def _set_register(self, reg: int, value: int) -> None:
        return self.unicorn.reg_write(reg, value)

    def _get_register(self, reg: int) -> int:
        return self.unicorn.reg_read(reg)

    def mem_write(self, address: int, data: bytes):
        """
        Write data to already mapped memory.
        """
        return self.unicorn.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        """
        Read data from the emulator's mapped memory.
        """
        return self.unicorn.mem_read(address, size)


class IcicleEmulator(RawMetalEmulator[Ic, str, _T]):
    """
    An emulator that is based on Icicle. Icicle is a more recent emulator engine and not yet as
    mature as Unicorn. There are some compelling arguments for its robustness, but its current
    interface lacks any support for memory write hooks, which makes it difficult to use for
    most of our applications. See also the [Icicle paper][ICE].

    [ICE]: https://arxiv.org/pdf/2301.13346
    """

    icicle: Ic

    def _init(self):
        """
        Nothing has to be initialized before the engine is ready.
        """

    def _reset(self):
        """
        Create a new Icicle engine, collect the sizes of its registers, and map the segments,
        the stack, and the heap. Raises `NotImplementedError` for architectures that Icicle
        does not offer and when any memory hook is requested.
        """
        exe = self.exe
        known = {
            Arch.X32   : 'i686',
            Arch.X64   : 'x86_64',
        }
        try:
            arch = known[exe.arch()]
        except KeyError:
            arch = None
        if arch not in ic.architectures():
            raise NotImplementedError(F'Icicle cannot handle executables of arch {exe.arch().name}')

        if self.hooks & Hook.Memory:
            raise NotImplementedError(U'Icicle does not support memory hooks yet.')

        engine = ic.Icicle(arch)
        self.icicle = engine
        # Maps the case-folded register name to the second entry of the engine's register info.
        sizes = {}
        for name, info in engine.reg_list().items():
            sizes[name.casefold()] = info[1]
        self.regmap = sizes

        self._map_segments()
        self._map_stack_and_heap()

    def _emulate(self, start: int, end: Optional[int] = None):
        """
        Run the engine beginning at `start`. When code execution is hooked, the engine is
        single-stepped and the hook is invoked before each instruction; otherwise it runs
        until `end`, or freely if no end address was given.
        """
        dasm = self.disassembler()
        status_type = ic.RunStatus
        engine = self.icicle
        single_step = self.hooked(Hook.CodeExecute)

        if single_step:
            advance = partial(engine.step, 1)
        elif end is None:
            advance = engine.run
        else:
            advance = partial(engine.run_until, end)

        self.ip = start
        ip = start

        while True:
            if single_step:
                # Disassemble exactly one instruction to learn its size for the hook.
                op = next(dasm.disasm(self.exe[ip:ip + 12], ip, 1))
                self.hook_code_execute(engine, ip, op._raw.size, self.state)

            status = advance()

            if status == status_type.InstructionLimit:
                ip = self.ip
                continue

            if status in (
                status_type.Breakpoint,
                status_type.Halt,
                status_type.Killed,
            ):
                return
            if status == status_type.UnhandledException:
                raise EmulationError(engine.exception_code.name)
            if status != status_type.Running:
                raise EmulationError(status.name)

    def halt(self):
        """
        Halt the emulation by placing a breakpoint at the current instruction pointer.
        """
        self.icicle.add_breakpoint(self.ip)

    def _lookup_register(self, var: str) -> Register[str]:
        """
        Return a register for the given case-insensitive name. The name doubles as the code.
        """
        key = var.casefold()
        return Register(key, key, self.regmap[key])

    def _map(self, address: int, size: int):
        """
        Map memory in the engine. The memory is not writable when reads or writes are hooked.
        """
        protection = ic.MemoryProtection
        if self.hooked(Hook.MemoryRead) or self.hooked(Hook.MemoryWrite):
            perm = protection.ExecuteRead
        else:
            perm = protection.ExecuteReadWrite
        return self.icicle.mem_map(address, size, perm)

    def _set_register(self, reg: str, value: int) -> None:
        engine = self.icicle
        return engine.reg_write(reg, value)

    def _get_register(self, reg: str) -> int:
        engine = self.icicle
        return engine.reg_read(reg)

    def mem_write(self, address: int, data: bytes):
        """
        Write data to already mapped memory.
        """
        engine = self.icicle
        return engine.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        """
        Read data from the emulator's mapped memory.
        """
        engine = self.icicle
        return engine.mem_read(address, size)


class SpeakeasyEmulator(Emulator[Se, str, _T]):
    """
    An emulator that is based on Speakeasy. Speakeasy only supports PE files, but it has
    support for several Windows API routines which can be an advantage.
    """

    speakeasy: Se

    def _init(self):
        # Cache of the registers that have been looked up (and measured) so far.
        self._regs: Dict[str, Register[str]] = {}

    def _reset(self):
        """
        Create a new Speakeasy instance, load the executable into it as a module or as
        shellcode, and install the requested hooks. Raises `NotImplementedError` for
        executable types and architectures that are not supported here.
        """
        exe = self.exe
        if exe.type not in (ET.PE, ET.BLOB):
            raise NotImplementedError(F'Speakeasy cannot handle executables of type {exe.type.name}.')
        names = {
            Arch.X32: 'x86',
            Arch.X64: 'x64',
        }
        try:
            arch = names[exe.arch()]
        except KeyError as KE:
            raise NotImplementedError(F'Speakeasy cannot handle executables of arch {exe.arch().name}') from KE

        engine = se.Speakeasy()
        self.speakeasy = engine

        with VirtualFileSystem() as vfs:
            db = bytes(exe.data)
            vf = vfs.new(db)
            if not exe.blob:
                module = engine.load_module(vf.path, data=db)
                self.base = module.get_base()
            else:
                self.base = engine.load_shellcode(vf.path, data=db, arch=arch)

        engine.emu.timeout = 0

        if self.hooked(Hook.CodeExecute):
            engine.add_code_hook(self.hook_code_execute, ctx=self.state)

        if self.hooked(Hook.MemoryRead):
            engine.add_mem_read_hook(self.hook_mem_read)

        if self.hooked(Hook.MemoryWrite):
            engine.add_mem_write_hook(self.hook_mem_write)

        if self.hooked(Hook.MemoryError):
            engine.add_mem_invalid_hook(self.hook_mem_error)

    @property
    def stack_region(self):
        """
        The memory map of the engine that contains the current stack pointer. Raises an
        `EmulationError` if the stack pointer disagrees with the one reported by Speakeasy or
        if there is not exactly one memory map that contains it.
        """
        engine = self.speakeasy
        top = self.sp
        maps: List[MemMap] = engine.get_mem_maps()
        if top != engine.get_stack_ptr():
            raise EmulationError('Unexpected stack pointer misalignment')
        try:
            (region,) = [mm for mm in maps if top in range(mm.base, mm.base + mm.size)]
        except Exception:
            raise EmulationError('Ambiguous memory, unable to locate the stack.')
        return region

    @property
    def stack_base(self):
        """
        The base address of the stack region; this property is read-only.
        """
        return self.stack_region.base

    @stack_base.setter
    def stack_base(self, value):
        raise AttributeError

    @property
    def stack_size(self):
        """
        The size of the stack region; this property is read-only.
        """
        return self.stack_region.size

    @stack_size.setter
    def stack_size(self, value):
        raise AttributeError

    def malloc(self, size: int) -> int:
        """
        Allocate memory through Speakeasy and return its address.
        """
        engine = self.speakeasy
        return engine.mem_alloc(size)

    def morestack(self):
        """
        Map one more allocation unit directly below the stack region and extend the region
        object accordingly.
        """
        engine = self.speakeasy
        region = self.stack_region
        lowered = region.base - self.alloc_size
        engine.emu.mem_map(self.alloc_size, lowered)
        region.base = lowered
        region.size = region.size + self.alloc_size

    def _emulate(self, start: int, end: Optional[int] = None):
        """
        Run the code at `start` as shellcode or as a function call, depending on whether the
        executable is a blob. A code hook at `start` moves the stack pointer into the lower
        third of the stack region once; another hook stops the engine at `end` if given.
        """
        engine = self.speakeasy

        def stackfix(emu, address: int, size: int, ctx: list):
            # The context list is empty only for the first invocation of this hook.
            if len(ctx) == 0:
                region = self.stack_region
                self.sp = region.base + region.size // 3
                ctx.append(True)
            return True

        engine.add_code_hook(stackfix, start, start, ctx=[])

        if end is not None:
            def _terminate(*_):
                engine.stop()
            engine.add_code_hook(_terminate, end, end + 1)

        if not self.exe.blob:
            return engine.call(start)
        return engine.run_shellcode(start)

    def halt(self):
        """
        Stop the running emulation.
        """
        engine = self.speakeasy
        return engine.stop()

    def _set_register(self, register: str, v: int):
        engine = self.speakeasy
        return engine.reg_write(register, v)

    def _get_register(self, register: str) -> int:
        engine = self.speakeasy
        return engine.reg_read(register)

    def _lookup_register(self, var: str) -> Register[str]:
        """
        Return the cached register for the given name. On first use, the register size is
        measured; if that fails, a `LookupError` is raised.
        """
        cache = self._regs
        if var in cache:
            return cache[var]
        try:
            size = self.measure_register_size(var)
        except Exception:
            raise LookupError(var)
        reg = Register(var, var, size)
        cache[var] = reg
        return reg

    def _map(self, address: int, size: int):
        """
        Map memory at the exact given address. A regular allocation is attempted first; if it
        ends up elsewhere, it is freed and the memory is mapped via the underlying engine.
        Raises a `LookupError` if the memory cannot be placed at the requested address.
        """
        engine = self.speakeasy
        result = engine.mem_alloc(size, address)
        if result != address:
            engine.mem_free(result)
            result = engine.emu.mem_map(size, address)
        if result != address:
            raise LookupError(F'Unable to allocate {size} bytes at address 0x{address:X}')
        return result

    def mem_write(self, address: int, data: bytes):
        """
        Write data to already mapped memory.
        """
        engine = self.speakeasy
        return engine.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        """
        Read data from the emulator's mapped memory.
        """
        engine = self.speakeasy
        return engine.mem_read(address, size)

Classes

class MissingModule (name, dist=None)

This class can wrap a module import that is currently missing. If any attribute of the missing module is accessed, it raises RefineryImportMissing.

Expand source code Browse git
class MissingModule:
    """
    A stand-in object for a module that could not be imported. Reading any regular attribute
    from it raises `refinery.units.RefineryImportMissing`; lookups of dunder names fail with
    an ordinary `AttributeError` instead.
    """
    def __init__(self, name, dist=None):
        self.name = name
        # A missing (or otherwise falsy) distribution name falls back to the module name.
        self.dist = dist if dist else name

    def __getattr__(self, key: str):
        is_dunder = key.startswith('__') and key.endswith('__')
        if not is_dunder:
            raise RefineryImportMissing(self.name, self.dist)
        # Dunder lookups get a plain AttributeError rather than the import error.
        raise AttributeError(key)
class EmulationError (*args, **kwargs)

Base class for any exceptions raised by emulators.

Expand source code Browse git
class EmulationError(Exception):
    """
    The common base class of every exception that an emulator raises.
    """

Ancestors

  • builtins.Exception
  • builtins.BaseException
class Register (name, code, size=0)

Represents an arbitrary CPU register.

Expand source code Browse git
class Register(Generic[_R]):
    """
    Represents an arbitrary CPU register. Two registers compare equal (and hash equal) when both
    their code and their size agree; the name does not take part in the comparison.
    """
    __slots__ = (
        'name',
        'code',
        'size',
    )
    name: str
    """
    This is the common name of the register, like "eax" on x86.
    """
    code: _R
    """
    The code of a register is any emulator-specific internal identifier for the register.
    """
    size: Optional[int]
    """
    If not `None`, this property contains the size of the register in bytes.
    """

    def __init__(self, name: str, code: _R, size: Optional[int] = 0):
        self.name = name
        self.code = code
        self.size = size

    def __eq__(self, other: object):
        # Defer to the other operand for foreign types instead of failing with an AttributeError
        # on the missing `code` attribute; `==` then evaluates to False as expected.
        if not isinstance(other, Register):
            return NotImplemented
        return self.code == other.code and self.size == other.size

    def __hash__(self):
        return hash((self.code, self.size))

Ancestors

  • typing.Generic

Instance variables

var code

The code of a register is any emulator-specific internal identifier for the register.

var name

This is the common name of the register, like "eax" on x86.

var size

If not None, this property contains the size of the register in bytes.

class Hook (value, names=None, *, module=None, qualname=None, type=None, start=1)

A bit mask flag for the types of hooks that are requested from an emulator.

Expand source code Browse git
class Hook(IntFlag):
    """
    Bit mask that selects which kinds of hooks an emulator is asked to install. The first five
    members are single bits; the remaining members are combinations thereof.
    """
    CodeExecute  = 1 << 0  # noqa
    CodeError    = 1 << 1  # noqa
    MemoryRead   = 1 << 2  # noqa
    MemoryWrite  = 1 << 3  # noqa
    MemoryError  = 1 << 4  # noqa

    OnlyErrors   = CodeError | MemoryError     # noqa
    Everything   = 0xFF                        # noqa
    Nothing      = 0                           # noqa
    MemoryAccess = MemoryRead | MemoryWrite    # noqa
    Memory       = MemoryAccess | MemoryError  # noqa
    NoErrors     = CodeExecute | MemoryAccess  # noqa

Ancestors

  • enum.IntFlag
  • builtins.int
  • enum.Flag
  • enum.Enum

Class variables

var CodeExecute
var CodeError
var MemoryRead
var MemoryWrite
var MemoryError
var OnlyErrors
var Everything
var Nothing
var MemoryAccess
var Memory
var NoErrors
class Emulator (data, base=None, arch=None, hooks=Hook.OnlyErrors, align_size=4096, alloc_size=4096)

The emulator base class.

Expand source code Browse git
class Emulator(ABC, Generic[_E, _R, _T]):
    """
    The emulator base class. It wraps an executable (or a raw code blob), keeps track of which
    address ranges have been mapped, and offers register, stack and hook helpers on top of the
    abstract primitives that concrete emulator engines implement.
    """

    def __init__(
        self,
        data: Union[Executable, bytes, bytearray, memoryview],
        base: Optional[int] = None,
        arch: Optional[Arch] = None,
        hooks: Hook = Hook.OnlyErrors,
        align_size: int = 0x1000,
        alloc_size: int = 0x1000,
    ):
        if isinstance(data, Executable):
            # An already parsed executable is used as it is; it must not be parsed a second time.
            exe = data
            raw = isinstance(data, ExecutableCodeBlob)
        else:
            try:
                exe = Executable.Load(data)
            except ValueError:
                # Not a recognized executable format: treat the input as a blob of raw code.
                exe = ExecutableCodeBlob(data, base, arch)
                raw = True
            else:
                raw = False

        self.exe = exe
        self.raw = raw

        self.hooks = hooks
        self.base = exe.base

        self.align_size = align_size
        self.alloc_size = alloc_size
        self._resetonce = False

        # Names of the stack pointer, instruction pointer, and return value register for each
        # architecture. They are resolved lazily through `_lookup_register`.
        # NOTE(review): on ARM, "ip" conventionally names r12 rather than the program counter, and
        # ARM64 names its first register x0, not r0 - confirm these rows against the engines.
        self._sp, self._ip, self._rv = {
            Arch.PPC32   : ('1',   'pc',  '3'  ), # noqa
            Arch.PPC64   : ('1',   'pc',  '3'  ), # noqa
            Arch.X32     : ('esp', 'eip', 'eax'), # noqa
            Arch.X64     : ('rsp', 'rip', 'rax'), # noqa
            Arch.ARM32   : ('sp',  'ip',  'r0' ), # noqa
            Arch.ARM64   : ('sp',  'ip',  'r0' ), # noqa
            Arch.MIPS16  : ('sp',  'pc',  '0'  ), # noqa
            Arch.MIPS32  : ('sp',  'pc',  'v0' ), # noqa
            Arch.MIPS64  : ('sp',  'pc',  'v0' ), # noqa
            Arch.SPARC32 : ('sp',  'pc',  'o0' ), # noqa
            Arch.SPARC64 : ('sp',  'pc',  'o0' ), # noqa
        }[exe.arch()]

        self._init()

    @cached_property
    def _reg_sp(self):
        return self._lookup_register(self._sp).code

    @cached_property
    def _reg_ip(self):
        return self._lookup_register(self._ip).code

    @cached_property
    def _reg_rv(self):
        return self._lookup_register(self._rv).code

    def hooked(self, hook: Hook) -> bool:
        """
        Return whether the given hook is (supposed to be) set. For a combined mask, all of its
        bits have to be present in the configured hooks.
        """
        return self.hooks & hook == hook

    def reset(self, state: Optional[_T] = None):
        """
        This function resets the emulator to an initial state. This will create a new instance of
        the underlying emulator engine, map the input executable to memory, and install any of the
        requested hooks.
        """
        self._resetonce = True
        self._memorymap = intervaltree.IntervalTree()
        self.state = state
        self._reset()

    def emulate(self, start: int, end: Optional[int] = None):
        """
        Call this function to begin emulation. The `start` parameter is the address where execution
        should begin, the `end` parameter is an optional address to halt at. The emulator is reset
        first if that has never happened before.
        """
        if not self._resetonce:
            self.reset()
        exe = self.exe
        # Translate addresses from the executable's base to the emulator's base.
        start = start - exe.base + self.base
        if end is not None:
            end = end - exe.base + self.base
        return self._emulate(start, end)

    @abstractmethod
    def _reset(self):
        """
        Called as part of `refinery.lib.emulator.Emulator.reset`.
        """
        ...

    def _init(self):
        """
        Called at the very end of the object initializer. Can be overridden by child classes to
        initialize variables that do not depend on the emulator engine to be ready.
        """
        pass

    @abstractmethod
    def _emulate(self, start: int, end: Optional[int] = None):
        """
        This is the tail call of `refinery.lib.emulator.Emulator.emulate`.
        """
        ...

    @abstractmethod
    def halt(self):
        """
        Causes the emulation to halt, usually when called from a hook.
        """
        ...

    @abstractmethod
    def _set_register(self, register: _R, v: int):
        """
        Called as part of `refinery.lib.emulator.Emulator.set_register`.
        """
        ...

    @abstractmethod
    def _get_register(self, register: _R) -> int:
        """
        Called as part of `refinery.lib.emulator.Emulator.get_register`.
        """
        ...

    @abstractmethod
    def _lookup_register(self, var: Union[_R, int]) -> Register[_R]:
        """
        Called as part of `refinery.lib.emulator.Emulator.lookup_register`.
        """
        ...

    @abstractmethod
    def _map(self, address: int, size: int):
        """
        Called as part of `refinery.lib.emulator.Emulator.map`.
        """
        ...

    @abstractmethod
    def mem_write(self, address: int, data: bytes):
        """
        Write data to already mapped memory.
        """
        ...

    @abstractmethod
    def mem_read(self, address: int, size: int) -> bytes:
        """
        Read data from the emulator's mapped memory.
        """
        ...

    @abstractmethod
    def malloc(self, size: int) -> int:
        """
        Allocate (i.e. map) the given amount of memory in the emulator's memory space.
        """
        ...

    @abstractmethod
    def morestack(self):
        """
        Allocate more memory for the stack to grow into.
        """
        ...

    def lookup_register(self, var: Union[str, _R, Register[_R]]):
        """
        Return the `refinery.lib.emulator.Register` for the given name or code. `Register` type
        inputs are passed through unaltered.
        """
        if isinstance(var, Register):
            return var
        return self._lookup_register(var)

    def map(self, address: int, size: int):
        """
        Map memory of the given size at the given address. This function does not fail when part
        of the memory is already mapped; it will instead map only the missing pieces. Requests
        that are empty or already fully covered do nothing.
        """
        lower = address
        upper = address + size
        # Sweep over the known mappings in address order and collect every uncovered gap.
        cursor = lower
        gaps = []
        known = sorted(self._memorymap.overlap(lower, upper), key=lambda iv: iv.begin)
        for interval in known:
            if cursor >= upper:
                break
            if interval.begin > cursor:
                gaps.append((cursor, interval.begin))
            if interval.end > cursor:
                cursor = interval.end
        if cursor < upper:
            gaps.append((cursor, upper))
        for begin, end in gaps:
            # Only record a range as mapped after the engine has accepted it.
            self._map(begin, end - begin)
            self._memorymap.addi(begin, end)

    @property
    def sp(self):
        """
        The stack pointer.
        """
        return self.get_register(self._reg_sp)

    @sp.setter
    def sp(self, value: int):
        return self.set_register(self._reg_sp, value)

    @property
    def rv(self):
        """
        The return value.
        """
        return self.get_register(self._reg_rv)

    @rv.setter
    def rv(self, value: int):
        return self.set_register(self._reg_rv, value)

    @property
    def ip(self):
        """
        The instruction pointer.
        """
        return self.get_register(self._reg_ip)

    @ip.setter
    def ip(self, value: int):
        return self.set_register(self._reg_ip, value)

    def measure_register_size(self, reg: _R) -> int:
        """
        Measures the size of a register by writing a very large number to it with all bits set,
        subsequently reading the register value, and counting the number of bits in the
        measurement. Props for this one go to Matthieu Walter who originally proposed it as a
        joke; I have not found a better way to do this for uncooperative emulators. The previous
        register value is written back even when the measurement fails.
        """
        val = self._get_register(reg)
        try:
            self._set_register(reg, (1 << 512) - 1)
            bits = self._get_register(reg).bit_length()
        finally:
            self._set_register(reg, val)
        q, r = divmod(bits, 8)
        assert r == 0
        return q

    def push(self, val: int, size: Optional[int] = None):
        """
        Push the given integer value to the stack. If the `size` parameter is missing, the function
        will push a machine word sized value. When the write fails, the stack is grown once via
        `refinery.lib.emulator.Emulator.morestack` and the write is retried; a second failure is
        propagated. The stack pointer is moved down after a successful write.
        """
        if size is None:
            size = self.exe.pointer_size // 8
        # Encode before entering the retry loop: a value that does not fit into `size` bytes is
        # not a memory problem and must not trigger a stack extension.
        data = val.to_bytes(size, self.exe.byte_order().value)
        tos = self.sp - size
        for already_retried_once in (False, True):
            try:
                self.mem_write(tos, data)
            except Exception:
                if already_retried_once:
                    raise
                self.morestack()
            else:
                break
        self.sp = tos

    def pop(self, size: Optional[int] = None):
        """
        Pop an integer value from the stack. If the `size` parameter is missing, the function will
        pop a machine word sized value.
        """
        if size is None:
            size = self.exe.pointer_size // 8
        sp = self.sp
        sv = int.from_bytes(self.mem_read(sp, size), self.exe.byte_order().value)
        self.sp = sp + size
        return sv

    def push_register(self, reg: Union[int, str, Register[_R]]):
        """
        Push the contents of the given register to the stack. When the register has no known size
        (`None` or zero), a machine word sized value is pushed.
        """
        reg = self.lookup_register(reg)
        val = self.get_register(reg.code)
        self.push(val, reg.size or None)

    def align(self, value, down=False):
        """
        Align the given value according to the emulator's alignment setting. If the `down` parameter
        is set, it will return the nearest lower address instead of the nearest higher one.
        """
        return align(self.align_size, value, down=down)

    def set_register(self, register: Union[int, str, Register[_R]], value: int):
        """
        Write the given value to the given CPU register.
        """
        register = self.lookup_register(register)
        return self._set_register(register.code, value)

    def get_register(self, register: Union[int, str, Register[_R]]) -> int:
        """
        Read the contents of the given CPU register.
        """
        register = self.lookup_register(register)
        return self._get_register(register.code)

    def hook_code_execute(self, emu: _E, address: int, size: int, state: Optional[_T] = None) -> bool:
        """
        Called when code execution is hooked.
        """
        return True

    def hook_code_error(self, emu: _E, state: Optional[_T] = None) -> bool:
        """
        Called when code errors are hooked. The default implementation halts the emulation.
        """
        self.halt()
        return False

    def hook_mem_read(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
        """
        Called when memory reads are hooked.
        """
        return True

    def hook_mem_write(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
        """
        Called when memory writes are hooked.
        """
        return True

    def hook_mem_error(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
        """
        Called when memory errors are hooked. The default implementation tries to map one
        allocation unit at the aligned address of the faulting access.
        """
        try:
            self.map(self.align(address, down=True), self.alloc_size)
        except Exception:
            # Deliberately best-effort: a failed mapping attempt is not reported from here.
            pass
        return True

    def disassembler(self) -> Cs:
        """
        Create a capstone disassembler that matches the emulator's architecture. The disassembler
        is created on the first call and stored on the instance for subsequent calls.
        """
        cached = getattr(self, '_capstone_engine', None)
        if cached is not None:
            return cached

        # NOTE(review): ARM64 is mapped to CS_ARCH_ARM in thumb mode here - confirm this is the
        # intended configuration.
        cs_arch, cs_mode = {
            Arch.X32     : (cs.CS_ARCH_X86,   cs.CS_MODE_32),     # noqa
            Arch.X64     : (cs.CS_ARCH_X86,   cs.CS_MODE_64),     # noqa
            Arch.ARM32   : (cs.CS_ARCH_ARM,   cs.CS_MODE_ARM),    # noqa
            Arch.ARM64   : (cs.CS_ARCH_ARM,   cs.CS_MODE_THUMB),  # noqa
            Arch.MIPS16  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_16),     # noqa
            Arch.MIPS32  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_32),     # noqa
            Arch.MIPS64  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_64),     # noqa
            Arch.PPC32   : (cs.CS_ARCH_PPC,   cs.CS_MODE_32),     # noqa
            Arch.PPC64   : (cs.CS_ARCH_PPC,   cs.CS_MODE_64),     # noqa
            Arch.SPARC32 : (cs.CS_ARCH_SPARC, cs.CS_MODE_32),     # noqa
            Arch.SPARC64 : (cs.CS_ARCH_SPARC, cs.CS_MODE_V9),     # noqa
        }[self.exe.arch()]

        cs_mode |= {
            BO.BE: cs.CS_MODE_BIG_ENDIAN,
            BO.LE: cs.CS_MODE_LITTLE_ENDIAN,
        }[self.exe.byte_order()]

        # Cached per instance rather than with lru_cache, which would hold on to the emulator.
        self._capstone_engine = engine = cs.Cs(cs_arch, cs_mode)
        return engine

    def general_purpose_registers(self):
        """
        A generator that lists the general purpose registers for the current architecture. The
        implementation is currently incomplete and only has support for the Intel architectures.
        For other architectures, this is an empty generator.
        """
        arch = self.exe.arch()
        regs = []
        if arch is Arch.X32:
            regs = ('eax', 'ebx', 'ecx', 'edx', 'esi', 'edi', 'ebp')
        elif arch is Arch.X64:
            regs = ('rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi', 'rbp', 'r8', 'r9', 'r10', 'r11', 'r12', 'r13', 'r14', 'r15')
        for reg in regs:
            yield self._lookup_register(reg)

Ancestors

  • abc.ABC
  • typing.Generic

Subclasses

Instance variables

var sp

The stack pointer.

Expand source code Browse git
@property
def sp(self):
    """
    Current value of the stack pointer register.
    """
    code = self._reg_sp
    return self.get_register(code)
var rv

The return value.

Expand source code Browse git
@property
def rv(self):
    """
    Current value of the return value register.
    """
    code = self._reg_rv
    return self.get_register(code)
var ip

The instruction pointer.

Expand source code Browse git
@property
def ip(self):
    """
    Current value of the instruction pointer register.
    """
    code = self._reg_ip
    return self.get_register(code)

Methods

def hooked(self, hook)

Return whether the given hook is (supposed to be) set.

Expand source code Browse git
def hooked(self, hook: Hook) -> bool:
    """
    Tell whether every bit of the given hook mask is contained in the configured hooks.
    """
    present = self.hooks & hook
    return present == hook
def reset(self, state=None)

This function resets the emulator to an initial state. This will create a new instance of the underlying emulator engine, map the input executable to memory, and install any of the requested hooks.

Expand source code Browse git
def reset(self, state: Optional[_T] = None):
    """
    Bring the emulator back to an initial state: the reset is remembered, the record of mapped
    memory starts out empty, the given state object is stored, and the engine-specific `_reset`
    is invoked afterwards.
    """
    self._resetonce = True
    empty_map = intervaltree.IntervalTree()
    self._memorymap = empty_map
    self.state = state
    self._reset()
def emulate(self, start, end=None)

Call this function to begin emulation. The start parameter is the address where execution should begin, the end parameter is an optional address to halt at.

Expand source code Browse git
def emulate(self, start: int, end: Optional[int] = None):
    """
    Begin emulation at `start` and optionally halt at `end`. A reset is performed beforehand if
    none has happened yet. Both addresses are shifted from the executable's base address to the
    base address of the emulator before they are handed to `_emulate`.
    """
    if not self._resetonce:
        self.reset()
    image_base = self.exe.base
    first = start - image_base + self.base
    last = None if end is None else end - image_base + self.base
    return self._emulate(first, last)
def halt(self)

Causes the emulation to halt, usually when called from a hook.

Expand source code Browse git
@abstractmethod
def halt(self):
    """
    Stop the running emulation; this is usually called from within a hook.
    """
def mem_write(self, address, data)

Write data to already mapped memory.

Expand source code Browse git
@abstractmethod
def mem_write(self, address: int, data: bytes):
    """
    Store the given data at the given address; the memory has to be mapped already.
    """
def mem_read(self, address, size)

Read data from the emulator's mapped memory.

Expand source code Browse git
@abstractmethod
def mem_read(self, address: int, size: int) -> bytes:
    """
    Fetch the given number of bytes from the emulator's mapped memory at the given address.
    """
def malloc(self, size)

Allocate (i.e. map) the given amount of memory in the emulator's memory space.

Expand source code Browse git
@abstractmethod
def malloc(self, size: int) -> int:
    """
    Map the requested amount of memory somewhere in the emulator's memory space.
    """
def morestack(self)

Allocate more memory for the stack to grow into.

Expand source code Browse git
@abstractmethod
def morestack(self):
    """
    Provide additional memory that the stack can grow into.
    """
def lookup_register(self, var)

Return the Register for the given name or code. Register type inputs are passed through unaltered.

Expand source code Browse git
def lookup_register(self, var: Union[str, _R, Register[_R]]):
    """
    Resolve a register name or code to its `refinery.lib.emulator.Register`. An argument that
    already is a `Register` is handed back unchanged.
    """
    if not isinstance(var, Register):
        return self._lookup_register(var)
    return var
def map(self, address, size)

Map memory of the given size at the given address. This function does not fail when part of the memory is already mapped; it will instead map only the missing pieces.

Expand source code Browse git
def map(self, address: int, size: int):
    """
    Map memory of the given size at the given address. This function does not fail when part
    of the memory is already mapped; it will instead map only the missing pieces. Requests
    that are empty or already fully covered do nothing.
    """
    lower = address
    upper = address + size
    # Sweep over the known mappings in address order and collect every uncovered gap.
    cursor = lower
    gaps = []
    known = sorted(self._memorymap.overlap(lower, upper), key=lambda iv: iv.begin)
    for interval in known:
        if cursor >= upper:
            break
        if interval.begin > cursor:
            gaps.append((cursor, interval.begin))
        if interval.end > cursor:
            cursor = interval.end
    if cursor < upper:
        gaps.append((cursor, upper))
    for begin, end in gaps:
        # Only record a range as mapped after the engine has accepted it.
        self._map(begin, end - begin)
        self._memorymap.addi(begin, end)
def measure_register_size(self, reg)

Measures the size of a register by writing a very large number to it with all bits set, subsequently reading the register value, and counting the number of bits in the measurement. Props for this one go to Matthieu Walter who originally proposed it as a joke; I have not found a better way to do this for uncooperative emulators.

Expand source code Browse git
def measure_register_size(self, reg: _R) -> int:
    """
    Measures the size of a register by writing a very large number to it with all bits set,
    subsequently reading the register value, and counting the number of bits in the
    measurement. Props for this one go to Matthieu Walter who originally proposed it as a
    joke; I have not found a better way to do this for uncooperative emulators. The previous
    register value is written back even when the measurement fails.
    """
    val = self._get_register(reg)
    try:
        self._set_register(reg, (1 << 512) - 1)
        bits = self._get_register(reg).bit_length()
    finally:
        # Restore the original contents before validating the measurement.
        self._set_register(reg, val)
    q, r = divmod(bits, 8)
    assert r == 0
    return q
def push(self, val, size=None)

Push the given integer value to the stack. If the size parameter is missing, the function will push a machine word sized value.

Expand source code Browse git
def push(self, val: int, size: Optional[int] = None):
    """
    Push the given integer value to the stack. If the `size` parameter is missing, the function
    will push a machine word sized value. When the write fails, the stack is grown once via
    `morestack` and the write is retried; a second failure is propagated. The stack pointer is
    moved down after a successful write.
    """
    if size is None:
        size = self.exe.pointer_size // 8
    # Encode before entering the retry loop: a value that does not fit into `size` bytes is
    # not a memory problem and must not trigger a stack extension.
    data = val.to_bytes(size, self.exe.byte_order().value)
    tos = self.sp - size
    for already_retried_once in (False, True):
        try:
            self.mem_write(tos, data)
        except Exception:
            if already_retried_once:
                raise
            self.morestack()
        else:
            break
    self.sp = tos
def pop(self, size=None)

Pop an integer value from the stack. If the size parameter is missing, the function will pop a machine word sized value.

Expand source code Browse git
def pop(self, size: Optional[int] = None):
    """
    Remove an integer from the top of the stack and return it. Without an explicit `size`, a
    machine word sized value is read. The stack pointer is advanced by the number of bytes read.
    """
    if size is None:
        size = self.exe.pointer_size // 8
    top = self.sp
    raw = self.mem_read(top, size)
    order = self.exe.byte_order().value
    value = int.from_bytes(raw, order)
    self.sp = top + size
    return value
def push_register(self, reg)

Push the contents of the given register to the stack.

Expand source code Browse git
def push_register(self, reg: Union[int, str, Register[_R]]):
    """
    Push the contents of the given register to the stack. When the register has no known size
    (`None` or zero), a machine word sized value is pushed instead of a zero-length one.
    """
    reg = self.lookup_register(reg)
    val = self.get_register(reg.code)
    self.push(val, reg.size or None)
def align(self, value, down=False)

Align the given value according to the emulator's alignment setting. If the down parameter is set, it will return the nearest lower address instead of the nearest higher one.

Expand source code Browse git
def align(self, value, down=False):
    """
    Round the given value to the emulator's alignment size by delegating to the module-level
    `align` helper. With `down` set, the nearest lower address is returned instead of the nearest
    higher one.
    """
    boundary = self.align_size
    return align(boundary, value, down=down)
def set_register(self, register, value)

Write the given value to the given CPU register.

Expand source code Browse git
def set_register(self, register: Union[int, str, Register[_R]], value: int):
    """
    Resolve the given register and store the given value in it.
    """
    resolved = self.lookup_register(register)
    return self._set_register(resolved.code, value)
def get_register(self, register)

Read the contents of the given CPU register.

Expand source code Browse git
def get_register(self, register: Union[int, str, Register[_R]]) -> int:
    """
    Resolve the given register and return its current contents.
    """
    resolved = self.lookup_register(register)
    return self._get_register(resolved.code)
def hook_code_execute(self, emu, address, size, state=None)

Called when code execution is hooked.

Expand source code Browse git
def hook_code_execute(self, emu: _E, address: int, size: int, state: Optional[_T] = None) -> bool:
    """
    Callback for hooked code execution. This default does nothing and returns `True`.
    """
    proceed = True
    return proceed
def hook_code_error(self, emu, state=None)

Called when code errors are hooked.

Expand source code Browse git
def hook_code_error(self, emu: _E, state: Optional[_T] = None) -> bool:
    """
    Callback for hooked code errors. This default halts the emulation and returns `False`.
    """
    self.halt()
    handled = False
    return handled
def hook_mem_read(self, emu, access, address, size, value, state=None)

Called when memory reads are hooked.

Expand source code Browse git
def hook_mem_read(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
    """
    Callback for hooked memory reads. This default does nothing and returns `True`.
    """
    proceed = True
    return proceed
def hook_mem_write(self, emu, access, address, size, value, state=None)

Called when memory writes are hooked.

Expand source code Browse git
def hook_mem_write(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
    """
    Callback for hooked memory writes. This default does nothing and returns `True`.
    """
    proceed = True
    return proceed
def hook_mem_error(self, emu, access, address, size, value, state=None)

Called when memory errors are hooked.

Expand source code Browse git
def hook_mem_error(self, emu: _E, access: int, address: int, size: int, value: int, state: Optional[_T] = None) -> bool:
    """
    Callback for hooked memory errors. This default attempts to map one allocation unit starting
    at the faulting address aligned downwards. Any exception raised by that attempt is ignored,
    and `True` is returned in either case.
    """
    try:
        page = self.align(address, down=True)
        self.map(page, self.alloc_size)
    except Exception:
        # Best-effort: a failed mapping attempt is not reported from this callback.
        pass
    return True
def disassembler(self)

Create a capstone disassembler that matches the emulator's architecture.

Expand source code Browse git
def disassembler(self) -> Cs:
    """
    Create a capstone disassembler that matches the emulator's architecture. The disassembler
    is created on the first call and stored on the instance for subsequent calls.
    """
    cached = getattr(self, '_capstone_engine', None)
    if cached is not None:
        return cached

    # NOTE(review): ARM64 is mapped to CS_ARCH_ARM in thumb mode here - confirm this is the
    # intended configuration.
    cs_arch, cs_mode = {
        Arch.X32     : (cs.CS_ARCH_X86,   cs.CS_MODE_32),     # noqa
        Arch.X64     : (cs.CS_ARCH_X86,   cs.CS_MODE_64),     # noqa
        Arch.ARM32   : (cs.CS_ARCH_ARM,   cs.CS_MODE_ARM),    # noqa
        Arch.ARM64   : (cs.CS_ARCH_ARM,   cs.CS_MODE_THUMB),  # noqa
        Arch.MIPS16  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_16),     # noqa
        Arch.MIPS32  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_32),     # noqa
        Arch.MIPS64  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_64),     # noqa
        Arch.PPC32   : (cs.CS_ARCH_PPC,   cs.CS_MODE_32),     # noqa
        Arch.PPC64   : (cs.CS_ARCH_PPC,   cs.CS_MODE_64),     # noqa
        Arch.SPARC32 : (cs.CS_ARCH_SPARC, cs.CS_MODE_32),     # noqa
        Arch.SPARC64 : (cs.CS_ARCH_SPARC, cs.CS_MODE_V9),     # noqa
    }[self.exe.arch()]

    cs_mode |= {
        BO.BE: cs.CS_MODE_BIG_ENDIAN,
        BO.LE: cs.CS_MODE_LITTLE_ENDIAN,
    }[self.exe.byte_order()]

    # Cached per instance rather than with lru_cache, which would hold on to the emulator.
    self._capstone_engine = engine = cs.Cs(cs_arch, cs_mode)
    return engine
def general_purpose_registers(self)

A generator that lists the general purpose registers for the current architecture. The implementation is currently incomplete and only has support for the Intel architectures. For other architectures, this is an empty generator.

Expand source code Browse git
def general_purpose_registers(self):
    """
    Generate the `Register` objects of the general purpose registers for the architecture of the
    loaded executable. Only the two Intel architectures are covered at this point; for every
    other architecture the generator yields nothing.
    """
    arch = self.exe.arch()
    if arch is Arch.X32:
        names = ('eax', 'ebx', 'ecx', 'edx', 'esi', 'edi', 'ebp')
    elif arch is Arch.X64:
        names = ('rax', 'rbx', 'rcx', 'rdx', 'rsi', 'rdi', 'rbp', 'r8', 'r9', 'r10', 'r11', 'r12', 'r13', 'r14', 'r15')
    else:
        names = ()
    for name in names:
        yield self._lookup_register(name)
class RawMetalEmulator (data, base=None, arch=None, hooks=Hook.OnlyErrors, align_size=4096, alloc_size=4096)

The base class for emulators whose engine does not provide any abstraction layer on top of the CPU itself. This class implements helper functions to map the associated executable segments to memory, and implements the heap and stack.

Expand source code Browse git
class RawMetalEmulator(Emulator[_E, _R, _T]):
    """
    Base class for emulators whose engine exposes nothing but the CPU itself. It provides the
    helpers that copy the segments of the associated executable into memory, and it implements
    both the stack and a simple heap on top of `refinery.lib.emulator.Emulator.map`.
    """

    stack_base: int
    stack_size: int
    alloc_base: int

    def _map_stack_and_heap(self):
        """
        Choose addresses for the stack and the heap outside of the address range that the image
        defines, map the stack, and initialize the stack pointer. Raises a `RuntimeError` when
        none of the supported layouts fits.
        """
        page = self.alloc_size
        ceiling = (1 << self.exe.pointer_size) - page
        space = self.exe.image_defined_address_space()
        top = self.align(space.upper)
        bottom = self.align(space.lower, down=True)
        reserve = 3 * page
        if top + 5 * page < ceiling:
            # Enough room above the image: heap right after the image, stack at the very top.
            placement = (ceiling - reserve, top)
        elif bottom > 5 * page:
            # Enough room below the image: heap at zero, stack directly below the image.
            placement = (bottom - reserve, 0)
        elif top + 3 * page < ceiling and bottom > 2 * page:
            # Some room on both sides: heap at zero, stack at the very top.
            placement = (ceiling - reserve, 0)
        else:
            raise RuntimeError(
                U'Unable to find sufficient space for heap and stack with '
                F'allocation size of 0x{page:X}.')
        self.stack_base, self.alloc_base = placement
        self.stack_size = reserve
        self.map(self.stack_base, self.stack_size)
        # The initial stack pointer sits one allocation unit below the end of the stack.
        self.sp = self.stack_base + self.stack_size - self.alloc_size

    def _map_segments(self):
        """
        Map the aligned virtual ranges of all segments, then copy the physical data of every
        segment that has any to the start of its virtual range.
        """
        exe = self.exe
        image = exe.data
        ranges = intervaltree.IntervalTree()
        for seg in exe.segments():
            if seg.virtual:
                ranges.addi(
                    self.align(seg.virtual.lower, down=True),
                    self.align(seg.virtual.upper))
        # Aligned ranges of neighbouring segments may overlap; merge them before mapping.
        ranges.merge_overlaps()
        for chunk in ranges:
            self.map(chunk.begin, chunk.end - chunk.begin)
        for seg in exe.segments():
            physical = seg.physical
            virtual = seg.virtual
            if len(physical) > 0:
                self.mem_write(virtual.lower, bytes(image[physical.slice()]))

    def morestack(self):
        """
        Extend the stack downwards by one allocation unit.
        """
        step = self.alloc_size
        self.stack_base -= step
        self.stack_size += self.alloc_size
        self.map(self.stack_base, self.alloc_size)

    def malloc(self, size: int) -> int:
        """
        Map the aligned size at the current heap position, advance the heap position past it, and
        return the address of the new allocation.
        """
        size = self.align(size)
        self.map(self.alloc_base, size)
        address = self.alloc_base
        self.alloc_base = address + size
        return address

Ancestors

Subclasses

Class variables

var stack_base
var stack_size
var alloc_base

Inherited members

class UnicornEmulator (data, base=None, arch=None, hooks=Hook.OnlyErrors, align_size=4096, alloc_size=4096)

A Unicorn-based emulator.

Expand source code Browse git
class UnicornEmulator(RawMetalEmulator[Uc, int, _T]):
    """
    A Unicorn-based emulator.
    """

    unicorn: Uc

    def _reset(self):
        """
        Create a fresh unicorn engine for the architecture and byte order of the executable, map
        its segments as well as stack and heap, and install the unicorn hooks for all enabled
        hook flags.
        """
        # Each architecture maps to the unicorn engine, the base mode, and the name of the
        # unicorn constants module that defines the register codes for that engine.
        uc_arch, uc_mode, constants = {
            Arch.X32     : (uc.UC_ARCH_X86,   uc.UC_MODE_32,  'x86_const'  ),  # noqa
            Arch.X64     : (uc.UC_ARCH_X86,   uc.UC_MODE_64,  'x86_const'  ),  # noqa
            Arch.ARM32   : (uc.UC_ARCH_ARM,   uc.UC_MODE_ARM, 'arm_const'  ),  # noqa
            Arch.ARM64   : (uc.UC_ARCH_ARM64, uc.UC_MODE_ARM, 'arm64_const'),  # noqa
            Arch.MIPS16  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_16,  'mips_const' ),  # noqa
            Arch.MIPS32  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_32,  'mips_const' ),  # noqa
            Arch.MIPS64  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_64,  'mips_const' ),  # noqa
            Arch.PPC32   : (uc.UC_ARCH_PPC,   uc.UC_MODE_32,  'ppc_const'  ),  # noqa
            Arch.PPC64   : (uc.UC_ARCH_PPC,   uc.UC_MODE_64,  'ppc_const'  ),  # noqa
            Arch.SPARC32 : (uc.UC_ARCH_SPARC, uc.UC_MODE_32,  'sparc_const'),  # noqa
            Arch.SPARC64 : (uc.UC_ARCH_SPARC, uc.UC_MODE_V9,  'sparc_const'),  # noqa
        }[self.exe.arch()]

        uc_mode |= {
            BO.BE: uc.UC_MODE_BIG_ENDIAN,
            BO.LE: uc.UC_MODE_LITTLE_ENDIAN,
        }[self.exe.byte_order()]

        self.unicorn = uc.Uc(uc_arch, uc_mode)

        # The tables built in _init merge register constants of several architectures, and
        # names such as "sp", "pc" or "r8" exist in more than one of them. Loading the constants
        # of the emulated architecture last ensures that its registers win these collisions.
        # The constants module may be absent in some unicorn versions, hence the default.
        native = getattr(uc, constants, None)
        if native is not None:
            self._load_register_constants(native)

        self._map_segments()
        self._map_stack_and_heap()

        for hook, flag, callback in [
            (uc.UC_HOOK_CODE,           Hook.CodeExecute, self.hook_code_execute ),  # noqa
            (uc.UC_HOOK_INSN_INVALID,   Hook.CodeError,   self.hook_code_error   ),  # noqa
            (uc.UC_HOOK_MEM_READ_AFTER, Hook.MemoryRead,  self.hook_mem_read     ),  # noqa
            (uc.UC_HOOK_MEM_WRITE,      Hook.MemoryWrite, self.hook_mem_write    ),  # noqa
            (uc.UC_HOOK_MEM_INVALID,    Hook.MemoryError, self.hook_mem_error    ),  # noqa
        ]:
            if self.hooked(flag):
                self.unicorn.hook_add(hook, callback, user_data=self.state)

    def _init(self):
        """
        Build the lookup tables that translate register names and unicorn register codes to
        `Register` objects.
        """
        self._reg_by_name: Dict[str, Register[int]] = {}
        self._reg_by_code: Dict[int, Register[int]] = {}
        for module in [
            uc.x86_const,
            uc.arm_const,
            uc.sparc_const,
            uc.mips_const,
        ]:
            self._load_register_constants(module)

    def _load_register_constants(self, module) -> None:
        """
        Add all constants of the given unicorn constants module that are named like
        `UC_<ARCH>_REG_<NAME>` to the register lookup tables. The case-folded `<NAME>` becomes
        the register name. Entries that already exist under the same name or code are replaced.
        """
        md: Dict[str, Any] = module.__dict__
        for name, code in md.items():
            try:
                u, *_, kind, name = name.split('_')
            except ValueError:
                # fewer than three underscore-separated parts: not a register constant
                continue
            if kind != 'REG' or u != 'UC':
                continue
            name = name.casefold()
            reg = self._reg_by_name.get(name)
            if reg is None or reg.code != code:
                reg = Register(name, code)
                self._reg_by_name[name] = reg
            self._reg_by_code[code] = reg

    def _emulate(self, start: int, end: Optional[int] = None):
        """
        Run the emulation from `start` until `end`. When no end address is given, the upper
        bound of the virtual box that contains `start` is used. Errors raised by unicorn are
        re-raised as `EmulationError`.
        """
        if end is None:
            end = self.exe.location_from_address(start).virtual.box.upper
        try:
            self.unicorn.emu_start(start, end)
        except uc.UcError as E:
            raise EmulationError(*E.args) from E

    def halt(self):
        """
        Stop the running emulation.
        """
        self.unicorn.emu_stop()

    def _lookup_register(self, var: Union[str, int]) -> Register[int]:
        """
        Resolve a register given by name or by unicorn register code. The size of the register
        is measured on first use and stored on the `Register` object. A `TypeError` is raised
        when `var` is neither a string nor an integer.
        """
        reg = None
        if isinstance(var, str):
            reg = self._reg_by_name[var.casefold()]
        if isinstance(var, int):
            reg = self._reg_by_code[var]
        if reg is None:
            raise TypeError(var)
        if reg.size is None:
            reg.size = self.measure_register_size(reg.code)
        return reg

    def _map(self, address: int, size: int):
        return self.unicorn.mem_map(address, size)

    def _set_register(self, reg: int, value: int) -> None:
        return self.unicorn.reg_write(reg, value)

    def _get_register(self, reg: int) -> int:
        return self.unicorn.reg_read(reg)

    def mem_write(self, address: int, data: bytes):
        return self.unicorn.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        return self.unicorn.mem_read(address, size)

Ancestors

Class variables

var unicorn

Inherited members

class IcicleEmulator (data, base=None, arch=None, hooks=Hook.OnlyErrors, align_size=4096, alloc_size=4096)

An Icicle-based emulator. Icicle is a more recent emulator engine and not yet as mature as Unicorn. There are some compelling arguments for its robustness, but with the current interface it is completely lacking any memory write hook support, which makes it difficult to use for most of our applications. See also the Icicle paper.

Expand source code Browse git
class IcicleEmulator(RawMetalEmulator[Ic, str, _T]):
    """
    An Icicle-based emulator. Icicle is a more recent emulator engine and not yet as mature as
    Unicorn. There are some compelling arguments for its robustness, but with the current
    interface it is completely lacking any memory write hook support, which makes it difficult
    to use for most of our applications. See also the [Icicle paper][ICE].

    [ICE]: https://arxiv.org/pdf/2301.13346
    """

    icicle: Ic

    def _init(self):
        ...

    def _reset(self):
        """
        Create a fresh Icicle engine for the executable, build the register size map, and map
        segments, stack, and heap. Raises `NotImplementedError` for unsupported architectures
        and when any memory hook is requested.
        """
        exe = self.exe

        try:
            arch = {
                Arch.X32   : 'i686',
                Arch.X64   : 'x86_64',
            }[exe.arch()]
        except KeyError:
            arch = None
        if arch not in ic.architectures():
            raise NotImplementedError(F'Icicle cannot handle executables of arch {exe.arch().name}')

        if self.hooks & Hook.Memory:
            raise NotImplementedError(U'Icicle does not support memory hooks yet.')

        self.icicle = ice = ic.Icicle(arch)
        # second item of each reg_list value is used as the register size in _lookup_register
        self.regmap = {reg.casefold(): val[1] for reg, val in ice.reg_list().items()}

        self._map_segments()
        self._map_stack_and_heap()

    def _emulate(self, start: int, end: Optional[int] = None):
        """
        Run the emulation from `start`. When code execution is hooked, the engine is advanced
        one instruction at a time and the hook is invoked before each instruction; emulation
        stops when the instruction pointer reaches `end`. Otherwise, the engine runs until
        `end` if it is given, and freely if it is not. Failure states of the engine are raised
        as `EmulationError`.
        """
        RS = ic.RunStatus
        emu = self.icicle
        dasm = None

        if self.hooked(Hook.CodeExecute):
            # The disassembler is only required to determine instruction sizes for the hook.
            dasm = self.disassembler()
            step = partial(emu.step, 1)
        elif end is not None:
            step = partial(emu.run_until, end)
        else:
            step = emu.run

        self.ip = ip = start

        while True:
            if dasm is not None:
                # Single-stepping does not know about the end address, it has to be checked
                # here; the instruction at the end address itself is not executed.
                if ip == end:
                    break
                op = next(dasm.disasm(self.exe[ip:ip + 12], ip, 1))
                self.hook_code_execute(emu, ip, op._raw.size, self.state)

            status = step()

            if status == RS.InstructionLimit:
                ip = self.ip
                continue

            if status in (
                RS.Breakpoint,
                RS.Halt,
                RS.Killed,
            ):
                break
            if status == RS.UnhandledException:
                raise EmulationError(emu.exception_code.name)
            if status != RS.Running:
                raise EmulationError(status.name)

    def halt(self):
        """
        Request the emulation to stop by placing a breakpoint at the current instruction pointer.
        """
        self.icicle.add_breakpoint(self.ip)

    def _lookup_register(self, var: str) -> Register[str]:
        """
        Resolve a register by its case-insensitive name; a `KeyError` is raised for names that
        are not in the register map.
        """
        name = var.casefold()
        size = self.regmap[name]
        return Register(name, name, size)

    def _map(self, address: int, size: int):
        """
        Map memory with a protection that depends on the enabled memory hooks.
        """
        MP = ic.MemoryProtection
        # NOTE(review): both hooked branches select the same protection; these branches are
        # currently not reachable after _reset because it rejects all memory hooks.
        if self.hooked(Hook.MemoryRead):
            perm = MP.ExecuteRead
        elif self.hooked(Hook.MemoryWrite):
            perm = MP.ExecuteRead
        else:
            perm = MP.ExecuteReadWrite
        return self.icicle.mem_map(address, size, perm)

    def _set_register(self, reg: str, value: int) -> None:
        return self.icicle.reg_write(reg, value)

    def _get_register(self, reg: str) -> int:
        return self.icicle.reg_read(reg)

    def mem_write(self, address: int, data: bytes):
        return self.icicle.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        return self.icicle.mem_read(address, size)

Ancestors

Class variables

var icicle

Inherited members

class SpeakeasyEmulator (data, base=None, arch=None, hooks=Hook.OnlyErrors, align_size=4096, alloc_size=4096)

A Speakeasy-based emulator. Speakeasy only supports PE files, but it has support for several Windows API routines which can be an advantage.

Expand source code Browse git
class SpeakeasyEmulator(Emulator[Se, str, _T]):
    """
    A Speakeasy-based emulator. Speakeasy only supports PE files, but it has support for several
    Windows API routines which can be an advantage.
    """

    speakeasy: Se

    def _init(self):
        """
        Set up the cache of register objects that is filled on demand by register lookups.
        """
        self._regs: Dict[str, Register[str]] = {}

    def _reset(self):
        """
        Create a new Speakeasy instance, load the executable into it as a module or as
        shellcode, and register the callbacks for all enabled hooks.
        """
        exe = self.exe
        supported_types = (ET.PE, ET.BLOB)
        if exe.type not in supported_types:
            raise NotImplementedError(F'Speakeasy cannot handle executables of type {exe.type.name}.')
        arch_names = {
            Arch.X32: 'x86',
            Arch.X64: 'x64',
        }
        try:
            arch = arch_names[exe.arch()]
        except KeyError as KE:
            raise NotImplementedError(F'Speakeasy cannot handle executables of arch {exe.arch().name}') from KE

        self.speakeasy = se.Speakeasy()
        emu = self.speakeasy

        # The loaders are given a path in addition to the data; a virtual file provides one.
        with VirtualFileSystem() as vfs:
            db = bytes(exe.data)
            vf = vfs.new(db)
            if not exe.blob:
                self.base = emu.load_module(vf.path, data=db).get_base()
            else:
                self.base = emu.load_shellcode(vf.path, data=db, arch=arch)

        emu.emu.timeout = 0

        if self.hooked(Hook.CodeExecute):
            emu.add_code_hook(self.hook_code_execute, ctx=self.state)

        for flag, register, callback in (
            (Hook.MemoryRead,  emu.add_mem_read_hook,    self.hook_mem_read ),  # noqa
            (Hook.MemoryWrite, emu.add_mem_write_hook,   self.hook_mem_write),  # noqa
            (Hook.MemoryError, emu.add_mem_invalid_hook, self.hook_mem_error),  # noqa
        ):
            if self.hooked(flag):
                register(callback)

    @property
    def stack_region(self):
        """
        The unique memory map that contains the current stack pointer. An `EmulationError` is
        raised when the stack pointer disagrees with the one reported by Speakeasy, or when
        there is not exactly one such memory map.
        """
        emu = self.speakeasy
        top = self.sp
        maps: List[MemMap] = emu.get_mem_maps()
        if emu.get_stack_ptr() != top:
            raise EmulationError('Unexpected stack pointer misalignment')
        try:
            candidates = [m for m in maps if top in range(m.base, m.base + m.size)]
            region, = candidates
        except Exception:
            raise EmulationError('Ambiguous memory, unable to locate the stack.')
        return region

    @property
    def stack_base(self):
        """
        The base address of the stack region; this property cannot be assigned.
        """
        region = self.stack_region
        return region.base

    @stack_base.setter
    def stack_base(self, value):
        raise AttributeError

    @property
    def stack_size(self):
        """
        The size of the stack region; this property cannot be assigned.
        """
        region = self.stack_region
        return region.size

    @stack_size.setter
    def stack_size(self, value):
        raise AttributeError

    def malloc(self, size: int) -> int:
        """
        Allocate memory through Speakeasy and return the result of its allocator.
        """
        emu = self.speakeasy
        return emu.mem_alloc(size)

    def morestack(self):
        """
        Grow the stack downwards by `alloc_size`: the additional range is mapped below the
        current stack region, whose base and size are then updated accordingly.
        """
        emu = self.speakeasy
        region = self.stack_region
        growth = self.alloc_size
        lowered = region.base - growth
        emu.emu.mem_map(self.alloc_size, lowered)
        region.base = lowered
        region.size = region.size + self.alloc_size

    def _emulate(self, start: int, end: Optional[int] = None):
        """
        Start emulation at `start`. A code hook at the start address moves the stack pointer
        into the stack region once. If `end` is given, another code hook stops the emulation
        there. Blobs are run as shellcode, everything else is invoked via a call.
        """
        emu = self.speakeasy

        def stackfix(emu, address: int, size: int, ctx: list):
            # The context list doubles as a flag to ensure that the fix is applied only once.
            if ctx:
                return True
            region = self.stack_region
            self.sp = region.base + region.size // 3
            ctx.append(True)
            return True

        emu.add_code_hook(stackfix, start, start, ctx=[])

        if end is not None:
            def _terminate(*_):
                emu.stop()
            emu.add_code_hook(_terminate, end, end + 1)

        run = emu.run_shellcode if self.exe.blob else emu.call
        return run(start)

    def halt(self):
        """
        Stop the running emulation.
        """
        emu = self.speakeasy
        return emu.stop()

    def _set_register(self, register: str, v: int):
        emu = self.speakeasy
        return emu.reg_write(register, v)

    def _get_register(self, register: str) -> int:
        emu = self.speakeasy
        return emu.reg_read(register)

    def _lookup_register(self, var: str) -> Register[str]:
        """
        Return the cached register object for the given name. Unknown registers are measured
        and added to the cache; if measuring fails, a `LookupError` is raised.
        """
        try:
            return self._regs[var]
        except KeyError:
            try:
                size = self.measure_register_size(var)
            except Exception:
                raise LookupError(var)
            created = Register(var, var, size)
            self._regs[var] = created
            return created

    def _map(self, address: int, size: int):
        """
        Map `size` bytes at `address`. The Speakeasy allocator is tried first; when it returns
        a different address, that allocation is freed and the memory is mapped directly via
        the underlying emulator. A `LookupError` is raised if this also fails.
        """
        emu = self.speakeasy
        mapped = emu.mem_alloc(size, address)
        if mapped == address:
            return mapped
        emu.mem_free(mapped)
        mapped = emu.emu.mem_map(size, address)
        if mapped != address:
            raise LookupError(F'Unable to allocate {size} bytes at address 0x{address:X}')
        return mapped

    def mem_write(self, address: int, data: bytes):
        emu = self.speakeasy
        return emu.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        emu = self.speakeasy
        return emu.mem_read(address, size)

Ancestors

Class variables

var speakeasy

Instance variables

var stack_region
Expand source code Browse git
@property
def stack_region(self):
    """
    The unique memory map that contains the current stack pointer. An `EmulationError` is raised
    when the stack pointer disagrees with the one reported by Speakeasy, or when there is not
    exactly one such memory map.
    """
    emu = self.speakeasy
    top = self.sp
    maps: List[MemMap] = emu.get_mem_maps()
    if emu.get_stack_ptr() != top:
        raise EmulationError('Unexpected stack pointer misalignment')
    try:
        candidates = [m for m in maps if top in range(m.base, m.base + m.size)]
        region, = candidates
    except Exception:
        raise EmulationError('Ambiguous memory, unable to locate the stack.')
    return region
var stack_base
Expand source code Browse git
@property
def stack_base(self):
    """
    The base address of the stack region.
    """
    region = self.stack_region
    return region.base
var stack_size
Expand source code Browse git
@property
def stack_size(self):
    """
    The size of the stack region.
    """
    region = self.stack_region
    return region.size

Inherited members