Module refinery.units.formats.exe.vmemref

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, Container

from refinery.units import Arg, Unit
from refinery.lib.executable import Executable, ET, CompartmentNotFound
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import NoLogging


if TYPE_CHECKING:
    from angr.project import Project
    from angr.analyses.cfg.cfg_emulated import CFGEmulated
    from angr.knowledge_plugins.functions.function import Function
    from cle.memory import Clemory


class vmemref(Unit):
    """
    The unit expects an executable as input (PE/ELF/MachO) and scans a function at a given virtual
    address for memory references. For each memory reference, the unit looks up the corresponding
    section and file offset for the reference. It then returns all data from that section starting
    at the given offset.
    """

    @Unit.Requires('angr', 'all')
    def _angr():
        """
        Import angr together with the submodules used by this unit and return the top-level
        package. The import happens here rather than at module level so that it is only
        attempted when the dependency is actually requested.
        """
        import angr
        import angr.project
        import angr.engines
        return angr

    def _memory_references(
        self,
        function: Function,
        memory: Clemory,
        functions: Container[int],
        pointer_size: int,
        max_dereference: int = 1
    ):
        """
        Collect addresses of data that the given function refers to through its code constants.

        Every constant that can be converted to an integer is treated as a candidate address. A
        candidate is accepted when it is contained in `memory`, is not the address of one of the
        function's own instructions, and is not contained in `functions`. After an address was
        accepted, a pointer of `pointer_size` bits is read from it (always decoded as little
        endian) and the resulting value becomes the next candidate. Such a chain ends when a
        candidate is rejected, was already recorded, cannot be read from memory, or after
        `max_dereference` pointers have been followed; a falsy or non-positive value for
        `max_dereference` disables that limit.

        The return value is a list of unique addresses in the order in which they were found. It
        is empty when the code constants of the function cannot be obtained.
        """
        width = pointer_size // 8

        own_code = set()
        for block in function.blocks:
            own_code.update(block.instruction_addrs)

        try:
            constants = function.code_constants
        except Exception:
            # Best effort: without constants, there are no candidates to inspect.
            return []

        references = []
        # The list preserves discovery order for the caller; the set mirrors its contents and
        # only exists to make the duplicate check a constant-time lookup.
        recorded = set()

        def is_valid_data_address(address):
            # All candidates are produced by int(...) or int.from_bytes(...), hence they are
            # always integers and no type check is required here.
            if address not in memory:
                return False
            if address in own_code:
                return False
            return address not in functions

        def dereference(address):
            raw = bytes(memory[k] for k in range(address, address + width))
            return int.from_bytes(raw, 'little')

        for constant in constants:
            try:
                address = int(constant)
            except Exception:
                continue
            times_dereferenced = 0
            while is_valid_data_address(address) and address not in recorded:
                references.append(address)
                recorded.add(address)
                times_dereferenced += 1
                if max_dereference and max_dereference > 0 and times_dereferenced > max_dereference:
                    break
                try:
                    address = dereference(address)
                except Exception:
                    # The pointer could not be read in full; this chain ends here.
                    break

        return references

    def __init__(
        self,
        address: Arg.Number(metavar='ADDR', help='Specify the address of a function to scan.'),
        base: Arg.Number('-b', metavar='ADDR', help='Optionally specify a custom base address B.') = None,
    ):
        super().__init__(address=address, base=base)

    def process(self, data):
        """
        Load the input as an executable, let angr build an emulated control flow graph that
        starts at the configured function address, and yield `executable[ref:]` for each memory
        reference that was found in that function. References for which this lookup raises
        `CompartmentNotFound` are logged and skipped.
        """
        address = self.args.address
        executable = Executable.Load(data, self.args.base)
        # The box of the location that contains the function; it restricts the CFG recovery below.
        code = executable.location_from_address(address).virtual.box

        self.log_info(R'loading project into angr')
        with NoLogging():
            project: Project = self._angr.Project(MemoryFile(data), load_options={'auto_load_libs': False})

        self.log_info(F'scanning function at 0x{address:X}')
        with NoLogging():
            cfg: CFGEmulated = project.analyses.CFGEmulated(
                call_depth=0,
                starts=[address],
                enable_symbolic_back_traversal=True,
                address_whitelist=code.range(),
            )

        function = cfg.functions[address]
        # For PE files, every address inside the code box is excluded from the results; for all
        # other formats, only the addresses known to the CFG's function collection are.
        code_addresses = code if executable.type is ET.PE else cfg.functions

        self.log_info(R'extracting memory references from lifted function')
        for ref in self._memory_references(
            function,
            project.loader.memory,
            code_addresses,
            executable.pointer_size
        ):
            try:
                yield executable[ref:]
            except CompartmentNotFound:
                self.log_info(F'memory reference could not be resolved: 0x{ref:0{executable.pointer_size // 4}X}')

