Module refinery.units.formats.exe.vstack
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Union, List, Dict, TYPE_CHECKING
import enum
import functools
import re
from refinery.units import Arg, Unit
from refinery.lib.executable import Arch, Range
from refinery.lib.types import bounds, INF
from refinery.lib.meta import metavars
from refinery.lib.tools import isbuffer, NoLogging
from refinery.lib.emulator import Emulator, SpeakeasyEmulator, UnicornEmulator, IcicleEmulator, Hook, EmulationError
from refinery.lib.argformats import PythonExpression, ParserVariableMissing
from dataclasses import dataclass, field
from collections import defaultdict
if TYPE_CHECKING:
from typing import Optional, Iterator, TypeVar
from intervaltree import IntervalTree, Interval
FN = TypeVar('FN')
class Engine(enum.Enum):
    """
    The selectable emulator backends. The value of each member is the emulator class that
    implements the backend; the member name is what the unit offers as a choice for its engine
    option.
    """
    speakeasy = SpeakeasyEmulator
    icicle = IcicleEmulator
    unicorn = UnicornEmulator
@dataclass
class EmuConfig:
    """
    Settings of one vstack run that do not change during emulation. The unit fills this
    structure positionally from its command line arguments, so the field order is significant.
    """
    __slots__ = (
        'wait_calls',
        'skip_calls',
        'write_range',
        'wait',
        'block_size',
        'stack_size',
        'max_visits',
        'log_stack_cookies',
        'log_writes_in_calls',
        'log_stack_addresses',
        'log_other_addresses',
        'log_zero_overwrites',
    )
    # When set, the wait counter is not advanced while the tracked call stack is non-empty.
    wait_calls: bool
    # A count, not a flag: any nonzero value skips calls by jumping to the return address, and
    # values above 1 additionally place a freshly allocated block in the return value.
    skip_calls: int
    # Bounds on the size of a memory write; writes of other sizes are ignored.
    write_range: slice
    # Emulation halts once the wait counter exceeds this value.
    wait: int
    # Size of the block that is allocated as return value when skip_calls is above 1.
    block_size: int
    # The stack size that was configured on the unit.
    stack_size: int
    # Upper limit for visits to a single code address; only enforced when positive.
    max_visits: int
    # Each of the following switches disables one reason for ignoring a memory write.
    log_stack_cookies: bool
    log_writes_in_calls: bool
    log_stack_addresses: bool
    log_other_addresses: bool
    log_zero_overwrites: bool
@dataclass
class EmuState:
    """
    The mutable bookkeeping of a single emulation run: the configuration, the interval tree of
    recorded memory writes, and the counters that the emulator hooks update as code executes.
    """
    cfg: EmuConfig
    writes: IntervalTree
    expected_address: int
    address_width: int
    waiting: int = 0
    callstack: List[int] = field(default_factory=list)
    retaddr: Optional[int] = None
    stop: Optional[int] = None
    previous_address: int = 0
    callstack_ceiling: int = 0
    ticks: int = field(default_factory=lambda: INF)
    visits: Dict[int, int] = field(default_factory=lambda: defaultdict(int))
    init_registers: List[int] = field(default_factory=list)
    last_read: Optional[int] = None

    def fmt(self, address: int) -> str:
        """
        Render an address as upper case hexadecimal with a `0x` prefix, zero-padded to the
        configured address width.
        """
        digits = format(address, F'0{self.address_width}X')
        return '0x' + digits

    def log(self, msg: str) -> str:
        """
        Prefix a log message with the wait counter, the call depth, and the most recently
        executed address. The wait counter is padded to the width of the configured maximum.
        """
        padding = len(str(self.cfg.wait))
        counter = format(self.waiting, F'0{padding}d')
        depth = len(self.callstack)
        location = self.fmt(self.previous_address)
        return F'[wait={counter}] [depth={depth}] {location}: {msg}'
def inject_state_argument(pfn: FN) -> FN:
    """
    Decorator for emulator hooks whose final parameter is the emulation state. When the state
    is passed by keyword, it is always replaced by the emulator's own state; when it is passed
    positionally, it is replaced only if it is `None`. A keyboard interrupt raised by the hook
    halts the emulator and makes the hook return `False`.
    """
    @functools.wraps(pfn)
    def wrapped(self: VStackEmulatorMixin, *args, **kwargs):
        if 'state' not in kwargs:
            # The state is the last positional argument.
            *leading, last = args
            if last is None:
                args = (*leading, self.state)
        else:
            kwargs['state'] = self.state
        try:
            result = pfn(self, *args, **kwargs)
        except KeyboardInterrupt:
            self.halt()
            return False
        return result
    return wrapped
