Module refinery.units.formats.exe.vstack

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import List, Dict, Any, TYPE_CHECKING

import re

from refinery.units import Arg, Unit
from refinery.lib.executable import align, Arch, BO, Executable, Range, ExecutableCodeBlob
from refinery.lib.types import bounds, INF
from refinery.lib.meta import SizeInt
from refinery.lib.tools import NoLogging

from dataclasses import dataclass, field
from collections import defaultdict

if TYPE_CHECKING:
    from typing import Tuple, Optional, Iterator
    from capstone import Cs
    from unicorn import Uc
    from intervaltree import IntervalTree, Interval


@dataclass
class EmuState:
    executable: Executable
    writes: IntervalTree
    expected_address: int
    stack: Range
    blob: bool
    disassembler: Optional[Cs] = None
    waiting: int = 0
    callstack: List[int] = field(default_factory=list)
    retaddr: Optional[int] = None
    stop: Optional[int] = None
    previous_address: int = 0
    sp_register: int = 0
    ip_register: int = 0
    rv_register: int = 0
    callstack_ceiling: int = 0
    allocations: List[Range] = field(default_factory=list)
    ticks: int = field(default_factory=lambda: INF)
    max_wait: int = 0
    max_loop: int = 0
    visits: Dict[int, int] = field(default_factory=lambda: defaultdict(int))
    last_read: Optional[int] = None

    def disassemble(self, address: int, size: int):
        if self.disassembler is None:
            return None
        try:
            pos = self.executable.location_from_address(address).physical.position
            end = pos + size
            return next(self.disassembler.disasm(
                bytes(self.executable.data[pos:end]), address, 1))
        except Exception:
            return None

    def log(self, msg: str) -> str:
        _width = len(str(self.max_wait))
        _depth = len(self.callstack)
        return F'[wait={self.waiting:0{_width}d}] [call={_depth}] {self.fmt(self.previous_address)}: {msg}'

    def fmt(self, address: int) -> str:
        return F'0x{address:0{self.executable.pointer_size // 4}X}'


