Module refinery.lib.emulator.se

Implements refinery.lib.emulator.interface.Emulator for the speakeasy backend.

Expand source code Browse git
"""
Implements `refinery.lib.emulator.interface.Emulator` for the speakeasy backend.
"""
from __future__ import annotations

from typing import TYPE_CHECKING, TypeVar

from refinery.lib.emulator.abstract import EmulationError, Emulator, Hook, Register
from refinery.lib.executable import ET, Arch
from refinery.lib.shared import speakeasy as se
from refinery.lib.vfs import VirtualFileSystem

if TYPE_CHECKING:
    from speakeasy import Speakeasy as Se
    from speakeasy.common import CodeHook
    from speakeasy.common import Hook as SeHook
    from speakeasy.memmgr import MemMap
else:
    class Se:
        # Runtime placeholder: the real Speakeasy class is only imported for type checkers.
        pass


class SpeakeasyNotInitialized(EmulationError):
    """
    Raised when the underlying Speakeasy emulator object is unexpectedly missing.
    """

    def __init__(self) -> None:
        message = 'Speakeasy was unexpectedly not initialized.'
        super().__init__(message)


_T = TypeVar('_T')


class SpeakeasyEmulator(Emulator[Se, str, _T]):
    """
    A Speakeasy-based emulator. Speakeasy only supports PE files, but it has support for several
    Windows API routines which can be an advantage.
    """

    speakeasy: Se

    def _init(self):
        """
        Set up the initially empty cache of `Register` objects, keyed by register name.
        """
        self._regs: dict[str, Register[str]] = dict()

    class _singlestep:
        """
        Code hook callback that requests an emulator stop on every second invocation: The first
        call only records that a step was taken, the next call clears that record and stops.
        """

        def __init__(self):
            self.stepped = False

        def __call__(self, se: Se, *_, **kw):
            already_stepped = self.stepped
            # toggle the flag; the emulator is only stopped when a step was already recorded
            self.stepped = not already_stepped
            if already_stepped:
                se.stop()
            return True

    class _stackfix:
        """
        Code hook callback that, once a hook object has been assigned to it, moves the stack
        pointer of the parent emulator one third into its stack region and disables that hook.
        """
        hook: CodeHook | None

        def __init__(self, parent: SpeakeasyEmulator):
            self.parent = parent
            self.hook = None

        def __call__(self, base_emu: Se, address: int, size: int, ctx: list):
            hook = self.hook
            if not hook:
                return
            owner = self.parent
            region = owner.stack_region
            owner.sp = region.base + region.size // 3
            hook.disable()

    def _reset(self):
        """
        Create a fresh Speakeasy instance for the executable, load the executable into it, and
        install the internal hooks as well as all hooks that were requested for this emulator.
        Raises `NotImplementedError` when the executable is neither a PE nor a blob, or when its
        architecture is neither `Arch.X32` nor `Arch.X64`. Raises `RuntimeError` when Speakeasy
        has no inner emulator object after loading.
        """
        exe = self.exe
        if exe.type not in (ET.PE, ET.BLOB):
            raise NotImplementedError(F'Speakeasy cannot handle executables of type {exe.type.name}.')
        try:
            # translate the architecture to the name that is passed along when loading shellcode
            arch = {
                Arch.X32: 'x86',
                Arch.X64: 'x64',
            }[exe.arch()]
        except KeyError as KE:
            raise NotImplementedError(F'Speakeasy cannot handle executables of arch {exe.arch().name}') from KE

        emu = self.speakeasy = se.Speakeasy()

        # the executable contents are handed to Speakeasy both as a virtual file path and as data
        with VirtualFileSystem() as vfs:
            db = bytes(exe.data)
            vf = vfs.new(db)
            if exe.blob:
                self.base = emu.load_shellcode(vf.path, data=db, arch=arch)
            else:
                self.base = emu.load_module(vf.path, data=db).get_base()

        if emu.emu is None:
            raise RuntimeError('emulator failed to initialize')

        # a fresh emulator has no end address hooks yet
        self._end_hook_s = None
        self._end_hook_d = None

        # the single stepping hooks are installed up front and disabled right away
        self._single_step_hook_s = emu.add_code_hook(self._singlestep())
        self._single_step_hook_d = emu.add_dyn_code_hook(self._singlestep())
        self._disable_single_step()

        # install the stack pointer adjustment callback (see _stackfix)
        stackfix = self._stackfix(self)
        stackfix.hook = emu.add_code_hook(stackfix)

        emu.emu.timeout = 0

        # prevent memory hook from being overridden, this is a bug in speakeasy
        emu.emu.add_interrupt_hook(cb=emu.emu._hook_interrupt)
        emu.emu.builtin_hooks_set = True

        # only those events that are hooked for this emulator are forwarded to the hook methods
        if self.hooked(Hook.CodeExecute):
            emu.add_code_hook(self.hook_code_execute, ctx=self.state)
            emu.add_dyn_code_hook(self.hook_code_execute, ctx=self.state)

        if self.hooked(Hook.MemoryRead):
            emu.add_mem_read_hook(self.hook_mem_read)

        if self.hooked(Hook.MemoryWrite):
            emu.add_mem_write_hook(self.hook_mem_write)

        if self.hooked(Hook.MemoryError):
            emu.add_mem_invalid_hook(self.hook_mem_error)

        if self.hooked(Hook.ApiCall):
            emu.add_api_hook(self.hook_api_call, '*', '*')

    def _enable_single_step(self):
        """
        Reset the state of both single step callbacks and enable their hooks. A `RuntimeError` is
        raised when either of the two hooks is missing.
        """
        hooks = (self._single_step_hook_d, self._single_step_hook_s)
        if any(hook is None for hook in hooks):
            raise RuntimeError('single stepping hooks failed to be installed')
        for hook in hooks:
            hook.cb.stepped = False
        for hook in hooks:
            hook.enable()

    def _disable_single_step(self):
        """
        Disable the single step hooks, skipping any hook that is not set.
        """
        dynamic_hook = self._single_step_hook_d
        if dynamic_hook:
            dynamic_hook.disable()
        static_hook = self._single_step_hook_s
        if static_hook:
            static_hook.disable()

    @property
    def stack_region(self):
        """
        The one Speakeasy memory map that contains the current stack pointer. An `EmulationError`
        is raised when the stack pointer of this emulator and the one reported by Speakeasy
        differ, or when there is not exactly one memory map containing it.
        """
        spk = self.speakeasy
        top = self.sp
        maps: list[MemMap] = spk.get_mem_maps()
        if spk.get_stack_ptr() != top:
            raise EmulationError('Unexpected stack pointer misalignment')
        try:
            candidates = [m for m in maps if m.base <= top < m.base + m.size]
            region, = candidates
        except Exception:
            raise EmulationError('Ambiguous memory, unable to locate the stack.')
        return region

    @property
    def stack_base(self):
        """
        The base address of the memory map that contains the stack pointer.
        """
        region = self.stack_region
        return region.base

    @stack_base.setter
    def stack_base(self, value):
        # every attempt to assign the stack base is rejected
        raise AttributeError

    @property
    def stack_size(self):
        """
        The size of the memory map that contains the stack pointer.
        """
        region = self.stack_region
        return region.size

    @stack_size.setter
    def stack_size(self, value):
        # every attempt to assign the stack size is rejected
        raise AttributeError

    def _map_update(self):
        """
        Clear the memory map and refill it from the regions that the innermost engine object
        reports; the size of each region is computed as `end - start + 1`. Raises
        `SpeakeasyNotInitialized` when any object along the chain to that engine is missing.
        """
        self._memorymap.clear()
        inner = self.speakeasy.emu
        if not inner:
            raise SpeakeasyNotInitialized
        engine = inner.emu_eng
        if not engine:
            raise SpeakeasyNotInitialized
        backend = engine.emu
        if not backend:
            raise SpeakeasyNotInitialized
        for lower, upper, _ in backend.mem_regions():
            self._memorymap.addi(lower, upper - lower + 1)

    def malloc(self, size: int) -> int:
        """
        Allocate `size` bytes through Speakeasy and return the result of that allocation call.
        """
        spk = self.speakeasy
        return spk.mem_alloc(size)

    def morestack(self):
        """
        Extend the stack region downwards by `alloc_size` bytes: The memory directly below the
        current stack region is mapped, and the base and size of the region are updated to
        include it. Raises `SpeakeasyNotInitialized` when there is no inner emulator.
        """
        spk = self.speakeasy
        region = self.stack_region
        step = self.alloc_size
        lowered = region.base - step
        inner = spk.emu
        if inner is None:
            raise SpeakeasyNotInitialized
        inner.mem_map(step, lowered)
        region.base = lowered
        region.size += step

    class _stop:
        """
        Code hook callback that stops the emulator and disables its own hook as soon as it is
        invoked for the configured address. It does nothing while no hook is assigned to it.
        """
        hook: SeHook | None
        address: int | None

        def __init__(self, address: int | None = None):
            self.hook = None
            self.address = address

        def __call__(self, spky: Se, address: int, size: int, ctx: list):
            hook = self.hook
            if not hook or address != self.address:
                return
            spky.stop()
            hook.disable()

    _end_hook_s: _stop | None
    _end_hook_d: _stop | None

    def _remove_hook(self, hook: SeHook | None):
        """
        Delete the given hook from its emulation engine and remove its first occurrence from each
        of the hook lists kept by the inner emulator. Nothing happens when `hook` is `None`.
        """
        if hook is None:
            return
        hook.emu_eng.hook_del(hook.handle)
        inner = self.speakeasy.emu
        assert inner is not None
        for registered in inner.hooks.values():
            assert isinstance(registered, list)
            index = next((k for k, h in enumerate(registered) if h is hook), None)
            if index is not None:
                del registered[index]

    def _set_end(self, end: int | None):
        """
        Replace the end address hooks: Previously installed stop hooks are removed first. When
        `end` is not `None`, a new stop callback for that address is registered both as a code
        hook for the range from `end` to `end + 1` and as a dynamic code hook.
        """
        previous = self._end_hook_s
        if previous:
            self._remove_hook(previous.hook)
        previous = self._end_hook_d
        if previous:
            self._remove_hook(previous.hook)
        if end is None:
            self._end_hook_s = self._end_hook_d = None
            return
        static_stop = self._stop(end)
        self._end_hook_s = static_stop
        static_stop.hook = self.speakeasy.add_code_hook(static_stop, end, end + 1)
        dynamic_stop = self._stop(end)
        self._end_hook_d = dynamic_stop
        dynamic_stop.hook = self.speakeasy.add_dyn_code_hook(dynamic_stop)

    def _emulate(self, start: int, end: int | None = None):
        """
        Begin or continue emulation at address `start`. The end address hooks are updated for
        `end` first. When Speakeasy already has a current run, emulation is resumed at `start`
        and the result of the resume call is returned. Otherwise, blobs are run as shellcode at
        the offset of `start` relative to the base address, and for all other executables a
        thread run with a newly allocated stack, a process, and a thread object is set up by
        hand before the inner emulator is started.

        Raises `SpeakeasyNotInitialized` when there is no inner emulator and `ValueError` when
        `start` is below the base address of a blob.
        """
        spk = self.speakeasy

        if (inner := spk.emu) is None:
            raise SpeakeasyNotInitialized

        win32 = isinstance(inner, se.Win32Emulator)
        self._set_end(end)

        if inner.get_current_run():
            return spk.resume(start)

        if self.exe.blob:
            offset = start - self.base
            if offset < 0:
                raise ValueError(F'invalid offset 0x{start:X} specified; base address is 0x{self.base:X}')
            spk.run_shellcode(self.base, offset=offset)
        else:
            inner.stack_base, _ = inner.alloc_stack(self.stack_size)
            inner.set_func_args(inner.stack_base, inner.return_hook)

            run = se.profiler.Run()
            run.type = 'thread'         # type:ignore
            run.start_addr = start      # type:ignore
            run.instr_cnt = 0           # type:ignore
            run.args = ()               # type:ignore

            inner.add_run(run)

            if win32:
                if not (process := inner.init_container_process()):
                    # The fallback process has to be created for the Speakeasy emulator object,
                    # not for this wrapper; the thread object below is created the same way.
                    process = se.windows.objman.Process(inner)
                inner.processes.append(process)
                inner.curr_process = process
            else:
                process = None

            if mm := inner.get_address_map(start): # type:ignore
                mm: MemMap
                mm.set_process(inner.curr_process)

            t = se.windows.objman.Thread(inner, stack_base=inner.stack_base, stack_commit=self.stack_size)

            inner.om.objects.update({t.address: t})
            inner.curr_process.threads.append(t)
            inner.curr_thread = t

            if win32:
                peb = inner.alloc_peb(process)
                inner.init_teb(t, peb)

            inner.start()

    def halt(self):
        """
        Ask Speakeasy to stop the emulation and return the result of that call.
        """
        spk = self.speakeasy
        return spk.stop()

    def _set_register(self, register: str, v: int):
        """
        Write the value `v` to the register with the given name through Speakeasy.
        """
        spk = self.speakeasy
        return spk.reg_write(register, v)

    def _get_register(self, register: str) -> int:
        """
        Read the register with the given name through Speakeasy and return the result.
        """
        spk = self.speakeasy
        return spk.reg_read(register)

    def _lookup_register(self, var: str) -> Register[str]:
        """
        Return the `Register` object for the register name `var`. Registers are cached after
        their size was measured once; a `LookupError` is raised when measuring the size fails.
        """
        known = self._regs
        if var in known:
            return known[var]
        try:
            width = self.measure_register_size(var)
        except Exception:
            raise LookupError(var)
        register = known[var] = Register(var, var, width)
        return register

    def _map(self, address: int, size: int):
        """
        Map `size` bytes of memory at `address` and return the mapped address. When the address
        belongs to a reservation made by Speakeasy, that reservation is committed, and any part
        of the requested range that extends beyond the committed map is mapped recursively.
        Otherwise, the memory is allocated at the requested address.

        Raises `SpeakeasyNotInitialized` when there is no inner emulator, `ValueError` when the
        address is already mapped, `RuntimeError` when committing a reservation does not yield
        the expected result, and `LookupError` when Speakeasy allocates the memory at a different
        address than the requested one.
        """
        spksy = self.speakeasy
        if (emu := spksy.emu) is None:
            raise SpeakeasyNotInitialized
        if emu.get_address_map(address):
            raise ValueError(address)
        if mm := emu.get_reserve_map(address):
            mm: MemMap = emu.get_address_map(emu.mem_map_reserve(mm.base))
            if address not in range(mm.base, mm.base + mm.size):
                raise RuntimeError(F'Speakeasy claimed to map 0x{address:X} in map 0x{mm.base:X}-0x{mm.base + mm.size:X}.')
            map_size = mm.size
            map_base = mm.base
            # The committed map covers [map_base, map_base + map_size). The rest of the requested
            # range [address, address + size) therefore begins right behind that map, which is
            # not at address + map_size unless the requested address equals the map base.
            _new_base = map_base + map_size
            _new_size = address + size - _new_base
            if _new_size > 0 and self._map(_new_base, _new_size) != _new_base:
                raise RuntimeError(F'Attempting to remain rest of size 0x{_new_size:X} at 0x{_new_base:X} failed.')
            return address
        else:
            alloc = spksy.mem_alloc(size, address)
            if alloc != address:
                # release the misplaced allocation again before reporting the failure
                spksy.mem_free(alloc)
                raise LookupError(F'Unable to allocate {size} bytes at address 0x{address:X} because Speakeasy has reserved this region.')
            return alloc

    def mem_write(self, address: int, data: bytes):
        """
        Write `data` to emulator memory at `address` through Speakeasy.
        """
        spk = self.speakeasy
        return spk.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        """
        Read `size` bytes of emulator memory at `address` through Speakeasy.
        """
        spk = self.speakeasy
        return spk.mem_read(address, size)

