Module refinery.lib.emulator.ic
Implements refinery.lib.emulator.interface.RawMetalEmulator for the icicle backend.
Expand source code Browse git
"""
Implements `refinery.lib.emulator.interface.RawMetalEmulator` for the icicle backend.
"""
from __future__ import annotations
from functools import partial
from typing import TYPE_CHECKING, TypeVar
from refinery.lib.emulator.abstract import EmulationError, Hook, RawMetalEmulator, Register
from refinery.lib.executable import Arch
from refinery.lib.shared import icicle as ic
if TYPE_CHECKING:
from icicle import Icicle as Ic
else:
class Ic:
pass
_T = TypeVar('_T')
class IcicleEmulator(RawMetalEmulator[Ic, str, _T]):
"""
An Icicle-based emulator. Icicle is a more recent emulator engine and not yet as mature as
Unicorn. There are some compelling arguments for its robustness, but with the current
interface it is completely lacking any memory write hook support, which makes it difficult
to use for most of our applications. See also the [Icicle paper][ICE].
[ICE]: https://arxiv.org/pdf/2301.13346
"""
icicle: Ic
def _init(self):
super()._init()
self._single_step = False
def _reset(self):
super()._reset()
exe = self.exe
try:
arch = {
Arch.X32: 'i686',
Arch.X64: 'x86_64',
}[exe.arch()]
except KeyError:
arch = None
if arch not in ic.architectures():
raise NotImplementedError(F'Icicle cannot handle executables of arch {exe.arch().name}')
self.icicle = ice = ic.Icicle(arch)
self.regmap = {reg.casefold(): val[1] for reg, val in ice.reg_list().items()}
self._map_segments()
self._map_stack_and_heap()
if self.hooked(Hook.ApiCall):
self._install_api_trampoline()
def _enable_single_step(self):
self._single_step = True
def _disable_single_step(self):
self._single_step = False
def _emulate(self, start: int, end: int | None = None):
RS = ic.RunStatus
MP = ic.MemoryProtection
ice = self.icicle
code_hooked = self.hooked(Hook.CodeExecute)
apis_hooked = self.hooked(Hook.ApiCall)
mm_e_hooked = self.hooked(Hook.MemoryError)
mm_w_hooked = self.hooked(Hook.MemoryWrite)
mm_r_hooked = self.hooked(Hook.MemoryRead)
halt = self._single_step
dasm = self.exe.disassembler()
dasm.detail = True
if code_hooked or halt:
step = partial(ice.step, 1)
elif end is not None:
step = partial(ice.run_until, end)
else:
step = ice.run
self.ip = ip = start
mprotect: list[tuple[int, int]] = []
cb_write = None
retrying = 0
while True:
insn = None
self.ip = ip
if (code_hooked or apis_hooked) and not retrying:
insn = next(dasm.disasm(self.mem_read(ip, 20), 1))
args = (ice, ip, insn.size, self.state)
if apis_hooked:
self._hook_api_call_check(*args)
if code_hooked:
self.hook_code_execute(*args)
if mprotect:
ice.mem_protect(*mprotect[-1], MP.ExecuteReadWrite)
if (status := step()) == RS.InstructionLimit:
for p in mprotect:
ice.mem_protect(*p, MP.ExecuteOnly)
if cb_write:
addr, size = cb_write
value = self.mem_read_int(addr, size)
if self.hook_mem_write(ice, 0, addr, size, value, self.state) is False:
break
cb_write = None
mprotect.clear()
retrying = 0
ip = self.ip
elif status in (
RS.Breakpoint,
RS.Halt,
RS.Killed,
):
break
elif status == RS.UnhandledException:
insn = insn or next(dasm.disasm(self.mem_read(ip, 20), 1))
size = max((op.size for op in insn.operands), default=insn.addr_size)
EC = ic.ExceptionCode
ea = ice.exception_value
ec = ice.exception_code
if ec in (EC.ReadUnmapped, EC.WriteUnmapped) and mm_e_hooked:
if self.hook_mem_error(ice, 0, ea, size, 0, self.state) is not False:
retrying += 1
continue
elif ec == EC.ReadPerm and mm_r_hooked:
value = self.mem_read_int(ea, size)
if self.hook_mem_read(ice, 0, ea, size, value, self.state) is not False:
mprotect.append((ea, size))
retrying += 1
continue
elif ec == EC.WritePerm and mm_w_hooked:
cb_write = (ea, size)
mprotect.append(cb_write)
retrying += 1
continue
else:
raise EmulationError(ec.name)
elif status != RS.Running:
raise EmulationError(status.name)
if halt:
break
def halt(self):
self.icicle.add_breakpoint(self.ip)
def _lookup_register(self, var: str) -> Register[str]:
name = var.casefold()
size = self.regmap[name]
return Register(name, name, size)
def _map(self, address: int, size: int):
MP = ic.MemoryProtection
if self.hooks & Hook.MemoryAccess:
perm = MP.ExecuteOnly
else:
perm = MP.ExecuteReadWrite
return self.icicle.mem_map(address, size, perm)
def _set_register(self, register: str, v: int) -> None:
return self.icicle.reg_write(register, v)
def _get_register(self, register: str) -> int:
return self.icicle.reg_read(register)
def mem_write(self, address: int, data: bytes):
return self.icicle.mem_write(address, data)
def mem_read(self, address: int, size: int):
return self.icicle.mem_read(address, size)
Classes
class Ic-
Expand source code Browse git
class Ic: pass class IcicleEmulator (data, base=None, arch=None, hooks=18, align_size=4096, alloc_size=4096)-
An Icicle-based emulator. Icicle is a more recent emulator engine and not yet as mature as Unicorn. There are some compelling arguments for its robustness, but with the current interface it is completely lacking any memory write hook support, which makes it difficult to use for most of our applications. See also the Icicle paper.
Expand source code Browse git
class IcicleEmulator(RawMetalEmulator[Ic, str, _T]): """ An Icicle-based emulator. Icicle is a more recent emulator engine and not yet as mature as Unicorn. There are some compelling arguments for its robustness, but with the current interface it is completely lacking any memory write hook support, which makes it difficult to use for most of our applications. See also the [Icicle paper][ICE]. [ICE]: https://arxiv.org/pdf/2301.13346 """ icicle: Ic def _init(self): super()._init() self._single_step = False def _reset(self): super()._reset() exe = self.exe try: arch = { Arch.X32: 'i686', Arch.X64: 'x86_64', }[exe.arch()] except KeyError: arch = None if arch not in ic.architectures(): raise NotImplementedError(F'Icicle cannot handle executables of arch {exe.arch().name}') self.icicle = ice = ic.Icicle(arch) self.regmap = {reg.casefold(): val[1] for reg, val in ice.reg_list().items()} self._map_segments() self._map_stack_and_heap() if self.hooked(Hook.ApiCall): self._install_api_trampoline() def _enable_single_step(self): self._single_step = True def _disable_single_step(self): self._single_step = False def _emulate(self, start: int, end: int | None = None): RS = ic.RunStatus MP = ic.MemoryProtection ice = self.icicle code_hooked = self.hooked(Hook.CodeExecute) apis_hooked = self.hooked(Hook.ApiCall) mm_e_hooked = self.hooked(Hook.MemoryError) mm_w_hooked = self.hooked(Hook.MemoryWrite) mm_r_hooked = self.hooked(Hook.MemoryRead) halt = self._single_step dasm = self.exe.disassembler() dasm.detail = True if code_hooked or halt: step = partial(ice.step, 1) elif end is not None: step = partial(ice.run_until, end) else: step = ice.run self.ip = ip = start mprotect: list[tuple[int, int]] = [] cb_write = None retrying = 0 while True: insn = None self.ip = ip if (code_hooked or apis_hooked) and not retrying: insn = next(dasm.disasm(self.mem_read(ip, 20), 1)) args = (ice, ip, insn.size, self.state) if apis_hooked: self._hook_api_call_check(*args) if code_hooked: self.hook_code_execute(*args) if mprotect: ice.mem_protect(*mprotect[-1], MP.ExecuteReadWrite) if (status := step()) == RS.InstructionLimit: for p in mprotect: ice.mem_protect(*p, MP.ExecuteOnly) if cb_write: addr, size = cb_write value = self.mem_read_int(addr, size) if self.hook_mem_write(ice, 0, addr, size, value, self.state) is False: break cb_write = None mprotect.clear() retrying = 0 ip = self.ip elif status in ( RS.Breakpoint, RS.Halt, RS.Killed, ): break elif status == RS.UnhandledException: insn = insn or next(dasm.disasm(self.mem_read(ip, 20), 1)) size = max((op.size for op in insn.operands), default=insn.addr_size) EC = ic.ExceptionCode ea = ice.exception_value ec = ice.exception_code if ec in (EC.ReadUnmapped, EC.WriteUnmapped) and mm_e_hooked: if self.hook_mem_error(ice, 0, ea, size, 0, self.state) is not False: retrying += 1 continue elif ec == EC.ReadPerm and mm_r_hooked: value = self.mem_read_int(ea, size) if self.hook_mem_read(ice, 0, ea, size, value, self.state) is not False: mprotect.append((ea, size)) retrying += 1 continue elif ec == EC.WritePerm and mm_w_hooked: cb_write = (ea, size) mprotect.append(cb_write) retrying += 1 continue else: raise EmulationError(ec.name) elif status != RS.Running: raise EmulationError(status.name) if halt: break def halt(self): self.icicle.add_breakpoint(self.ip) def _lookup_register(self, var: str) -> Register[str]: name = var.casefold() size = self.regmap[name] return Register(name, name, size) def _map(self, address: int, size: int): MP = ic.MemoryProtection if self.hooks & Hook.MemoryAccess: perm = MP.ExecuteOnly else: perm = MP.ExecuteReadWrite return self.icicle.mem_map(address, size, perm) def _set_register(self, register: str, v: int) -> None: return self.icicle.reg_write(register, v) def _get_register(self, register: str) -> int: return self.icicle.reg_read(register) def mem_write(self, address: int, data: bytes): return self.icicle.mem_write(address, data) def mem_read(self, address: int, size: int): return self.icicle.mem_read(address, size)Ancestors
- RawMetalEmulator
- Emulator
- abc.ABC
- typing.Generic
Class variables
var icicle
Inherited members
RawMetalEmulator:alignbase_emu_to_exebase_exe_to_emudisassemble_instructionemulategeneral_purpose_registersget_registerhalthook_code_errorhook_code_executehook_mem_errorhook_mem_readhook_mem_writehookedipis_mappedlookup_registermallocmapmeasure_register_sizemem_readmem_read_intmem_writemem_write_intmorestackpoppushpush_registerresetrvset_registerspstep