class vstack(Unit):
    """
    The unit emulates instructions at a given address in the input executable (PE/ELF/MachO) and
    extracts data patches that are written to the stack during emulation. Emulation is halted as
    soon as a certain number of instructions has not performed any memory writes, or when an error
    occurs. By default, most registers are set to the current location in the emulated stack.
    However, if you want to initialize certain registers differently, you can set an environment
    variable to the desired value.
    """

    @Unit.Requires('intervaltree', 'default', 'extended')
    def _intervaltree():
        import intervaltree
        return intervaltree

    @Unit.Requires('unicorn==2.0.1.post1', 'default', 'extended')
    def _unicorn():
        with NoLogging():
            import unicorn
        import unicorn.x86_const
        import unicorn.arm64_const
        import unicorn.mips_const
        import unicorn.sparc_const
        try:
            import unicorn.ppc_const
        except ImportError:
            pass
        return unicorn

    @Unit.Requires('capstone', 'default', 'extended')
    def _capstone():
        import capstone
        return capstone

    def __init__(
        self,
        *address: Arg.Number(metavar='start', help='Specify the (virtual) addresses of a stack string instruction sequences.'),
        stop: Arg.Number('-s', metavar='stop', help='Optional: Stop when reaching this address.') = None,
        base: Arg.Number('-b', metavar='Addr', help='Optionally specify a custom base address B.') = None,
        arch: Arg.Option('-a', help='Specify for blob inputs: {choices}', choices=Arch) = Arch.X32,
        meta_registers: Arg.Switch('-r', help='Consume register initialization values from the chunk\'s metadata.') = False,
        timeout: Arg.Number('-t', help='Optionally stop emulating after a given number of instructions.') = None,
        patch_range: Arg.Bounds('-p', metavar='MIN:MAX',
            help='Extract only patches that are in the given range, default is {default}.') = slice(5, None),
        write_range: Arg.Bounds('-n', metavar='MIN:MAX',
            help='Log only writes whose size is in the given range, default is {default}.') = slice(1, None),
        wait: Arg.Number('-w', help=(
            'When this many instructions did not write to memory, emulation is halted. The default is {default}.')) = 20,
        wait_calls: Arg.Switch('-c', group='CALL',
            help='Wait indefinitely when inside a function call.') = False,
        skip_calls: Arg.Counts('-C', group='CALL',
            help='Skip function calls entirely. Use twice to treat each call as allocating memory.') = 0,
        stack_size: Arg.Number('-S', help='Optionally specify the stack size. The default is 0x{default:X}.') = 0x10000,
        block_size: Arg.Number('-B', help='Standard memory block size for the emulator, 0x{default:X} by default.') = 0x1000,
        max_visits: Arg.Number('-V', help='Maximum number of times a code address is visited. Default is {default}.') = 0x1000,
        log_writes_in_calls: Arg.Switch('-W', help='Log writes of values that occur in functions calls.') = False,
        log_stack_addresses: Arg.Switch('-X', help='Log writes of values that are stack addresses.') = False,
        log_other_addresses: Arg.Switch('-Y', help='Log writes of values that are addresses to mapped segments.') = False,
        log_zero_overwrites: Arg.Switch('-Z', help='Log writes of zeros to memory that contained nonzero values.') = False,
        log_stack_cookies  : Arg.Switch('-E', help='Log writes that look like stack cookies.') = False,
    ):
        super().__init__(
            address=address or [0],
            stop=stop,
            base=base,
            arch=Arg.AsOption(arch, Arch),
            meta_registers=meta_registers,
            timeout=timeout,
            patch_range=patch_range,
            write_range=write_range,
            wait=wait,
            stack_size=stack_size,
            wait_calls=wait_calls,
            skip_calls=skip_calls,
            block_size=block_size,
            max_visits=max_visits,
            log_writes_in_calls=log_writes_in_calls,
            log_stack_addresses=log_stack_addresses,
            log_other_addresses=log_other_addresses,
            log_zero_overwrites=log_zero_overwrites,
            log_stack_cookies=log_stack_cookies
        )

    def _find_stack_location(self, exe: Executable):
        stack_size = self.args.stack_size
        memory_max = 1 << exe.pointer_size
        space = exe.image_defined_address_space()
        aligned = align(stack_size, space.upper)
        if aligned + stack_size < memory_max:
            return aligned
        aligned = align(stack_size, space.lower - stack_size, down=True)
        if aligned > 0:
            return aligned
        raise RuntimeError('The primitive method used to map stack memory has failed.')

    def process(self, data):
        uc = self._unicorn
        blob = False
        try:
            exe = Executable.Load(data, self.args.base)
        except ValueError:
            exe = ExecutableCodeBlob(data, self.args.base, self.args.arch)
            blob = True
        arch = exe.arch()
        width = exe.pointer_size // 4
        block_size = self.args.block_size
        stack_size = self.args.stack_size
        stack_addr = self._find_stack_location(exe)
        self.log_info(F'mapping {SizeInt(stack_size)!r} of stack at 0x{stack_addr:X}')
        image = memoryview(data)
        disassembler = self._capstone.Cs(*self._cs_arch(arch, exe.byte_order()))
        register_values = {}

        if arch in (Arch.PPC32, Arch.PPC64):
            try:
                sp = uc.ppc_const.UC_PPC_REG_1
                rv = uc.ppc_const.UC_PPC_REG_3
                ip = uc.ppc_const.UC_PPC_REG_PC
            except AttributeError:
                raise RuntimeError('The installed unicorn version does not support the PPC architecture.')
        else:
            sp, ip, rv = {
                Arch.X32     : (
                    uc.x86_const.UC_X86_REG_ESP,
                    uc.x86_const.UC_X86_REG_EIP,
                    uc.x86_const.UC_X86_REG_EAX,
                ),
                Arch.X64     : (
                    uc.x86_const.UC_X86_REG_RSP,
                    uc.x86_const.UC_X86_REG_RIP,
                    uc.x86_const.UC_X86_REG_RAX,
                ),
                Arch.ARM32   : (
                    uc.arm_const.UC_ARM_REG_SP,
                    uc.arm_const.UC_ARM_REG_IP,
                    uc.arm_const.UC_ARM_REG_R0,
                ),
                Arch.ARM64   : (
                    uc.arm_const.UC_ARM_REG_SP,
                    uc.arm_const.UC_ARM_REG_IP,
                    uc.arm_const.UC_ARM_REG_R0,
                ),
                Arch.MIPS16  : (
                    uc.mips_const.UC_MIPS_REG_SP,
                    uc.mips_const.UC_MIPS_REG_PC,
                    uc.mips_const.UC_MIPS_REG_0,
                ),
                Arch.MIPS32  : (
                    uc.mips_const.UC_MIPS_REG_SP,
                    uc.mips_const.UC_MIPS_REG_PC,
                    uc.mips_const.UC_MIPS_REG_V0,
                ),
                Arch.MIPS64  : (
                    uc.mips_const.UC_MIPS_REG_SP,
                    uc.mips_const.UC_MIPS_REG_PC,
                    uc.mips_const.UC_MIPS_REG_V0,
                ),
                Arch.SPARC32 : (
                    uc.sparc_const.UC_SPARC_REG_SP,
                    uc.sparc_const.UC_SPARC_REG_PC,
                    uc.sparc_const.UC_SPARC_REG_O0,
                ),
                Arch.SPARC64 : (
                    uc.sparc_const.UC_SPARC_REG_SP,
                    uc.sparc_const.UC_SPARC_REG_PC,
                    uc.sparc_const.UC_SPARC_REG_O0,
                ),
            }[arch]

        if self.args.meta_registers:
            from refinery.lib.meta import metavars
            meta = metavars(data)
            for module in [uc.x86_const, uc.arm_const, uc.mips_const, uc.sparc_const]:
                md: Dict[str, Any] = module.__dict__
                for name, register in md.items():
                    try:
                        u, *_, kind, name = name.split('_')
                    except Exception:
                        continue
                    if kind != 'REG' or u != 'UC':
                        continue
                    for var, value in list(meta.items()):
                        if var.upper() != name:
                            continue
                        meta.discard(var)
                        register_values[register] = value
                        break

        for address in self.args.address:

            emulator = uc.Uc(*self._uc_arch(arch, exe.byte_order()))
            stack = Range(stack_addr, stack_addr + 3 * stack_size)

            emulator.mem_map(stack.lower, len(stack))
            emulator.reg_write(sp, stack.lower + 2 * len(stack) // 3)

            if arch is Arch.X32:
                for reg in [
                    uc.x86_const.UC_X86_REG_EAX,
                    uc.x86_const.UC_X86_REG_EBX,
                    uc.x86_const.UC_X86_REG_ECX,
                    uc.x86_const.UC_X86_REG_EDX,
                    uc.x86_const.UC_X86_REG_ESI,
                    uc.x86_const.UC_X86_REG_EDI,
                    uc.x86_const.UC_X86_REG_EBP,
                ]:
                    emulator.reg_write(reg, stack_addr + stack_size)
            if arch is Arch.X64:
                for reg in [
                    uc.x86_const.UC_X86_REG_RAX,
                    uc.x86_const.UC_X86_REG_RBX,
                    uc.x86_const.UC_X86_REG_RCX,
                    uc.x86_const.UC_X86_REG_RDX,
                    uc.x86_const.UC_X86_REG_RSI,
                    uc.x86_const.UC_X86_REG_RDI,
                    uc.x86_const.UC_X86_REG_RBP,
                    uc.x86_const.UC_X86_REG_R8,
                    uc.x86_const.UC_X86_REG_R9,
                    uc.x86_const.UC_X86_REG_R10,
                    uc.x86_const.UC_X86_REG_R11,
                    uc.x86_const.UC_X86_REG_R12,
                    uc.x86_const.UC_X86_REG_R13,
                    uc.x86_const.UC_X86_REG_R14,
                    uc.x86_const.UC_X86_REG_R15,
                ]:
                    emulator.reg_write(reg, stack_addr + stack_size)

            for reg, value in register_values.items():
                emulator.reg_write(reg, value)

            for segment in exe.segments():
                pmem = segment.physical
                vmem = segment.virtual
                try:
                    emulator.mem_map(vmem.lower, align(block_size, len(vmem)))
                    emulator.mem_write(vmem.lower, bytes(image[pmem.slice()]))
                except KeyboardInterrupt:
                    raise
                except Exception as error:
                    if address in vmem:
                        raise
                    self.log_info(F'error mapping segment [{vmem.lower:0{width}X}-{vmem.upper:0{width}X}]: {error!s}')

            tree = self._intervaltree.IntervalTree()
            state = EmuState(
                exe, tree, address, stack, blob, disassembler,
                stop=self.args.stop,
                sp_register=sp,
                ip_register=ip,
                rv_register=rv,
                allocations=[stack],
                max_wait=self.args.wait,
                max_loop=self.args.max_visits,
            )

            timeout = self.args.timeout
            if timeout is not None:
                self.log_info(F'setting timeout of {timeout} steps')
                state.ticks = timeout

            emulator.hook_add(uc.UC_HOOK_CODE, self._hook_code, user_data=state)
            emulator.hook_add(uc.UC_HOOK_MEM_WRITE, self._hook_mem_write, user_data=state, )
            emulator.hook_add(uc.UC_HOOK_MEM_READ_AFTER, self._hook_mem_read, user_data=state, )
            emulator.hook_add(uc.UC_HOOK_INSN_INVALID, self._hook_insn_error, user_data=state)
            emulator.hook_add(uc.UC_HOOK_MEM_INVALID, self._hook_mem_error, user_data=state)

            end_of_code = exe.location_from_address(address).virtual.box.upper

            try:
                emulator.emu_start(address, end_of_code)
            except uc.UcError:
                pass

            it: Iterator[Interval] = iter(tree)
            for interval in it:
                size = interval.end - interval.begin - 1
                if size not in bounds[self.args.patch_range]:
                    continue
                try:
                    patch = emulator.mem_read(interval.begin, size)
                except uc.UcError as error:
                    self.log_info(F'error reading 0x{interval.begin:0{width}X}:{size}: {error!s}')
                    continue
                if not any(patch):
                    continue
                self.log_info(F'memory patch at {state.fmt(interval.begin)} of size {size}')
                yield patch

    def _hook_mem_read(self, emu: Uc, access: int, address: int, size: int, value: int, state: EmuState):
        mask = (1 << (size * 8)) - 1
        state.last_read = value & mask

    def _hook_mem_write(self, emu: Uc, access: int, address: int, size: int, value: int, state: EmuState):
        try:
            mask = (1 << (size * 8)) - 1
            unsigned_value = value & mask

            if unsigned_value == state.expected_address:
                callstack = state.callstack
                state.retaddr = unsigned_value
                if not self.args.skip_calls:
                    if not callstack:
                        state.callstack_ceiling = emu.reg_read(state.sp_register)
                    callstack.append(unsigned_value)
                return
            else:
                state.retaddr = None

            skipped = False

            if (
                not self.args.log_stack_cookies
                and emu.reg_read(state.sp_register) ^ unsigned_value == state.last_read
            ):
                skipped = 'stack cookie'
            elif size not in bounds[self.args.write_range]:
                skipped = 'size excluded'
            elif (
                state.callstack_ceiling > 0
                and not self.args.log_writes_in_calls
                and address in range(state.callstack_ceiling - 0x200, state.callstack_ceiling)
            ):
                skipped = 'inside call'
            elif not self.args.log_stack_addresses and unsigned_value in state.stack:
                skipped = 'stack address'
            elif not self.args.log_other_addresses and not state.blob:
                for s in state.executable.sections():
                    if address in s.virtual:
                        skipped = F'write to section {s.name}'
                        break

            if (
                not skipped
                and unsigned_value == 0
                and state.writes.at(address) is not None
                and self.args.log_zero_overwrites is False
            ):
                try:
                    if any(emu.mem_read(address, size)):
                        skipped = 'zero overwrite'
                except Exception:
                    pass

            if not skipped:
                state.writes.addi(address, address + size + 1)
                state.writes.merge_overlaps()
                state.waiting = 0

            def info():
                data = unsigned_value.to_bytes(size, state.executable.byte_order().value)
                ph = state.executable.pointer_size // 4
                pt = state.executable.pointer_size // 8
                h = data.hex().upper()
                t = re.sub('[^!-~]', '.', data.decode('latin1'))
                msg = state.log(F'{state.fmt(address)} <- {h:_<{ph}} {t:_<{pt}}')
                if skipped:
                    msg = F'{msg} (ignored: {skipped})'
                return msg

            self.log_info(info)

        except KeyboardInterrupt:
            emu.emu_stop()
            return False

    def _hook_insn_error(self, emu: Uc, state: EmuState):
        self.log_debug('aborting emulation; instruction error')
        emu.emu_stop()
        return False

    def _hook_mem_error(self, emu: Uc, access: int, address: int, size: int, value: int, state: EmuState):
        bs = self.args.block_size
        try:
            emu.mem_map(align(bs, address, down=True), 2 * bs)
        except Exception:
            self.log_info(state.log(F'{state.fmt(address)} :: MEMORY ERROR'))
            return False
        else:
            return True

    def _hook_code(self, emu: Uc, address: int, size: int, state: EmuState):
        try:
            state.ticks -= 1
            state.visits[address] += 1
            if state.visits[address] > state.max_loop > 0:
                self.log_info(
                    F'aborting emulation: 0x{address:0{state.executable.pointer_size // 8}X}'
                    F' was visited more than {state.max_loop} times.')
                emu.emu_stop()
                return False
            if address == state.stop or state.ticks == 0:
                emu.emu_stop()
                return False
            waiting = state.waiting
            callstack = state.callstack
            depth = len(callstack)
            state.previous_address = address
            retaddr = state.retaddr
            state.retaddr = None

            if address != state.expected_address:
                if retaddr is not None and self.args.skip_calls:
                    if self.args.skip_calls > 1:
                        stack_size = self.args.stack_size
                        block_size = self.args.block_size
                        rv = state.rv_register
                        alloc_addr = align(block_size, state.allocations[-1].upper)
                        state.allocations.append(Range(alloc_addr, alloc_addr + stack_size))
                        emu.mem_map(alloc_addr, stack_size)
                        emu.reg_write(rv, alloc_addr)
                    ip = state.ip_register
                    sp = state.sp_register
                    ps = state.executable.pointer_size // 8
                    emu.reg_write(ip, retaddr)
                    emu.reg_write(sp, emu.reg_read(sp) + ps)
                    return
                if depth and address == callstack[-1]:
                    depth -= 1
                    state.callstack.pop()
                    if depth == 0:
                        state.callstack_ceiling = 0
                state.expected_address = address
            elif retaddr is not None and not self.args.skip_calls:
                # The present address was moved to the stack but we did not branch.
                # This is not quite accurate, of course: We could be calling the
                # next instruction. However, that sort of code is usually not really
                # a function call anyway, but rather a way to get the IP.
                callstack.pop()

            if waiting > self.args.wait:
                emu.emu_stop()
                return False
            if not depth or not self.args.wait_calls:
                state.waiting += 1
            state.expected_address += size

            instruction = state.disassemble(address, size)
            if instruction:
                instruction = F'{instruction.mnemonic} {instruction.op_str}'
                self.log_debug(state.log(instruction))
            else:
                self.log_debug(state.log('unrecognized instruction, aborting'))
                emu.emu_stop()

        except KeyboardInterrupt:
            emu.emu_stop()
            return False

    def _uc_arch(self, arch: Arch, bo: Optional[BO] = None) -> Tuple[int, int]:
        uc = self._unicorn
        arch, mode = {
            Arch.X32     : (uc.UC_ARCH_X86,   uc.UC_MODE_32),     # noqa
            Arch.X64     : (uc.UC_ARCH_X86,   uc.UC_MODE_64),     # noqa
            Arch.ARM32   : (uc.UC_ARCH_ARM,   uc.UC_MODE_ARM),    # noqa
            Arch.ARM64   : (uc.UC_ARCH_ARM,   uc.UC_MODE_THUMB),  # noqa
            Arch.MIPS16  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_16),     # noqa
            Arch.MIPS32  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_32),     # noqa
            Arch.MIPS64  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_64),     # noqa
            Arch.PPC32   : (uc.UC_ARCH_PPC,   uc.UC_MODE_32),     # noqa
            Arch.PPC64   : (uc.UC_ARCH_PPC,   uc.UC_MODE_64),     # noqa
            Arch.SPARC32 : (uc.UC_ARCH_SPARC, uc.UC_MODE_32),     # noqa
            Arch.SPARC64 : (uc.UC_ARCH_SPARC, uc.UC_MODE_V9),     # noqa
        }[arch]
        if bo is not None:
            mode |= {
                BO.BE: uc.UC_MODE_BIG_ENDIAN,
                BO.LE: uc.UC_MODE_LITTLE_ENDIAN,
            }[bo]
        return arch, mode

    def _cs_arch(self, arch: Arch, bo: Optional[BO] = None) -> Tuple[int, int]:
        cs = self._capstone
        arch, mode = {
            Arch.X32     : (cs.CS_ARCH_X86,   cs.CS_MODE_32),     # noqa
            Arch.X64     : (cs.CS_ARCH_X86,   cs.CS_MODE_64),     # noqa
            Arch.ARM32   : (cs.CS_ARCH_ARM,   cs.CS_MODE_ARM),    # noqa
            Arch.ARM64   : (cs.CS_ARCH_ARM,   cs.CS_MODE_THUMB),  # noqa
            Arch.MIPS16  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_16),     # noqa
            Arch.MIPS32  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_32),     # noqa
            Arch.MIPS64  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_64),     # noqa
            Arch.PPC32   : (cs.CS_ARCH_PPC,   cs.CS_MODE_32),     # noqa
            Arch.PPC64   : (cs.CS_ARCH_PPC,   cs.CS_MODE_64),     # noqa
            Arch.SPARC32 : (cs.CS_ARCH_SPARC, cs.CS_MODE_32),     # noqa
            Arch.SPARC64 : (cs.CS_ARCH_SPARC, cs.CS_MODE_V9),     # noqa
        }[arch]
        if bo is not None:
            mode |= {
                BO.BE: cs.CS_MODE_BIG_ENDIAN,
                BO.LE: cs.CS_MODE_LITTLE_ENDIAN,
            }[bo]
        return arch, mode