class VStackEmulatorMixin(Emulator):
    """
    Emulator mixin that implements the memory and code hooks used by the vstack unit. The hooks
    record memory writes in the interval tree of the current `EmuState`, keep track of calls by
    watching for return addresses that are written to memory, and halt the emulation when one of
    the configured limits is reached.
    """

    def stackrange(self):
        """
        Return the address range that is covered by the emulated stack.
        """
        return Range(self.stack_base, self.stack_base + self.stack_size)

    def disassemble(self, address: int, size: int):
        """
        Disassemble the instruction of the given size at the given emulator address. The return
        value is the first instruction produced by the disassembler, or `None` if anything goes
        wrong while locating or decoding the bytes.
        """
        # Translate from the emulator's base address to the base address of the executable.
        ea = address - self.base + self.exe.base
        try:
            _cs = self.disassembler()
            pos = self.exe.location_from_address(ea).physical.position
            end = pos + size
            return next(_cs.disasm(bytes(self.exe.data[pos:end]), address, 1))
        except Exception:
            return None

    @inject_state_argument
    def hook_mem_read(self, _, access: int, address: int, size: int, value: int, state: EmuState):
        """
        Remember the unsigned value of the most recent memory read; the write hook uses it to
        recognize stack cookies.
        """
        mask = (1 << (size * 8)) - 1
        state.last_read = value & mask

    @inject_state_argument
    def hook_mem_write(self, _, access: int, address: int, size: int, value: int, state: EmuState):
        """
        Inspect a memory write. A write of the expected next instruction address is treated as a
        return address being stored, i.e. as the start of a call. Every other write is either
        recorded in the interval tree of the state or ignored for one of several reasons; in
        both cases a log line describing the write is emitted.
        """
        mask = (1 << (size * 8)) - 1
        unsigned_value = value & mask

        if unsigned_value == state.expected_address:
            callstack = state.callstack
            state.retaddr = unsigned_value
            if not state.cfg.skip_calls:
                if not callstack:
                    # Entering the outermost call: remember where the stack pointer was.
                    state.callstack_ceiling = self.sp
                callstack.append(unsigned_value)
            return
        else:
            state.retaddr = None

        # Either False or a string that names the reason for ignoring this write.
        skipped = False

        if (
            not state.cfg.log_stack_cookies
            and self.sp ^ unsigned_value == state.last_read
        ):
            # The written value is the previously read value XORed with the stack pointer.
            skipped = 'stack cookie'
        elif size not in bounds[state.cfg.write_range]:
            skipped = 'size excluded'
        elif (
            state.callstack_ceiling > 0
            and not state.cfg.log_writes_in_calls
            and address in range(state.callstack_ceiling - 0x200, state.callstack_ceiling)
        ):
            # The write targets the 0x200 bytes directly below the stack pointer that was
            # recorded when the outermost call was entered.
            skipped = 'inside call'
        elif not state.cfg.log_stack_addresses and unsigned_value in self.stackrange():
            skipped = 'stack address'
        elif not state.cfg.log_other_addresses and not self.exe.blob:
            for s in self.exe.sections():
                if address in s.virtual:
                    skipped = F'write to section {s.name}'
                    break

        # NOTE(review): intervaltree documents IntervalTree.at as returning a set, in which case
        # the comparison against None below always succeeds - confirm whether a truthiness test
        # was intended.
        if (
            not skipped
            and unsigned_value == 0
            and state.writes.at(address) is not None
            and state.cfg.log_zero_overwrites is False
        ):
            try:
                # Presumably this hook runs before the write is committed, so this reads the
                # old contents - TODO confirm.
                if any(self.mem_read(address, size)):
                    skipped = 'zero overwrite'
            except Exception:
                pass

        if not skipped:
            # The interval is one byte longer than the write: this makes directly adjacent
            # writes overlap so that they are merged into one patch when the unit merges
            # overlaps. The unit subtracts the extra byte again before reading the patch.
            state.writes.addi(address, address + size + 1)
            state.waiting = 0

        def info():
            data = unsigned_value.to_bytes(size, self.exe.byte_order().value)
            ph = self.exe.pointer_size // 4
            pt = self.exe.pointer_size // 8
            h = data.hex().upper()
            t = re.sub('[^!-~]', '.', data.decode('latin1'))
            msg = state.log(F'{state.fmt(address)} <- {h:_<{ph}} {t:_<{pt}}')
            if skipped:
                msg = F'{msg} (ignored: {skipped})'
            return msg

        # The message is passed as a callable, not as a string.
        vstack.log_info(info)

    @inject_state_argument
    def hook_mem_error(self, _, access: int, address: int, size: int, value: int, state: EmuState) -> bool:
        """
        Respond to an invalid memory access by attempting to map the aligned region around the
        address. The hook returns `True` regardless of whether the mapping succeeded.
        """
        try:
            self.map(self.align(address, down=True), self.alloc_size)
        except Exception:
            vstack.log_debug(F'error accessing memory at {state.fmt(address)}')
        return True

    def hook_code_error(self, _, state: EmuState):
        """
        Halt the emulation when an instruction error is reported.
        """
        vstack.log_debug('aborting emulation; instruction error')
        self.halt()
        return False

    @inject_state_argument
    def hook_code_execute(self, _, address: int, size: int, state: EmuState):
        """
        Runs for every executed instruction. The hook performs the deferred register
        initialization, enforces the tick, visit, and wait limits, implements the skipping of
        calls, and keeps the tracked call stack and the expected next address up to date.
        Returns `False` whenever it halts the emulation.
        """
        if _init := state.init_registers:
            # Deferred initialization: point all listed registers to the top of the stack.
            tos = self.sp
            for reg in _init:
                self.set_register(reg, tos)
            _init.clear()

        state.ticks -= 1
        state.visits[address] += 1

        if state.visits[address] > state.cfg.max_visits > 0:
            # Use the same address formatting as every other log line of the state.
            vstack.log_info(
                F'aborting emulation: {state.fmt(address)}'
                F' was visited more than {state.cfg.max_visits} times.')
            self.halt()
            return False

        if address == state.stop or state.ticks == 0:
            self.halt()
            return False

        waiting = state.waiting
        callstack = state.callstack
        depth = len(callstack)
        state.previous_address = address
        retaddr = state.retaddr
        state.retaddr = None

        if address != state.expected_address:
            # Control flow was diverted.
            if retaddr is not None and state.cfg.skip_calls:
                # A return address was just stored, so this is a call: undo it by jumping
                # straight to the return address and releasing the stack slot again.
                if state.cfg.skip_calls > 1:
                    self.rv = self.malloc(state.cfg.block_size)
                self.ip = retaddr
                self.sp = self.sp + (self.exe.pointer_size // 8)
                return
            if depth and address == callstack[-1]:
                # Execution arrived at the innermost tracked return address.
                depth -= 1
                state.callstack.pop()
                if depth == 0:
                    state.callstack_ceiling = 0
            state.expected_address = address
        elif retaddr is not None and not state.cfg.skip_calls:
            # The present address was moved to the stack but we did not branch.
            # This is not quite accurate, of course: We could be calling the
            # next instruction. However, that sort of code is usually not really
            # a function call anyway, but rather a way to get the IP.
            callstack.pop()

        if waiting > state.cfg.wait:
            self.halt()
            return False
        if not depth or not state.cfg.wait_calls:
            state.waiting += 1
        state.expected_address += size

        def _log():
            instruction = self.disassemble(address, size)
            if instruction:
                return F'{instruction.mnemonic} {instruction.op_str}'
            return 'unrecognized instruction'

        vstack.log_debug(lambda: state.log(_log()))
class vstack(Unit):
    """
    The unit emulates instructions at a given address in the input executable (PE/ELF/MachO) and
    extracts data patches that are written to the stack during emulation. Emulation is halted as
    soon as a certain number of instructions has not performed any memory writes, or when an error
    occurs. By default, most registers are set to the current location in the emulated stack.
    However, if you want to initialize certain registers differently, you can set an environment
    variable to the desired value.
    """
    @Unit.Requires('intervaltree', 'default', 'extended')
    def _intervaltree():
        import intervaltree
        return intervaltree

    @Unit.Requires('capstone', 'default', 'extended')
    def _capstone():
        import capstone
        return capstone

    @Unit.Requires('unicorn', 'default', 'extended')
    def _unicorn():
        with NoLogging():
            import unicorn
            return unicorn

    @Unit.Requires('speakeasy-emulator', 'extended')
    def _speakeasy():
        import speakeasy
        return speakeasy

    @Unit.Requires('icicle-emu', 'all')
    def _icicle():
        import icicle
        return icicle

    def __init__(
        self,
        *address: Arg.NumSeq(metavar='start', help='Specify the (virtual) addresses of a stack string instruction sequences.'),
        stop: Arg.Number('-s', metavar='stop', help='Optional: Stop when reaching this address.') = None,
        base: Arg.Number('-b', metavar='Addr', help='Optionally specify a custom base address B.') = None,
        arch: Arg.Option('-a', help='Specify for blob inputs: {choices}', choices=Arch) = Arch.X32,
        engine: Arg.Option('-e', choices=Engine,
            help='The emulator engine. The default is {default}, options are: {choices}') = Engine.unicorn,
        meta_registers: Arg.Switch('-r', help=(
            'Consume register initialization values from the chunk\'s metadata. If the value is a byte string, '
            'the data will be mapped.')) = False,
        timeout: Arg.Number('-t', help='Optionally stop emulating after a given number of instructions.') = None,
        patch_range: Arg.Bounds('-p', metavar='MIN:MAX',
            help='Extract only patches that are in the given range, default is {default}.') = slice(5, None),
        write_range: Arg.Bounds('-n', metavar='MIN:MAX',
            help='Log only writes whose size is in the given range, default is {default}.') = slice(1, None),
        wait: Arg.Number('-w', help=(
            'When this many instructions did not write to memory, emulation is halted. The default is {default}.')) = 20,
        wait_calls: Arg.Switch('-c', group='CALL',
            help='Wait indefinitely when inside a function call.') = False,
        skip_calls: Arg.Counts('-C', group='CALL',
            help='Skip function calls entirely. Use twice to treat each call as allocating memory.') = 0,
        stack_size: Arg.Number('-S', help='Optionally specify the stack size. The default is 0x{default:X}.') = 0x10000,
        stack_push: Arg('-u', action='append', type=str, metavar='REG',
            help='Push the value of a register to the stack before beginning emulation; implies -r.') = None,
        block_size: Arg.Number('-B', help='Standard memory block size for the emulator, 0x{default:X} by default.') = 0x1000,
        max_visits: Arg.Number('-V', help='Maximum number of times a code address is visited. Default is {default}.') = 0x1000,
        log_writes_in_calls: Arg.Switch('-W', help='Log writes of values that occur in functions calls.') = False,
        log_stack_addresses: Arg.Switch('-X', help='Log writes of values that are stack addresses.') = False,
        log_other_addresses: Arg.Switch('-Y', help='Log writes of values that are addresses to mapped segments.') = False,
        log_zero_overwrites: Arg.Switch('-Z', help='Log writes of zeros to memory that contained nonzero values.') = False,
        log_stack_cookies: Arg.Switch('-E', help='Log writes that look like stack cookies.') = False,
    ):
        super().__init__(
            address=address,
            stop=stop,
            base=base,
            arch=Arg.AsOption(arch, Arch),
            engine=Arg.AsOption(engine, Engine),
            meta_registers=meta_registers,
            timeout=timeout,
            patch_range=patch_range,
            write_range=write_range,
            wait=wait,
            stack_size=stack_size,
            stack_push=stack_push,
            wait_calls=wait_calls,
            skip_calls=skip_calls,
            block_size=block_size,
            max_visits=max_visits,
            log_writes_in_calls=log_writes_in_calls,
            log_stack_addresses=log_stack_addresses,
            log_other_addresses=log_other_addresses,
            log_zero_overwrites=log_zero_overwrites,
            log_stack_cookies=log_stack_cookies
        )

    def process(self, data):
        """
        Emulate the code at each requested start address and yield every sufficiently large,
        non-zero memory patch that was recorded during the emulation. When no address is given,
        the address of the first symbol without a name is used.
        """
        meta = metavars(data)
        args = self.args
        engine: Engine = args.engine
        self.log_debug(F'attempting to use {engine.name}')
        # Presumably this resolves the optional dependency of the selected engine and fails
        # early when it is missing - TODO confirm against Unit.Requires.
        getattr(self, F'_{engine.name}')

        # Combine the selected backend with the hook implementations of the mixin.
        class Emu(engine.value, VStackEmulatorMixin):
            pass

        emu = Emu(
            data,
            args.base,
            args.arch,
            Hook.Everything,
            args.block_size,
            args.stack_size,
        )

        cfg = EmuConfig(
            args.wait_calls,
            args.skip_calls,
            args.write_range,
            args.wait,
            args.block_size,
            args.stack_size,
            args.max_visits,
            args.log_stack_cookies,
            args.log_writes_in_calls,
            args.log_stack_addresses,
            args.log_other_addresses,
            args.log_zero_overwrites,
        )

        # Maps each register to the name and value of the meta variable that initializes it.
        register_values = {}

        if args.meta_registers or args.stack_push:
            # Iterate over a copy because matching variables are removed from the metadata.
            for var, value in list(meta.items()):
                try:
                    register = emu.lookup_register(var)
                except LookupError:
                    continue
                meta.discard(var)
                register_values[register] = var, value

        def parse_address(a: Union[int, bytes]):
            """
            Convert an address argument to an integer: integers are returned as they are, byte
            strings are decoded and interpreted as a hexadecimal literal with optional H suffix,
            then as a Python expression, and finally as the name of a symbol.
            """
            if isinstance(a, int):
                return a
            a = a.decode(self.codec)
            if m := re.fullmatch('(?i)([A-F0-9]+)H?', a):
                return int(m[1], 16)
            try:
                return PythonExpression.Evaluate(a, meta)
            except ParserVariableMissing:
                pass
            symbols = list(emu.exe.symbols())
            # Narrow down the candidates with increasingly strict conditions and stop as soon
            # as exactly one symbol remains.
            for predicate in [
                lambda s: s.get_name().casefold() == a.casefold(),
                lambda s: s.name == a,
                lambda s: s.code,
                lambda s: s.exported
            ]:
                symbols = [s for s in symbols if predicate(s)]
                if len(symbols) == 1:
                    return symbols[0].address
            if len(symbols) > 1:
                raise RuntimeError(F'there are {len(symbols)} exported function symbol named "{a}", please specify the address')
            if not symbols:
                raise LookupError(F'no symbol with name "{a}" was found')

        addresses = [parse_address(a) for a in args.address]

        if not addresses:
            for symbol in emu.exe.symbols():
                if symbol.name is None:
                    addresses.append(symbol.address)
                    break

        for address in addresses:
            tree = self._intervaltree.IntervalTree()
            state = EmuState(cfg, tree, address, emu.exe.pointer_size // 4, stop=args.stop)
            emu.reset(state)

            # Registers without an explicit value are initialized by the code hook when the
            # first instruction executes.
            for reg in emu.general_purpose_registers():
                if reg not in register_values:
                    state.init_registers.append(reg)

            for reg, (var, value) in register_values.items():
                if isinstance(value, int):
                    self.log_info(F'setting {var} to integer value 0x{value:X}')
                    emu.set_register(reg, value)
                    continue
                if isinstance(value, str):
                    value = value.encode()
                if isbuffer(value):
                    # Report the length of the mapped buffer. The previous code referenced
                    # the variable `size` of the patch loop below, which is unbound here
                    # during the first address and stale for every later one.
                    length = len(value)
                    base = emu.malloc(length)
                    emu.mem_write(base, bytes(value))
                    emu.set_register(reg, base)
                    self.log_info(F'setting {var} to mapped buffer of size 0x{length:X}')
                    continue
                _tn = value.__class__.__name__
                self.log_warn(F'canot interpret value of type {_tn} for register {var}')

            if push := args.stack_push:
                for reg in push:
                    emu.push_register(reg)

            timeout = args.timeout
            if timeout is not None:
                self.log_info(F'setting timeout of {timeout} steps')
                state.ticks = timeout

            try:
                emu.emulate(address, args.stop)
            except EmulationError:
                pass

            tree.merge_overlaps()
            it: Iterator[Interval] = iter(tree)
            for interval in it:
                # Each write was recorded with one extra byte so that adjacent writes merge;
                # remove that byte again to obtain the actual patch size.
                size = interval.end - interval.begin - 1
                if size not in bounds[args.patch_range]:
                    continue
                try:
                    patch = emu.mem_read(interval.begin, size)
                except Exception as error:
                    width = emu.exe.pointer_size // 4
                    self.log_info(F'error reading 0x{interval.begin:0{width}X}:{size}: {error!s}')
                    continue
                if not any(patch):
                    continue
                self.log_info(F'memory patch at {state.fmt(interval.begin)} of size {size}')
                yield patch
Functions
def inject_state_argument(pfn)
-
Expand source code Browse git
def inject_state_argument(pfn: FN) -> FN: @functools.wraps(pfn) def wrapped(self: VStackEmulatorMixin, *args, **kwargs): if 'state' in kwargs: kwargs.update(state=self.state) else: *head, state = args if state is None: args = *head, self.state try: return pfn(self, *args, **kwargs) except KeyboardInterrupt: self.halt() return False return wrapped
Classes
class Engine (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class Engine(enum.Enum): speakeasy = SpeakeasyEmulator icicle = IcicleEmulator unicorn = UnicornEmulator
Ancestors
- enum.Enum
Class variables
var speakeasy
var icicle
var unicorn
class EmuConfig (wait_calls, skip_calls, write_range, wait, block_size, stack_size, max_visits, log_stack_cookies, log_writes_in_calls, log_stack_addresses, log_other_addresses, log_zero_overwrites)
-
EmuConfig(wait_calls: 'bool', skip_calls: 'bool', write_range: 'slice', wait: 'int', block_size: 'int', stack_size: 'int', max_visits: 'int', log_stack_cookies: 'bool', log_writes_in_calls: 'bool', log_stack_addresses: 'bool', log_other_addresses: 'bool', log_zero_overwrites: 'bool')
Expand source code Browse git
class EmuConfig: __slots__ = ( 'wait_calls', 'skip_calls', 'write_range', 'wait', 'block_size', 'stack_size', 'max_visits', 'log_stack_cookies', 'log_writes_in_calls', 'log_stack_addresses', 'log_other_addresses', 'log_zero_overwrites', ) wait_calls: bool skip_calls: bool write_range: slice wait: int block_size: int stack_size: int max_visits: int log_stack_cookies: bool log_writes_in_calls: bool log_stack_addresses: bool log_other_addresses: bool log_zero_overwrites: bool
Instance variables
var block_size
-
Return an attribute of instance, which is of type owner.
var log_other_addresses
-
Return an attribute of instance, which is of type owner.
var log_stack_addresses
-
Return an attribute of instance, which is of type owner.
var log_stack_cookies
-
Return an attribute of instance, which is of type owner.
var log_writes_in_calls
-
Return an attribute of instance, which is of type owner.
var log_zero_overwrites
-
Return an attribute of instance, which is of type owner.
var max_visits
-
Return an attribute of instance, which is of type owner.
var skip_calls
-
Return an attribute of instance, which is of type owner.
var stack_size
-
Return an attribute of instance, which is of type owner.
var wait
-
Return an attribute of instance, which is of type owner.
var wait_calls
-
Return an attribute of instance, which is of type owner.
var write_range
-
Return an attribute of instance, which is of type owner.
class EmuState (cfg, writes, expected_address, address_width, waiting=0, callstack=<factory>, retaddr=None, stop=None, previous_address=0, callstack_ceiling=0, ticks=<factory>, visits=<factory>, init_registers=<factory>, last_read=None)
-
EmuState(cfg: 'EmuConfig', writes: 'IntervalTree', expected_address: 'int', address_width: 'int', waiting: 'int' = 0, callstack: 'List[int]' = <factory>, retaddr: 'Optional[int]' = None, stop: 'Optional[int]' = None, previous_address: 'int' = 0, callstack_ceiling: 'int' = 0, ticks: 'int' = <factory>, visits: 'Dict[int, int]' = <factory>, init_registers: 'List[int]' = <factory>, last_read: 'Optional[int]' = None)
Expand source code Browse git
class EmuState: cfg: EmuConfig writes: IntervalTree expected_address: int address_width: int waiting: int = 0 callstack: List[int] = field(default_factory=list) retaddr: Optional[int] = None stop: Optional[int] = None previous_address: int = 0 callstack_ceiling: int = 0 ticks: int = field(default_factory=lambda: INF) visits: Dict[int, int] = field(default_factory=lambda: defaultdict(int)) init_registers: List[int] = field(default_factory=list) last_read: Optional[int] = None def log(self, msg: str) -> str: _width = len(str(self.cfg.wait)) _depth = len(self.callstack) return F'[wait={self.waiting:0{_width}d}] [depth={_depth}] {self.fmt(self.previous_address)}: {msg}' def fmt(self, address: int) -> str: return F'0x{address:0{self.address_width}X}'
Class variables
var cfg
var writes
var expected_address
var address_width
var callstack
var ticks
var visits
var init_registers
var waiting
var retaddr
var stop
var previous_address
var callstack_ceiling
var last_read
Methods
def log(self, msg)
-
Expand source code Browse git
def log(self, msg: str) -> str: _width = len(str(self.cfg.wait)) _depth = len(self.callstack) return F'[wait={self.waiting:0{_width}d}] [depth={_depth}] {self.fmt(self.previous_address)}: {msg}'
def fmt(self, address)
-
Expand source code Browse git
def fmt(self, address: int) -> str: return F'0x{address:0{self.address_width}X}'
class VStackEmulatorMixin (data, base=None, arch=None, hooks=Hook.OnlyErrors, align_size=4096, alloc_size=4096)
-
The emulator base class.
Expand source code Browse git
class VStackEmulatorMixin(Emulator): def stackrange(self): return Range(self.stack_base, self.stack_base + self.stack_size) def disassemble(self, address: int, size: int): ea = address - self.base + self.exe.base try: _cs = self.disassembler() pos = self.exe.location_from_address(ea).physical.position end = pos + size return next(_cs.disasm(bytes(self.exe.data[pos:end]), address, 1)) except Exception: return None @inject_state_argument def hook_mem_read(self, _, access: int, address: int, size: int, value: int, state: EmuState): mask = (1 << (size * 8)) - 1 state.last_read = value & mask @inject_state_argument def hook_mem_write(self, _, access: int, address: int, size: int, value: int, state: EmuState): mask = (1 << (size * 8)) - 1 unsigned_value = value & mask if unsigned_value == state.expected_address: callstack = state.callstack state.retaddr = unsigned_value if not state.cfg.skip_calls: if not callstack: state.callstack_ceiling = self.sp callstack.append(unsigned_value) return else: state.retaddr = None skipped = False if ( not state.cfg.log_stack_cookies and self.sp ^ unsigned_value == state.last_read ): skipped = 'stack cookie' elif size not in bounds[state.cfg.write_range]: skipped = 'size excluded' elif ( state.callstack_ceiling > 0 and not state.cfg.log_writes_in_calls and address in range(state.callstack_ceiling - 0x200, state.callstack_ceiling) ): skipped = 'inside call' elif not state.cfg.log_stack_addresses and unsigned_value in self.stackrange(): skipped = 'stack address' elif not state.cfg.log_other_addresses and not self.exe.blob: for s in self.exe.sections(): if address in s.virtual: skipped = F'write to section {s.name}' break if ( not skipped and unsigned_value == 0 and state.writes.at(address) is not None and state.cfg.log_zero_overwrites is False ): try: if any(self.mem_read(address, size)): skipped = 'zero overwrite' except Exception: pass if not skipped: state.writes.addi(address, address + size + 1) state.waiting = 0 def info(): data = 
unsigned_value.to_bytes(size, self.exe.byte_order().value) ph = self.exe.pointer_size // 4 pt = self.exe.pointer_size // 8 h = data.hex().upper() t = re.sub('[^!-~]', '.', data.decode('latin1')) msg = state.log(F'{state.fmt(address)} <- {h:_<{ph}} {t:_<{pt}}') if skipped: msg = F'{msg} (ignored: {skipped})' return msg vstack.log_info(info) @inject_state_argument def hook_mem_error(self, _, access: int, address: int, size: int, value: int, state: EmuState) -> bool: try: self.map(self.align(address, down=True), self.alloc_size) except Exception: vstack.log_debug(F'error accessing memory at {state.fmt(address)}') return True def hook_code_error(self, _, state: EmuState): vstack.log_debug('aborting emulation; instruction error') self.halt() return False @inject_state_argument def hook_code_execute(self, _, address: int, size: int, state: EmuState): if _init := state.init_registers: tos = self.sp for reg in _init: self.set_register(reg, tos) _init.clear() state.ticks -= 1 state.visits[address] += 1 if state.visits[address] > state.cfg.max_visits > 0: vstack.log_info( F'aborting emulation: 0x{address:0{self.exe.pointer_size // 8}X}' F' was visited more than {state.cfg.max_visits} times.') self.halt() return False if address == state.stop or state.ticks == 0: self.halt() return False waiting = state.waiting callstack = state.callstack depth = len(callstack) state.previous_address = address retaddr = state.retaddr state.retaddr = None if address != state.expected_address: if retaddr is not None and state.cfg.skip_calls: if state.cfg.skip_calls > 1: self.rv = self.malloc(state.cfg.block_size) self.ip = retaddr self.sp = self.sp + (self.exe.pointer_size // 8) return if depth and address == callstack[-1]: depth -= 1 state.callstack.pop() if depth == 0: state.callstack_ceiling = 0 state.expected_address = address elif retaddr is not None and not state.cfg.skip_calls: # The present address was moved to the stack but we did not branch. 
# This is not quite accurate, of course: We could be calling the # next instruction. However, that sort of code is usually not really # a function call anyway, but rather a way to get the IP. callstack.pop() if waiting > state.cfg.wait: self.halt() return False if not depth or not state.cfg.wait_calls: state.waiting += 1 state.expected_address += size def _log(): instruction = self.disassemble(address, size) if instruction: return F'{instruction.mnemonic} {instruction.op_str}' return 'unrecognized instruction' vstack.log_debug(lambda: state.log(_log()))
Ancestors
- Emulator
- abc.ABC
- typing.Generic
Methods
def stackrange(self)
-
Expand source code Browse git
def stackrange(self): return Range(self.stack_base, self.stack_base + self.stack_size)
def disassemble(self, address, size)
-
Expand source code Browse git
def disassemble(self, address: int, size: int): ea = address - self.base + self.exe.base try: _cs = self.disassembler() pos = self.exe.location_from_address(ea).physical.position end = pos + size return next(_cs.disasm(bytes(self.exe.data[pos:end]), address, 1)) except Exception: return None
Inherited members
class vstack (*address, stop=None, base=None, arch=Arch.X32, engine=Engine.unicorn, meta_registers=False, timeout=None, patch_range=slice(5, None, None), write_range=slice(1, None, None), wait=20, wait_calls=False, skip_calls=0, stack_size=65536, stack_push=None, block_size=4096, max_visits=4096, log_writes_in_calls=False, log_stack_addresses=False, log_other_addresses=False, log_zero_overwrites=False, log_stack_cookies=False)
-
The unit emulates instructions at a given address in the input executable (PE/ELF/MachO) and extracts data patches that are written to the stack during emulation. Emulation is halted as soon as a certain number of instructions has not performed any memory writes, or when an error occurs. By default, most registers are set to the current location in the emulated stack. However, if you want to initialize certain registers differently, you can set an environment variable to the desired value.
Expand source code Browse git
class vstack(Unit): """ The unit emulates instructions at a given address in the input executable (PE/ELF/MachO) and extracts data patches that are written to the stack during emulation. Emulation is halted as soon as a certain number of instructions has not performed any memory writes, or when an error occurs. By default, most registers are set to the current location in the emulated stack. However, if you want to initialize certain registers differently, you can set an environment variable to the desired value. """ @Unit.Requires('intervaltree', 'default', 'extended') def _intervaltree(): import intervaltree return intervaltree @Unit.Requires('capstone', 'default', 'extended') def _capstone(): import capstone return capstone @Unit.Requires('unicorn', 'default', 'extended') def _unicorn(): with NoLogging(): import unicorn return unicorn @Unit.Requires('speakeasy-emulator', 'extended') def _speakeasy(): import speakeasy return speakeasy @Unit.Requires('icicle-emu', 'all') def _icicle(): import icicle return icicle def __init__( self, *address: Arg.NumSeq(metavar='start', help='Specify the (virtual) addresses of a stack string instruction sequences.'), stop: Arg.Number('-s', metavar='stop', help='Optional: Stop when reaching this address.') = None, base: Arg.Number('-b', metavar='Addr', help='Optionally specify a custom base address B.') = None, arch: Arg.Option('-a', help='Specify for blob inputs: {choices}', choices=Arch) = Arch.X32, engine: Arg.Option('-e', choices=Engine, help='The emulator engine. The default is {default}, options are: {choices}') = Engine.unicorn, meta_registers: Arg.Switch('-r', help=( 'Consume register initialization values from the chunk\'s metadata. 
If the value is a byte string, ' 'the data will be mapped.')) = False, timeout: Arg.Number('-t', help='Optionally stop emulating after a given number of instructions.') = None, patch_range: Arg.Bounds('-p', metavar='MIN:MAX', help='Extract only patches that are in the given range, default is {default}.') = slice(5, None), write_range: Arg.Bounds('-n', metavar='MIN:MAX', help='Log only writes whose size is in the given range, default is {default}.') = slice(1, None), wait: Arg.Number('-w', help=( 'When this many instructions did not write to memory, emulation is halted. The default is {default}.')) = 20, wait_calls: Arg.Switch('-c', group='CALL', help='Wait indefinitely when inside a function call.') = False, skip_calls: Arg.Counts('-C', group='CALL', help='Skip function calls entirely. Use twice to treat each call as allocating memory.') = 0, stack_size: Arg.Number('-S', help='Optionally specify the stack size. The default is 0x{default:X}.') = 0x10000, stack_push: Arg('-u', action='append', type=str, metavar='REG', help='Push the value of a register to the stack before beginning emulation; implies -r.') = None, block_size: Arg.Number('-B', help='Standard memory block size for the emulator, 0x{default:X} by default.') = 0x1000, max_visits: Arg.Number('-V', help='Maximum number of times a code address is visited. 
Default is {default}.') = 0x1000, log_writes_in_calls: Arg.Switch('-W', help='Log writes of values that occur in functions calls.') = False, log_stack_addresses: Arg.Switch('-X', help='Log writes of values that are stack addresses.') = False, log_other_addresses: Arg.Switch('-Y', help='Log writes of values that are addresses to mapped segments.') = False, log_zero_overwrites: Arg.Switch('-Z', help='Log writes of zeros to memory that contained nonzero values.') = False, log_stack_cookies : Arg.Switch('-E', help='Log writes that look like stack cookies.') = False, ): super().__init__( address=address, stop=stop, base=base, arch=Arg.AsOption(arch, Arch), engine=Arg.AsOption(engine, Engine), meta_registers=meta_registers, timeout=timeout, patch_range=patch_range, write_range=write_range, wait=wait, stack_size=stack_size, stack_push=stack_push, wait_calls=wait_calls, skip_calls=skip_calls, block_size=block_size, max_visits=max_visits, log_writes_in_calls=log_writes_in_calls, log_stack_addresses=log_stack_addresses, log_other_addresses=log_other_addresses, log_zero_overwrites=log_zero_overwrites, log_stack_cookies=log_stack_cookies ) def process(self, data): meta = metavars(data) args = self.args engine: Engine = args.engine self.log_debug(F'attempting to use {engine.name}') getattr(self, F'_{engine.name}') class Emu(engine.value, VStackEmulatorMixin): pass emu = Emu( data, args.base, args.arch, Hook.Everything, args.block_size, args.stack_size, ) cfg = EmuConfig( args.wait_calls, args.skip_calls, args.write_range, args.wait, args.block_size, args.stack_size, args.max_visits, args.log_stack_cookies, args.log_writes_in_calls, args.log_stack_addresses, args.log_other_addresses, args.log_zero_overwrites, ) register_values = {} if args.meta_registers or args.stack_push: for var, value in list(meta.items()): try: register = emu.lookup_register(var) except LookupError: continue meta.discard(var) register_values[register] = var, value def parse_address(a: Union[int, bytes]): if 
isinstance(a, int): return a a = a.decode(self.codec) if m := re.fullmatch('(?i)([A-F0-9]+)H?', a): return int(m[1], 16) try: return PythonExpression.Evaluate(a, meta) except ParserVariableMissing: pass symbols = list(emu.exe.symbols()) for filter in [ lambda s: s.get_name().casefold() == a.casefold(), lambda s: s.name == a, lambda s: s.code, lambda s: s.exported ]: symbols = [s for s in symbols if filter(s)] if len(symbols) == 1: return symbols[0].address if len(symbols) > 1: raise RuntimeError(F'there are {len(symbols)} exported function symbol named "{a}", please specify the address') if not symbols: raise LookupError(F'no symbol with name "{a}" was found') addresses = [parse_address(a) for a in args.address] if not addresses: for symbol in emu.exe.symbols(): if symbol.name is None: addresses.append(symbol.address) break for address in addresses: tree = self._intervaltree.IntervalTree() state = EmuState(cfg, tree, address, emu.exe.pointer_size // 4, stop=args.stop) emu.reset(state) for reg in emu.general_purpose_registers(): if reg not in register_values: state.init_registers.append(reg) for reg, (var, value) in register_values.items(): if isinstance(value, int): self.log_info(F'setting {var} to integer value 0x{value:X}') emu.set_register(reg, value) continue if isinstance(value, str): value = value.encode() if isbuffer(value): base = emu.malloc(len(value)) emu.mem_write(base, bytes(value)) emu.set_register(reg, base) self.log_info(F'setting {var} to mapped buffer of size 0x{size:X}') continue _tn = value.__class__.__name__ self.log_warn(F'canot interpret value of type {_tn} for register {var}') if push := args.stack_push: for reg in push: emu.push_register(reg) timeout = args.timeout if timeout is not None: self.log_info(F'setting timeout of {timeout} steps') state.ticks = timeout try: emu.emulate(address, args.stop) except EmulationError: pass tree.merge_overlaps() it: Iterator[Interval] = iter(tree) for interval in it: size = interval.end - interval.begin - 
1 if size not in bounds[args.patch_range]: continue try: patch = emu.mem_read(interval.begin, size) except Exception as error: width = emu.exe.pointer_size // 4 self.log_info(F'error reading 0x{interval.begin:0{width}X}:{size}: {error!s}') continue if not any(patch): continue self.log_info(F'memory patch at {state.fmt(interval.begin)} of size {size}') yield patch
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members