Classes

class Se
Expand source code Browse git
class Se:
    # Empty placeholder that stands in for the Speakeasy class outside of type checking.
    pass
class SpeakeasyNotInitialized

Base class for any exceptions raised by emulators.

Expand source code Browse git
class SpeakeasyNotInitialized(EmulationError):
    """
    Raised when the underlying Speakeasy emulator object is unexpectedly missing.
    """

    def __init__(self) -> None:
        message = 'Speakeasy was unexpectedly not initialized.'
        super().__init__(message)

Ancestors

class SpeakeasyEmulator (data, base=None, arch=None, hooks=18, align_size=4096, alloc_size=4096)

A Speakeasy-based emulator. Speakeasy only supports PE files, but it has support for several Windows API routines which can be an advantage.

Expand source code Browse git
class SpeakeasyEmulator(Emulator[Se, str, _T]):
    """
    A Speakeasy-based emulator. Speakeasy only supports PE files, but it has support for several
    Windows API routines which can be an advantage.
    """

    speakeasy: Se

    def _init(self):
        """
        Set up the initially empty cache of `Register` objects, keyed by register name.
        """
        self._regs: dict[str, Register[str]] = dict()

    class _singlestep:
        """
        Code hook callback that requests an emulator stop on every second invocation: The first
        call only records that a step was taken, the next call clears that record and stops.
        """

        def __init__(self):
            self.stepped = False

        def __call__(self, se: Se, *_, **kw):
            already_stepped = self.stepped
            # toggle the flag; the emulator is only stopped when a step was already recorded
            self.stepped = not already_stepped
            if already_stepped:
                se.stop()
            return True

    class _stackfix:
        """
        Code hook callback that, once a hook object has been assigned to it, moves the stack
        pointer of the parent emulator one third into its stack region and disables that hook.
        """
        hook: CodeHook | None

        def __init__(self, parent: SpeakeasyEmulator):
            self.parent = parent
            self.hook = None

        def __call__(self, base_emu: Se, address: int, size: int, ctx: list):
            hook = self.hook
            if not hook:
                return
            owner = self.parent
            region = owner.stack_region
            owner.sp = region.base + region.size // 3
            hook.disable()

    def _reset(self):
        """
        Create a fresh Speakeasy instance for the executable, load the executable into it, and
        install the internal hooks as well as all hooks that were requested for this emulator.
        Raises `NotImplementedError` when the executable is neither a PE nor a blob, or when its
        architecture is neither `Arch.X32` nor `Arch.X64`. Raises `RuntimeError` when Speakeasy
        has no inner emulator object after loading.
        """
        exe = self.exe
        if exe.type not in (ET.PE, ET.BLOB):
            raise NotImplementedError(F'Speakeasy cannot handle executables of type {exe.type.name}.')
        try:
            # translate the architecture to the name that is passed along when loading shellcode
            arch = {
                Arch.X32: 'x86',
                Arch.X64: 'x64',
            }[exe.arch()]
        except KeyError as KE:
            raise NotImplementedError(F'Speakeasy cannot handle executables of arch {exe.arch().name}') from KE

        emu = self.speakeasy = se.Speakeasy()

        # the executable contents are handed to Speakeasy both as a virtual file path and as data
        with VirtualFileSystem() as vfs:
            db = bytes(exe.data)
            vf = vfs.new(db)
            if exe.blob:
                self.base = emu.load_shellcode(vf.path, data=db, arch=arch)
            else:
                self.base = emu.load_module(vf.path, data=db).get_base()

        if emu.emu is None:
            raise RuntimeError('emulator failed to initialize')

        # a fresh emulator has no end address hooks yet
        self._end_hook_s = None
        self._end_hook_d = None

        # the single stepping hooks are installed up front and disabled right away
        self._single_step_hook_s = emu.add_code_hook(self._singlestep())
        self._single_step_hook_d = emu.add_dyn_code_hook(self._singlestep())
        self._disable_single_step()

        # install the stack pointer adjustment callback (see _stackfix)
        stackfix = self._stackfix(self)
        stackfix.hook = emu.add_code_hook(stackfix)

        emu.emu.timeout = 0

        # prevent memory hook from being overridden, this is a bug in speakeasy
        emu.emu.add_interrupt_hook(cb=emu.emu._hook_interrupt)
        emu.emu.builtin_hooks_set = True

        # only those events that are hooked for this emulator are forwarded to the hook methods
        if self.hooked(Hook.CodeExecute):
            emu.add_code_hook(self.hook_code_execute, ctx=self.state)
            emu.add_dyn_code_hook(self.hook_code_execute, ctx=self.state)

        if self.hooked(Hook.MemoryRead):
            emu.add_mem_read_hook(self.hook_mem_read)

        if self.hooked(Hook.MemoryWrite):
            emu.add_mem_write_hook(self.hook_mem_write)

        if self.hooked(Hook.MemoryError):
            emu.add_mem_invalid_hook(self.hook_mem_error)

        if self.hooked(Hook.ApiCall):
            emu.add_api_hook(self.hook_api_call, '*', '*')

    def _enable_single_step(self):
        """
        Reset the state of both single step callbacks and enable their hooks. A `RuntimeError` is
        raised when either of the two hooks is missing.
        """
        hooks = (self._single_step_hook_d, self._single_step_hook_s)
        if any(hook is None for hook in hooks):
            raise RuntimeError('single stepping hooks failed to be installed')
        for hook in hooks:
            hook.cb.stepped = False
        for hook in hooks:
            hook.enable()

    def _disable_single_step(self):
        """
        Disable the single step hooks, skipping any hook that is not set.
        """
        dynamic_hook = self._single_step_hook_d
        if dynamic_hook:
            dynamic_hook.disable()
        static_hook = self._single_step_hook_s
        if static_hook:
            static_hook.disable()

    @property
    def stack_region(self):
        """
        The one Speakeasy memory map that contains the current stack pointer. An `EmulationError`
        is raised when the stack pointer of this emulator and the one reported by Speakeasy
        differ, or when there is not exactly one memory map containing it.
        """
        spk = self.speakeasy
        top = self.sp
        maps: list[MemMap] = spk.get_mem_maps()
        if spk.get_stack_ptr() != top:
            raise EmulationError('Unexpected stack pointer misalignment')
        try:
            candidates = [m for m in maps if m.base <= top < m.base + m.size]
            region, = candidates
        except Exception:
            raise EmulationError('Ambiguous memory, unable to locate the stack.')
        return region

    @property
    def stack_base(self):
        """
        The base address of the memory map that contains the stack pointer.
        """
        region = self.stack_region
        return region.base

    @stack_base.setter
    def stack_base(self, value):
        # every attempt to assign the stack base is rejected
        raise AttributeError

    @property
    def stack_size(self):
        """
        The size of the memory map that contains the stack pointer.
        """
        region = self.stack_region
        return region.size

    @stack_size.setter
    def stack_size(self, value):
        # every attempt to assign the stack size is rejected
        raise AttributeError

    def _map_update(self):
        """
        Clear the memory map and refill it from the regions that the innermost engine object
        reports; the size of each region is computed as `end - start + 1`. Raises
        `SpeakeasyNotInitialized` when any object along the chain to that engine is missing.
        """
        self._memorymap.clear()
        inner = self.speakeasy.emu
        if not inner:
            raise SpeakeasyNotInitialized
        engine = inner.emu_eng
        if not engine:
            raise SpeakeasyNotInitialized
        backend = engine.emu
        if not backend:
            raise SpeakeasyNotInitialized
        for lower, upper, _ in backend.mem_regions():
            self._memorymap.addi(lower, upper - lower + 1)

    def malloc(self, size: int) -> int:
        """
        Allocate `size` bytes through Speakeasy and return the result of that allocation call.
        """
        spk = self.speakeasy
        return spk.mem_alloc(size)

    def morestack(self):
        """
        Extend the stack region downwards by `alloc_size` bytes: The memory directly below the
        current stack region is mapped, and the base and size of the region are updated to
        include it. Raises `SpeakeasyNotInitialized` when there is no inner emulator.
        """
        spk = self.speakeasy
        region = self.stack_region
        step = self.alloc_size
        lowered = region.base - step
        inner = spk.emu
        if inner is None:
            raise SpeakeasyNotInitialized
        inner.mem_map(step, lowered)
        region.base = lowered
        region.size += step

    class _stop:
        """
        Code hook callback that stops the emulator and disables its own hook as soon as it is
        invoked for the configured address. It does nothing while no hook is assigned to it.
        """
        hook: SeHook | None
        address: int | None

        def __init__(self, address: int | None = None):
            self.hook = None
            self.address = address

        def __call__(self, spky: Se, address: int, size: int, ctx: list):
            hook = self.hook
            if not hook or address != self.address:
                return
            spky.stop()
            hook.disable()

    _end_hook_s: _stop | None
    _end_hook_d: _stop | None

    def _remove_hook(self, hook: SeHook | None):
        """
        Delete the given hook from its emulation engine and remove its first occurrence from each
        of the hook lists kept by the inner emulator. Nothing happens when `hook` is `None`.
        """
        if hook is None:
            return
        hook.emu_eng.hook_del(hook.handle)
        inner = self.speakeasy.emu
        assert inner is not None
        for registered in inner.hooks.values():
            assert isinstance(registered, list)
            index = next((k for k, h in enumerate(registered) if h is hook), None)
            if index is not None:
                del registered[index]

    def _set_end(self, end: int | None):
        """
        Replace the end address hooks: Previously installed stop hooks are removed first. When
        `end` is not `None`, a new stop callback for that address is registered both as a code
        hook for the range from `end` to `end + 1` and as a dynamic code hook.
        """
        previous = self._end_hook_s
        if previous:
            self._remove_hook(previous.hook)
        previous = self._end_hook_d
        if previous:
            self._remove_hook(previous.hook)
        if end is None:
            self._end_hook_s = self._end_hook_d = None
            return
        static_stop = self._stop(end)
        self._end_hook_s = static_stop
        static_stop.hook = self.speakeasy.add_code_hook(static_stop, end, end + 1)
        dynamic_stop = self._stop(end)
        self._end_hook_d = dynamic_stop
        dynamic_stop.hook = self.speakeasy.add_dyn_code_hook(dynamic_stop)

    def _emulate(self, start: int, end: int | None = None):
        """
        Begin or continue emulation at address `start`. The end address hooks are updated for
        `end` first. When Speakeasy already has a current run, emulation is resumed at `start`
        and the result of the resume call is returned. Otherwise, blobs are run as shellcode at
        the offset of `start` relative to the base address, and for all other executables a
        thread run with a newly allocated stack, a process, and a thread object is set up by
        hand before the inner emulator is started.

        Raises `SpeakeasyNotInitialized` when there is no inner emulator and `ValueError` when
        `start` is below the base address of a blob.
        """
        spk = self.speakeasy

        if (inner := spk.emu) is None:
            raise SpeakeasyNotInitialized

        win32 = isinstance(inner, se.Win32Emulator)
        self._set_end(end)

        if inner.get_current_run():
            return spk.resume(start)

        if self.exe.blob:
            offset = start - self.base
            if offset < 0:
                raise ValueError(F'invalid offset 0x{start:X} specified; base address is 0x{self.base:X}')
            spk.run_shellcode(self.base, offset=offset)
        else:
            inner.stack_base, _ = inner.alloc_stack(self.stack_size)
            inner.set_func_args(inner.stack_base, inner.return_hook)

            run = se.profiler.Run()
            run.type = 'thread'         # type:ignore
            run.start_addr = start      # type:ignore
            run.instr_cnt = 0           # type:ignore
            run.args = ()               # type:ignore

            inner.add_run(run)

            if win32:
                if not (process := inner.init_container_process()):
                    # The fallback process has to be created for the Speakeasy emulator object,
                    # not for this wrapper; the thread object below is created the same way.
                    process = se.windows.objman.Process(inner)
                inner.processes.append(process)
                inner.curr_process = process
            else:
                process = None

            if mm := inner.get_address_map(start): # type:ignore
                mm: MemMap
                mm.set_process(inner.curr_process)

            t = se.windows.objman.Thread(inner, stack_base=inner.stack_base, stack_commit=self.stack_size)

            inner.om.objects.update({t.address: t})
            inner.curr_process.threads.append(t)
            inner.curr_thread = t

            if win32:
                peb = inner.alloc_peb(process)
                inner.init_teb(t, peb)

            inner.start()

    def halt(self):
        """
        Ask Speakeasy to stop the emulation and return the result of that call.
        """
        spk = self.speakeasy
        return spk.stop()

    def _set_register(self, register: str, v: int):
        """
        Write the value `v` to the register with the given name through Speakeasy.
        """
        spk = self.speakeasy
        return spk.reg_write(register, v)

    def _get_register(self, register: str) -> int:
        """
        Read the register with the given name through Speakeasy and return the result.
        """
        spk = self.speakeasy
        return spk.reg_read(register)

    def _lookup_register(self, var: str) -> Register[str]:
        """
        Return the `Register` object for the register name `var`. Registers are cached after
        their size was measured once; a `LookupError` is raised when measuring the size fails.
        """
        known = self._regs
        if var in known:
            return known[var]
        try:
            width = self.measure_register_size(var)
        except Exception:
            raise LookupError(var)
        register = known[var] = Register(var, var, width)
        return register

    def _map(self, address: int, size: int):
        """
        Map `size` bytes of memory at `address` and return the mapped address. When the address
        belongs to a reservation made by Speakeasy, that reservation is committed, and any part
        of the requested range that extends beyond the committed map is mapped recursively.
        Otherwise, the memory is allocated at the requested address.

        Raises `SpeakeasyNotInitialized` when there is no inner emulator, `ValueError` when the
        address is already mapped, `RuntimeError` when committing a reservation does not yield
        the expected result, and `LookupError` when Speakeasy allocates the memory at a different
        address than the requested one.
        """
        spksy = self.speakeasy
        if (emu := spksy.emu) is None:
            raise SpeakeasyNotInitialized
        if emu.get_address_map(address):
            raise ValueError(address)
        if mm := emu.get_reserve_map(address):
            mm: MemMap = emu.get_address_map(emu.mem_map_reserve(mm.base))
            if address not in range(mm.base, mm.base + mm.size):
                raise RuntimeError(F'Speakeasy claimed to map 0x{address:X} in map 0x{mm.base:X}-0x{mm.base + mm.size:X}.')
            map_size = mm.size
            map_base = mm.base
            # The committed map covers [map_base, map_base + map_size). The rest of the requested
            # range [address, address + size) therefore begins right behind that map, which is
            # not at address + map_size unless the requested address equals the map base.
            _new_base = map_base + map_size
            _new_size = address + size - _new_base
            if _new_size > 0 and self._map(_new_base, _new_size) != _new_base:
                raise RuntimeError(F'Attempting to remain rest of size 0x{_new_size:X} at 0x{_new_base:X} failed.')
            return address
        else:
            alloc = spksy.mem_alloc(size, address)
            if alloc != address:
                # release the misplaced allocation again before reporting the failure
                spksy.mem_free(alloc)
                raise LookupError(F'Unable to allocate {size} bytes at address 0x{address:X} because Speakeasy has reserved this region.')
            return alloc

    def mem_write(self, address: int, data: bytes):
        """
        Write `data` to emulator memory at `address` through Speakeasy.
        """
        spk = self.speakeasy
        return spk.mem_write(address, data)

    def mem_read(self, address: int, size: int):
        """
        Read `size` bytes of emulator memory at `address` through Speakeasy.
        """
        spk = self.speakeasy
        return spk.mem_read(address, size)

Ancestors

Class variables

var speakeasy

Instance variables

var stack_region
Expand source code Browse git
@property
def stack_region(self):
    """
    The one Speakeasy memory map that contains the current stack pointer. An `EmulationError`
    is raised when the stack pointer of this emulator and the one reported by Speakeasy
    differ, or when there is not exactly one memory map containing it.
    """
    spk = self.speakeasy
    top = self.sp
    maps: list[MemMap] = spk.get_mem_maps()
    if spk.get_stack_ptr() != top:
        raise EmulationError('Unexpected stack pointer misalignment')
    try:
        candidates = [m for m in maps if m.base <= top < m.base + m.size]
        region, = candidates
    except Exception:
        raise EmulationError('Ambiguous memory, unable to locate the stack.')
    return region
var stack_base
Expand source code Browse git
@property
def stack_base(self):
    """
    The base address of the memory map that contains the stack pointer.
    """
    region = self.stack_region
    return region.base
var stack_size
Expand source code Browse git
@property
def stack_size(self):
    """
    The size of the memory map that contains the stack pointer.
    """
    region = self.stack_region
    return region.size

Inherited members