Classes

class EmuState (executable, writes, expected_address, stack, blob, disassembler=None, waiting=0, callstack=<factory>, retaddr=None, stop=None, previous_address=0, sp_register=0, ip_register=0, rv_register=0, callstack_ceiling=0, allocations=<factory>, ticks=<factory>, max_wait=0, max_loop=0, visits=<factory>, last_read=None)

EmuState(executable: 'Executable', writes: 'IntervalTree', expected_address: 'int', stack: 'Range', blob: 'bool', disassembler: 'Optional[Cs]' = None, waiting: 'int' = 0, callstack: 'List[int]' = , retaddr: 'Optional[int]' = None, stop: 'Optional[int]' = None, previous_address: 'int' = 0, sp_register: 'int' = 0, ip_register: 'int' = 0, rv_register: 'int' = 0, callstack_ceiling: 'int' = 0, allocations: 'List[Range]' = , ticks: 'int' = , max_wait: 'int' = 0, max_loop: 'int' = 0, visits: 'Dict[int, int]' = , last_read: 'Optional[int]' = None)

Expand source code Browse git
class EmuState:
    executable: Executable
    writes: IntervalTree
    expected_address: int
    stack: Range
    blob: bool
    disassembler: Optional[Cs] = None
    waiting: int = 0
    callstack: List[int] = field(default_factory=list)
    retaddr: Optional[int] = None
    stop: Optional[int] = None
    previous_address: int = 0
    sp_register: int = 0
    ip_register: int = 0
    rv_register: int = 0
    callstack_ceiling: int = 0
    allocations: List[Range] = field(default_factory=list)
    ticks: int = field(default_factory=lambda: INF)
    max_wait: int = 0
    max_loop: int = 0
    visits: Dict[int, int] = field(default_factory=lambda: defaultdict(int))
    last_read: Optional[int] = None

    def disassemble(self, address: int, size: int):
        if self.disassembler is None:
            return None
        try:
            pos = self.executable.location_from_address(address).physical.position
            end = pos + size
            return next(self.disassembler.disasm(
                bytes(self.executable.data[pos:end]), address, 1))
        except Exception:
            return None

    def log(self, msg: str) -> str:
        _width = len(str(self.max_wait))
        _depth = len(self.callstack)
        return F'[wait={self.waiting:0{_width}d}] [call={_depth}] {self.fmt(self.previous_address)}: {msg}'

    def fmt(self, address: int) -> str:
        return F'0x{address:0{self.executable.pointer_size // 4}X}'