Classes

class vmemref (address, base=None)

The unit expects an executable as input (PE/ELF/MachO) and scans a function at a given virtual address for memory references. For each memory reference, the unit looks up the corresponding section and file offset for the reference. It then returns all data from that section starting at the given offset.

Expand source code Browse git
class vmemref(Unit):
    """
    The unit expects an executable as input (PE/ELF/MachO) and scans a function at a given virtual
    address for memory references. For each memory reference, the unit looks up the corresponding
    section and file offset for the reference. It then returns all data from that section starting
    at the given offset.
    """

    @Unit.Requires('angr', 'all')
    def _angr():
        """
        Import angr together with the submodules used by this unit and return the top-level
        package. The import happens here rather than at module level so that it is only
        attempted when the dependency is actually requested.
        """
        import angr
        import angr.project
        import angr.engines
        return angr

    def _memory_references(
        self,
        function: Function,
        memory: Clemory,
        functions: Container[int],
        pointer_size: int,
        max_dereference: int = 1
    ):
        """
        Collect addresses of data that the given function refers to through its code constants.

        Every constant that can be converted to an integer is treated as a candidate address. A
        candidate is accepted when it is contained in `memory`, is not the address of one of the
        function's own instructions, and is not contained in `functions`. After an address was
        accepted, a pointer of `pointer_size` bits is read from it (always decoded as little
        endian) and the resulting value becomes the next candidate. Such a chain ends when a
        candidate is rejected, was already recorded, cannot be read from memory, or after
        `max_dereference` pointers have been followed; a falsy or non-positive value for
        `max_dereference` disables that limit.

        The return value is a list of unique addresses in the order in which they were found. It
        is empty when the code constants of the function cannot be obtained.
        """
        width = pointer_size // 8

        own_code = set()
        for block in function.blocks:
            own_code.update(block.instruction_addrs)

        try:
            constants = function.code_constants
        except Exception:
            # Best effort: without constants, there are no candidates to inspect.
            return []

        references = []
        # The list preserves discovery order for the caller; the set mirrors its contents and
        # only exists to make the duplicate check a constant-time lookup.
        recorded = set()

        def is_valid_data_address(address):
            # All candidates are produced by int(...) or int.from_bytes(...), hence they are
            # always integers and no type check is required here.
            if address not in memory:
                return False
            if address in own_code:
                return False
            return address not in functions

        def dereference(address):
            raw = bytes(memory[k] for k in range(address, address + width))
            return int.from_bytes(raw, 'little')

        for constant in constants:
            try:
                address = int(constant)
            except Exception:
                continue
            times_dereferenced = 0
            while is_valid_data_address(address) and address not in recorded:
                references.append(address)
                recorded.add(address)
                times_dereferenced += 1
                if max_dereference and max_dereference > 0 and times_dereferenced > max_dereference:
                    break
                try:
                    address = dereference(address)
                except Exception:
                    # The pointer could not be read in full; this chain ends here.
                    break

        return references

    def __init__(
        self,
        address: Arg.Number(metavar='ADDR', help='Specify the address of a function to scan.'),
        base: Arg.Number('-b', metavar='ADDR', help='Optionally specify a custom base address B.') = None,
    ):
        super().__init__(address=address, base=base)

    def process(self, data):
        """
        Load the input as an executable, let angr build an emulated control flow graph that
        starts at the configured function address, and yield `executable[ref:]` for each memory
        reference that was found in that function. References for which this lookup raises
        `CompartmentNotFound` are logged and skipped.
        """
        address = self.args.address
        executable = Executable.Load(data, self.args.base)
        # The box of the location that contains the function; it restricts the CFG recovery below.
        code = executable.location_from_address(address).virtual.box

        self.log_info(R'loading project into angr')
        with NoLogging():
            project: Project = self._angr.Project(MemoryFile(data), load_options={'auto_load_libs': False})

        self.log_info(F'scanning function at 0x{address:X}')
        with NoLogging():
            cfg: CFGEmulated = project.analyses.CFGEmulated(
                call_depth=0,
                starts=[address],
                enable_symbolic_back_traversal=True,
                address_whitelist=code.range(),
            )

        function = cfg.functions[address]
        # For PE files, every address inside the code box is excluded from the results; for all
        # other formats, only the addresses known to the CFG's function collection are.
        code_addresses = code if executable.type is ET.PE else cfg.functions

        self.log_info(R'extracting memory references from lifted function')
        for ref in self._memory_references(
            function,
            project.loader.memory,
            code_addresses,
            executable.pointer_size
        ):
            try:
                yield executable[ref:]
            except CompartmentNotFound:
                self.log_info(F'memory reference could not be resolved: 0x{ref:0{executable.pointer_size // 4}X}')

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members