Class variables

var executable
var writes
var expected_address
var stack
var blob
var callstack
var allocations
var ticks
var visits
var disassembler
var waiting
var retaddr
var stop
var previous_address
var sp_register
var ip_register
var rv_register
var callstack_ceiling
var max_wait
var max_loop
var last_read

Methods

def disassemble(self, address, size)
Expand source code Browse git
def disassemble(self, address: int, size: int):
    if self.disassembler is None:
        return None
    try:
        pos = self.executable.location_from_address(address).physical.position
        end = pos + size
        return next(self.disassembler.disasm(
            bytes(self.executable.data[pos:end]), address, 1))
    except Exception:
        return None
def log(self, msg)
Expand source code Browse git
def log(self, msg: str) -> str:
    _width = len(str(self.max_wait))
    _depth = len(self.callstack)
    return F'[wait={self.waiting:0{_width}d}] [call={_depth}] {self.fmt(self.previous_address)}: {msg}'
def fmt(self, address)
Expand source code Browse git
def fmt(self, address: int) -> str:
    return F'0x{address:0{self.executable.pointer_size // 4}X}'
class vstack (*address, stop=None, base=None, arch=Arch.X32, meta_registers=False, timeout=None, patch_range=slice(5, None, None), write_range=slice(1, None, None), wait=20, wait_calls=False, skip_calls=0, stack_size=65536, block_size=4096, max_visits=4096, log_writes_in_calls=False, log_stack_addresses=False, log_other_addresses=False, log_zero_overwrites=False, log_stack_cookies=False)

The unit emulates instructions at a given address in the input executable (PE/ELF/MachO) and extracts data patches that are written to the stack during emulation. Emulation is halted as soon as a certain number of instructions has not performed any memory writes, or when an error occurs. By default, most registers are set to the current location in the emulated stack. However, if you want to initialize certain registers differently, you can set an environment variable to the desired value.

Expand source code Browse git
class vstack(Unit):
    """
    The unit emulates instructions at a given address in the input executable (PE/ELF/MachO) and
    extracts data patches that are written to the stack during emulation. Emulation is halted as
    soon as a certain number of instructions has not performed any memory writes, or when an error
    occurs. By default, most registers are set to the current location in the emulated stack.
    However, if you want to initialize certain registers differently, you can set an environment
    variable to the desired value.
    """

    @Unit.Requires('intervaltree', 'default', 'extended')
    def _intervaltree():
        import intervaltree
        return intervaltree

    @Unit.Requires('unicorn==2.0.1.post1', 'default', 'extended')
    def _unicorn():
        with NoLogging():
            import unicorn
        import unicorn.x86_const
        import unicorn.arm64_const
        import unicorn.mips_const
        import unicorn.sparc_const
        try:
            import unicorn.ppc_const
        except ImportError:
            pass
        return unicorn

    @Unit.Requires('capstone', 'default', 'extended')
    def _capstone():
        import capstone
        return capstone

    def __init__(
        self,
        *address: Arg.Number(metavar='start', help='Specify the (virtual) addresses of a stack string instruction sequences.'),
        stop: Arg.Number('-s', metavar='stop', help='Optional: Stop when reaching this address.') = None,
        base: Arg.Number('-b', metavar='Addr', help='Optionally specify a custom base address B.') = None,
        arch: Arg.Option('-a', help='Specify for blob inputs: {choices}', choices=Arch) = Arch.X32,
        meta_registers: Arg.Switch('-r', help='Consume register initialization values from the chunk\'s metadata.') = False,
        timeout: Arg.Number('-t', help='Optionally stop emulating after a given number of instructions.') = None,
        patch_range: Arg.Bounds('-p', metavar='MIN:MAX',
            help='Extract only patches that are in the given range, default is {default}.') = slice(5, None),
        write_range: Arg.Bounds('-n', metavar='MIN:MAX',
            help='Log only writes whose size is in the given range, default is {default}.') = slice(1, None),
        wait: Arg.Number('-w', help=(
            'When this many instructions did not write to memory, emulation is halted. The default is {default}.')) = 20,
        wait_calls: Arg.Switch('-c', group='CALL',
            help='Wait indefinitely when inside a function call.') = False,
        skip_calls: Arg.Counts('-C', group='CALL',
            help='Skip function calls entirely. Use twice to treat each call as allocating memory.') = 0,
        stack_size: Arg.Number('-S', help='Optionally specify the stack size. The default is 0x{default:X}.') = 0x10000,
        block_size: Arg.Number('-B', help='Standard memory block size for the emulator, 0x{default:X} by default.') = 0x1000,
        max_visits: Arg.Number('-V', help='Maximum number of times a code address is visited. Default is {default}.') = 0x1000,
        log_writes_in_calls: Arg.Switch('-W', help='Log writes of values that occur in functions calls.') = False,
        log_stack_addresses: Arg.Switch('-X', help='Log writes of values that are stack addresses.') = False,
        log_other_addresses: Arg.Switch('-Y', help='Log writes of values that are addresses to mapped segments.') = False,
        log_zero_overwrites: Arg.Switch('-Z', help='Log writes of zeros to memory that contained nonzero values.') = False,
        log_stack_cookies  : Arg.Switch('-E', help='Log writes that look like stack cookies.') = False,
    ):
        super().__init__(
            address=address or [0],
            stop=stop,
            base=base,
            arch=Arg.AsOption(arch, Arch),
            meta_registers=meta_registers,
            timeout=timeout,
            patch_range=patch_range,
            write_range=write_range,
            wait=wait,
            stack_size=stack_size,
            wait_calls=wait_calls,
            skip_calls=skip_calls,
            block_size=block_size,
            max_visits=max_visits,
            log_writes_in_calls=log_writes_in_calls,
            log_stack_addresses=log_stack_addresses,
            log_other_addresses=log_other_addresses,
            log_zero_overwrites=log_zero_overwrites,
            log_stack_cookies=log_stack_cookies
        )

    def _find_stack_location(self, exe: Executable):
        stack_size = self.args.stack_size
        memory_max = 1 << exe.pointer_size
        space = exe.image_defined_address_space()
        aligned = align(stack_size, space.upper)
        if aligned + stack_size < memory_max:
            return aligned
        aligned = align(stack_size, space.lower - stack_size, down=True)
        if aligned > 0:
            return aligned
        raise RuntimeError('The primitive method used to map stack memory has failed.')

    def process(self, data):
        uc = self._unicorn
        blob = False
        try:
            exe = Executable.Load(data, self.args.base)
        except ValueError:
            exe = ExecutableCodeBlob(data, self.args.base, self.args.arch)
            blob = True
        arch = exe.arch()
        width = exe.pointer_size // 4
        block_size = self.args.block_size
        stack_size = self.args.stack_size
        stack_addr = self._find_stack_location(exe)
        self.log_info(F'mapping {SizeInt(stack_size)!r} of stack at 0x{stack_addr:X}')
        image = memoryview(data)
        disassembler = self._capstone.Cs(*self._cs_arch(arch, exe.byte_order()))
        register_values = {}

        if arch in (Arch.PPC32, Arch.PPC64):
            try:
                sp = uc.ppc_const.UC_PPC_REG_1
                rv = uc.ppc_const.UC_PPC_REG_3
                ip = uc.ppc_const.UC_PPC_REG_PC
            except AttributeError:
                raise RuntimeError('The installed unicorn version does not support the PPC architecture.')
        else:
            sp, ip, rv = {
                Arch.X32     : (
                    uc.x86_const.UC_X86_REG_ESP,
                    uc.x86_const.UC_X86_REG_EIP,
                    uc.x86_const.UC_X86_REG_EAX,
                ),
                Arch.X64     : (
                    uc.x86_const.UC_X86_REG_RSP,
                    uc.x86_const.UC_X86_REG_RIP,
                    uc.x86_const.UC_X86_REG_RAX,
                ),
                Arch.ARM32   : (
                    uc.arm_const.UC_ARM_REG_SP,
                    uc.arm_const.UC_ARM_REG_IP,
                    uc.arm_const.UC_ARM_REG_R0,
                ),
                Arch.ARM64   : (
                    uc.arm_const.UC_ARM_REG_SP,
                    uc.arm_const.UC_ARM_REG_IP,
                    uc.arm_const.UC_ARM_REG_R0,
                ),
                Arch.MIPS16  : (
                    uc.mips_const.UC_MIPS_REG_SP,
                    uc.mips_const.UC_MIPS_REG_PC,
                    uc.mips_const.UC_MIPS_REG_0,
                ),
                Arch.MIPS32  : (
                    uc.mips_const.UC_MIPS_REG_SP,
                    uc.mips_const.UC_MIPS_REG_PC,
                    uc.mips_const.UC_MIPS_REG_V0,
                ),
                Arch.MIPS64  : (
                    uc.mips_const.UC_MIPS_REG_SP,
                    uc.mips_const.UC_MIPS_REG_PC,
                    uc.mips_const.UC_MIPS_REG_V0,
                ),
                Arch.SPARC32 : (
                    uc.sparc_const.UC_SPARC_REG_SP,
                    uc.sparc_const.UC_SPARC_REG_PC,
                    uc.sparc_const.UC_SPARC_REG_O0,
                ),
                Arch.SPARC64 : (
                    uc.sparc_const.UC_SPARC_REG_SP,
                    uc.sparc_const.UC_SPARC_REG_PC,
                    uc.sparc_const.UC_SPARC_REG_O0,
                ),
            }[arch]

        if self.args.meta_registers:
            from refinery.lib.meta import metavars
            meta = metavars(data)
            for module in [uc.x86_const, uc.arm_const, uc.mips_const, uc.sparc_const]:
                md: Dict[str, Any] = module.__dict__
                for name, register in md.items():
                    try:
                        u, *_, kind, name = name.split('_')
                    except Exception:
                        continue
                    if kind != 'REG' or u != 'UC':
                        continue
                    for var, value in list(meta.items()):
                        if var.upper() != name:
                            continue
                        meta.discard(var)
                        register_values[register] = value
                        break

        for address in self.args.address:

            emulator = uc.Uc(*self._uc_arch(arch, exe.byte_order()))
            stack = Range(stack_addr, stack_addr + 3 * stack_size)

            emulator.mem_map(stack.lower, len(stack))
            emulator.reg_write(sp, stack.lower + 2 * len(stack) // 3)

            if arch is Arch.X32:
                for reg in [
                    uc.x86_const.UC_X86_REG_EAX,
                    uc.x86_const.UC_X86_REG_EBX,
                    uc.x86_const.UC_X86_REG_ECX,
                    uc.x86_const.UC_X86_REG_EDX,
                    uc.x86_const.UC_X86_REG_ESI,
                    uc.x86_const.UC_X86_REG_EDI,
                    uc.x86_const.UC_X86_REG_EBP,
                ]:
                    emulator.reg_write(reg, stack_addr + stack_size)
            if arch is Arch.X64:
                for reg in [
                    uc.x86_const.UC_X86_REG_RAX,
                    uc.x86_const.UC_X86_REG_RBX,
                    uc.x86_const.UC_X86_REG_RCX,
                    uc.x86_const.UC_X86_REG_RDX,
                    uc.x86_const.UC_X86_REG_RSI,
                    uc.x86_const.UC_X86_REG_RDI,
                    uc.x86_const.UC_X86_REG_RBP,
                    uc.x86_const.UC_X86_REG_R8,
                    uc.x86_const.UC_X86_REG_R9,
                    uc.x86_const.UC_X86_REG_R10,
                    uc.x86_const.UC_X86_REG_R11,
                    uc.x86_const.UC_X86_REG_R12,
                    uc.x86_const.UC_X86_REG_R13,
                    uc.x86_const.UC_X86_REG_R14,
                    uc.x86_const.UC_X86_REG_R15,
                ]:
                    emulator.reg_write(reg, stack_addr + stack_size)

            for reg, value in register_values.items():
                emulator.reg_write(reg, value)

            for segment in exe.segments():
                pmem = segment.physical
                vmem = segment.virtual
                try:
                    emulator.mem_map(vmem.lower, align(block_size, len(vmem)))
                    emulator.mem_write(vmem.lower, bytes(image[pmem.slice()]))
                except KeyboardInterrupt:
                    raise
                except Exception as error:
                    if address in vmem:
                        raise
                    self.log_info(F'error mapping segment [{vmem.lower:0{width}X}-{vmem.upper:0{width}X}]: {error!s}')

            tree = self._intervaltree.IntervalTree()
            state = EmuState(
                exe, tree, address, stack, blob, disassembler,
                stop=self.args.stop,
                sp_register=sp,
                ip_register=ip,
                rv_register=rv,
                allocations=[stack],
                max_wait=self.args.wait,
                max_loop=self.args.max_visits,
            )

            timeout = self.args.timeout
            if timeout is not None:
                self.log_info(F'setting timeout of {timeout} steps')
                state.ticks = timeout

            emulator.hook_add(uc.UC_HOOK_CODE, self._hook_code, user_data=state)
            emulator.hook_add(uc.UC_HOOK_MEM_WRITE, self._hook_mem_write, user_data=state, )
            emulator.hook_add(uc.UC_HOOK_MEM_READ_AFTER, self._hook_mem_read, user_data=state, )
            emulator.hook_add(uc.UC_HOOK_INSN_INVALID, self._hook_insn_error, user_data=state)
            emulator.hook_add(uc.UC_HOOK_MEM_INVALID, self._hook_mem_error, user_data=state)

            end_of_code = exe.location_from_address(address).virtual.box.upper

            try:
                emulator.emu_start(address, end_of_code)
            except uc.UcError:
                pass

            it: Iterator[Interval] = iter(tree)
            for interval in it:
                size = interval.end - interval.begin - 1
                if size not in bounds[self.args.patch_range]:
                    continue
                try:
                    patch = emulator.mem_read(interval.begin, size)
                except uc.UcError as error:
                    self.log_info(F'error reading 0x{interval.begin:0{width}X}:{size}: {error!s}')
                    continue
                if not any(patch):
                    continue
                self.log_info(F'memory patch at {state.fmt(interval.begin)} of size {size}')
                yield patch

    def _hook_mem_read(self, emu: Uc, access: int, address: int, size: int, value: int, state: EmuState):
        mask = (1 << (size * 8)) - 1
        state.last_read = value & mask

    def _hook_mem_write(self, emu: Uc, access: int, address: int, size: int, value: int, state: EmuState):
        try:
            mask = (1 << (size * 8)) - 1
            unsigned_value = value & mask

            if unsigned_value == state.expected_address:
                callstack = state.callstack
                state.retaddr = unsigned_value
                if not self.args.skip_calls:
                    if not callstack:
                        state.callstack_ceiling = emu.reg_read(state.sp_register)
                    callstack.append(unsigned_value)
                return
            else:
                state.retaddr = None

            skipped = False

            if (
                not self.args.log_stack_cookies
                and emu.reg_read(state.sp_register) ^ unsigned_value == state.last_read
            ):
                skipped = 'stack cookie'
            elif size not in bounds[self.args.write_range]:
                skipped = 'size excluded'
            elif (
                state.callstack_ceiling > 0
                and not self.args.log_writes_in_calls
                and address in range(state.callstack_ceiling - 0x200, state.callstack_ceiling)
            ):
                skipped = 'inside call'
            elif not self.args.log_stack_addresses and unsigned_value in state.stack:
                skipped = 'stack address'
            elif not self.args.log_other_addresses and not state.blob:
                for s in state.executable.sections():
                    if address in s.virtual:
                        skipped = F'write to section {s.name}'
                        break

            if (
                not skipped
                and unsigned_value == 0
                and state.writes.at(address) is not None
                and self.args.log_zero_overwrites is False
            ):
                try:
                    if any(emu.mem_read(address, size)):
                        skipped = 'zero overwrite'
                except Exception:
                    pass

            if not skipped:
                state.writes.addi(address, address + size + 1)
                state.writes.merge_overlaps()
                state.waiting = 0

            def info():
                data = unsigned_value.to_bytes(size, state.executable.byte_order().value)
                ph = state.executable.pointer_size // 4
                pt = state.executable.pointer_size // 8
                h = data.hex().upper()
                t = re.sub('[^!-~]', '.', data.decode('latin1'))
                msg = state.log(F'{state.fmt(address)} <- {h:_<{ph}} {t:_<{pt}}')
                if skipped:
                    msg = F'{msg} (ignored: {skipped})'
                return msg

            self.log_info(info)

        except KeyboardInterrupt:
            emu.emu_stop()
            return False

    def _hook_insn_error(self, emu: Uc, state: EmuState):
        self.log_debug('aborting emulation; instruction error')
        emu.emu_stop()
        return False

    def _hook_mem_error(self, emu: Uc, access: int, address: int, size: int, value: int, state: EmuState):
        bs = self.args.block_size
        try:
            emu.mem_map(align(bs, address, down=True), 2 * bs)
        except Exception:
            self.log_info(state.log(F'{state.fmt(address)} :: MEMORY ERROR'))
            return False
        else:
            return True

    def _hook_code(self, emu: Uc, address: int, size: int, state: EmuState):
        try:
            state.ticks -= 1
            state.visits[address] += 1
            if state.visits[address] > state.max_loop > 0:
                self.log_info(
                    F'aborting emulation: 0x{address:0{state.executable.pointer_size // 8}X}'
                    F' was visited more than {state.max_loop} times.')
                emu.emu_stop()
                return False
            if address == state.stop or state.ticks == 0:
                emu.emu_stop()
                return False
            waiting = state.waiting
            callstack = state.callstack
            depth = len(callstack)
            state.previous_address = address
            retaddr = state.retaddr
            state.retaddr = None

            if address != state.expected_address:
                if retaddr is not None and self.args.skip_calls:
                    if self.args.skip_calls > 1:
                        stack_size = self.args.stack_size
                        block_size = self.args.block_size
                        rv = state.rv_register
                        alloc_addr = align(block_size, state.allocations[-1].upper)
                        state.allocations.append(Range(alloc_addr, alloc_addr + stack_size))
                        emu.mem_map(alloc_addr, stack_size)
                        emu.reg_write(rv, alloc_addr)
                    ip = state.ip_register
                    sp = state.sp_register
                    ps = state.executable.pointer_size // 8
                    emu.reg_write(ip, retaddr)
                    emu.reg_write(sp, emu.reg_read(sp) + ps)
                    return
                if depth and address == callstack[-1]:
                    depth -= 1
                    state.callstack.pop()
                    if depth == 0:
                        state.callstack_ceiling = 0
                state.expected_address = address
            elif retaddr is not None and not self.args.skip_calls:
                # The present address was moved to the stack but we did not branch.
                # This is not quite accurate, of course: We could be calling the
                # next instruction. However, that sort of code is usually not really
                # a function call anyway, but rather a way to get the IP.
                callstack.pop()

            if waiting > self.args.wait:
                emu.emu_stop()
                return False
            if not depth or not self.args.wait_calls:
                state.waiting += 1
            state.expected_address += size

            instruction = state.disassemble(address, size)
            if instruction:
                instruction = F'{instruction.mnemonic} {instruction.op_str}'
                self.log_debug(state.log(instruction))
            else:
                self.log_debug(state.log('unrecognized instruction, aborting'))
                emu.emu_stop()

        except KeyboardInterrupt:
            emu.emu_stop()
            return False

    def _uc_arch(self, arch: Arch, bo: Optional[BO] = None) -> Tuple[int, int]:
        uc = self._unicorn
        arch, mode = {
            Arch.X32     : (uc.UC_ARCH_X86,   uc.UC_MODE_32),     # noqa
            Arch.X64     : (uc.UC_ARCH_X86,   uc.UC_MODE_64),     # noqa
            Arch.ARM32   : (uc.UC_ARCH_ARM,   uc.UC_MODE_ARM),    # noqa
            Arch.ARM64   : (uc.UC_ARCH_ARM,   uc.UC_MODE_THUMB),  # noqa
            Arch.MIPS16  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_16),     # noqa
            Arch.MIPS32  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_32),     # noqa
            Arch.MIPS64  : (uc.UC_ARCH_MIPS,  uc.UC_MODE_64),     # noqa
            Arch.PPC32   : (uc.UC_ARCH_PPC,   uc.UC_MODE_32),     # noqa
            Arch.PPC64   : (uc.UC_ARCH_PPC,   uc.UC_MODE_64),     # noqa
            Arch.SPARC32 : (uc.UC_ARCH_SPARC, uc.UC_MODE_32),     # noqa
            Arch.SPARC64 : (uc.UC_ARCH_SPARC, uc.UC_MODE_V9),     # noqa
        }[arch]
        if bo is not None:
            mode |= {
                BO.BE: uc.UC_MODE_BIG_ENDIAN,
                BO.LE: uc.UC_MODE_LITTLE_ENDIAN,
            }[bo]
        return arch, mode

    def _cs_arch(self, arch: Arch, bo: Optional[BO] = None) -> Tuple[int, int]:
        cs = self._capstone
        arch, mode = {
            Arch.X32     : (cs.CS_ARCH_X86,   cs.CS_MODE_32),     # noqa
            Arch.X64     : (cs.CS_ARCH_X86,   cs.CS_MODE_64),     # noqa
            Arch.ARM32   : (cs.CS_ARCH_ARM,   cs.CS_MODE_ARM),    # noqa
            Arch.ARM64   : (cs.CS_ARCH_ARM,   cs.CS_MODE_THUMB),  # noqa
            Arch.MIPS16  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_16),     # noqa
            Arch.MIPS32  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_32),     # noqa
            Arch.MIPS64  : (cs.CS_ARCH_MIPS,  cs.CS_MODE_64),     # noqa
            Arch.PPC32   : (cs.CS_ARCH_PPC,   cs.CS_MODE_32),     # noqa
            Arch.PPC64   : (cs.CS_ARCH_PPC,   cs.CS_MODE_64),     # noqa
            Arch.SPARC32 : (cs.CS_ARCH_SPARC, cs.CS_MODE_32),     # noqa
            Arch.SPARC64 : (cs.CS_ARCH_SPARC, cs.CS_MODE_V9),     # noqa
        }[arch]
        if bo is not None:
            mode |= {
                BO.BE: cs.CS_MODE_BIG_ENDIAN,
                BO.LE: cs.CS_MODE_LITTLE_ENDIAN,
            }[bo]
        return arch, mode

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members