Module refinery.units.pattern.carve_pe

Expand source code Browse git
from __future__ import annotations

from struct import unpack

from refinery.lib import lief
from refinery.lib.mime import FileMagicInfo as magic
from refinery.lib.types import Param
from refinery.units.formats import Arg, PathExtractorUnit, UnpackResult
from refinery.units.formats.pe import get_pe_size
from refinery.units.formats.pe.pemeta import pemeta


class carve_pe(PathExtractorUnit):
    """
    Extracts anything from the input data that looks like a Portable
    Executable (PE) file. Candidates are located by searching for the
    `MZ` marker and are kept only when the NT header signature and a
    subsequent header parse both succeed.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, path=b'name',
        recursive: Param[bool, Arg.Switch('-r', help='Extract PE files that are contained in already extracted PEs.')] = False,
        keep_root: Param[bool, Arg.Switch('-k', help='If the input chunk is itself a PE, include it as an output chunk.')] = False,
        memdump: Param[bool, Arg.Switch('-m', help='Use the virtual memory layout of a PE file to calculate its size.')] = False,
        fileinfo: Param[bool, Arg.Switch('-f', help='Use the PE meta information to deduce a file name meta variable.')] = False
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            path=path,
            recursive=recursive,
            keep_root=keep_root,
            memdump=memdump,
            fileinfo=fileinfo,
        )

    def unpack(self, data):
        """
        Scan `data` for embedded PE files and yield one `UnpackResult` for
        each file that was carved. The result path is taken from the
        `OriginalFilename` or `ExportName` meta information when the
        `fileinfo` option is set and either is available; otherwise, a
        name of the form `carve-0x{offset}.{extension}` is generated.
        A PE at offset zero is only emitted when `keep_root` is set.
        """
        cursor = 0
        mv = memoryview(data)

        while True:
            offset = data.find(B'MZ', cursor)
            if offset < cursor:
                # find returned -1: there are no further candidates.
                break
            cursor = offset + 2
            ntoffset = mv[offset + 0x3C:offset + 0x3E]
            if len(ntoffset) < 2:
                # The field at 0x3C lies beyond the end of the input, which
                # is also true for every candidate after this one.
                return
            # PE header fields are little-endian; without the explicit '<',
            # struct would decode the value in the byte order of the host.
            ntoffset, = unpack('<H', ntoffset)
            if mv[offset + ntoffset:offset + ntoffset + 2] != B'PE':
                self.log_debug(F'invalid NT header signature for candidate at 0x{offset:08X}')
                continue
            try:
                pe = lief.load_pe_fast(mv[offset:])
            except Exception as err:
                self.log_debug(F'parsing of PE header at 0x{offset:08X} failed:', err)
                continue

            pesize = get_pe_size(pe, memdump=self.args.memdump)
            pedata = mv[offset:offset + pesize]
            info = {}
            if self.args.fileinfo:
                pe_meta_parser = pemeta()
                try:
                    info = pe_meta_parser.parse_version(pe) or {}
                except Exception as error:
                    self.log_warn(F'Unable to obtain file information: {error!s}')
                try:
                    info.update(pe_meta_parser.parse_header(pe) or {})
                except Exception as error:
                    # Header information is optional, but the failure should
                    # remain visible when debugging.
                    self.log_debug(F'Unable to obtain header information: {error!s}')

            # Prefer names from the PE meta information, in this order.
            for key in ('OriginalFilename', 'ExportName'):
                if key in info:
                    path = info[key]
                    break
            else:
                path = F'carve-0x{offset:08X}.{magic(pedata).extension}'

            if offset > 0 or self.args.keep_root:
                yield UnpackResult(path, pedata, offset=offset)
                self.log_info(F'extracted PE file of size 0x{pesize:08X} from 0x{offset:08X}')
            else:
                self.log_info(F'ignored root file of size 0x{pesize:08X} from 0x{offset:08X}')
                continue

            if not offset or self.args.recursive:
                # Only skip the headers so that PE files embedded in the
                # body of this one are found as well.
                cursor += pe.optional_header.sizeof_headers
            else:
                # Resume after the carved file. The cursor is never moved
                # back before offset + 2, so a computed size below 2 cannot
                # make the loop find the same marker again.
                cursor = max(cursor, offset + pesize)

Classes

class carve_pe (*paths, list=False, join_path=False, drop_path=False, path=b'name', recursive=False, keep_root=False, memdump=False, fileinfo=False)

Extracts anything from the input data that looks like a Portable Executable (PE) file.

Expand source code Browse git
class carve_pe(PathExtractorUnit):
    """
    Extracts anything from the input data that looks like a Portable
    Executable (PE) file. Candidates are located by searching for the
    `MZ` marker and are kept only when the NT header signature and a
    subsequent header parse both succeed.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, path=b'name',
        recursive: Param[bool, Arg.Switch('-r', help='Extract PE files that are contained in already extracted PEs.')] = False,
        keep_root: Param[bool, Arg.Switch('-k', help='If the input chunk is itself a PE, include it as an output chunk.')] = False,
        memdump: Param[bool, Arg.Switch('-m', help='Use the virtual memory layout of a PE file to calculate its size.')] = False,
        fileinfo: Param[bool, Arg.Switch('-f', help='Use the PE meta information to deduce a file name meta variable.')] = False
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            path=path,
            recursive=recursive,
            keep_root=keep_root,
            memdump=memdump,
            fileinfo=fileinfo,
        )

    def unpack(self, data):
        """
        Scan `data` for embedded PE files and yield one `UnpackResult` for
        each file that was carved. The result path is taken from the
        `OriginalFilename` or `ExportName` meta information when the
        `fileinfo` option is set and either is available; otherwise, a
        name of the form `carve-0x{offset}.{extension}` is generated.
        A PE at offset zero is only emitted when `keep_root` is set.
        """
        cursor = 0
        mv = memoryview(data)

        while True:
            offset = data.find(B'MZ', cursor)
            if offset < cursor:
                # find returned -1: there are no further candidates.
                break
            cursor = offset + 2
            ntoffset = mv[offset + 0x3C:offset + 0x3E]
            if len(ntoffset) < 2:
                # The field at 0x3C lies beyond the end of the input, which
                # is also true for every candidate after this one.
                return
            # PE header fields are little-endian; without the explicit '<',
            # struct would decode the value in the byte order of the host.
            ntoffset, = unpack('<H', ntoffset)
            if mv[offset + ntoffset:offset + ntoffset + 2] != B'PE':
                self.log_debug(F'invalid NT header signature for candidate at 0x{offset:08X}')
                continue
            try:
                pe = lief.load_pe_fast(mv[offset:])
            except Exception as err:
                self.log_debug(F'parsing of PE header at 0x{offset:08X} failed:', err)
                continue

            pesize = get_pe_size(pe, memdump=self.args.memdump)
            pedata = mv[offset:offset + pesize]
            info = {}
            if self.args.fileinfo:
                pe_meta_parser = pemeta()
                try:
                    info = pe_meta_parser.parse_version(pe) or {}
                except Exception as error:
                    self.log_warn(F'Unable to obtain file information: {error!s}')
                try:
                    info.update(pe_meta_parser.parse_header(pe) or {})
                except Exception as error:
                    # Header information is optional, but the failure should
                    # remain visible when debugging.
                    self.log_debug(F'Unable to obtain header information: {error!s}')

            # Prefer names from the PE meta information, in this order.
            for key in ('OriginalFilename', 'ExportName'):
                if key in info:
                    path = info[key]
                    break
            else:
                path = F'carve-0x{offset:08X}.{magic(pedata).extension}'

            if offset > 0 or self.args.keep_root:
                yield UnpackResult(path, pedata, offset=offset)
                self.log_info(F'extracted PE file of size 0x{pesize:08X} from 0x{offset:08X}')
            else:
                self.log_info(F'ignored root file of size 0x{pesize:08X} from 0x{offset:08X}')
                continue

            if not offset or self.args.recursive:
                # Only skip the headers so that PE files embedded in the
                # body of this one are found as well.
                cursor += pe.optional_header.sizeof_headers
            else:
                # Resume after the carved file. The cursor is never moved
                # back before offset + 2, so a computed size below 2 cannot
                # make the loop find the same marker again.
                cursor = max(cursor, offset + pesize)

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies
var console
var reverse

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    """
    Scan `data` for embedded PE files and yield one `UnpackResult` for
    each file that was carved. The result path is taken from the
    `OriginalFilename` or `ExportName` meta information when the
    `fileinfo` option is set and either is available; otherwise, a
    name of the form `carve-0x{offset}.{extension}` is generated.
    A PE at offset zero is only emitted when `keep_root` is set.
    """
    cursor = 0
    mv = memoryview(data)

    while True:
        offset = data.find(B'MZ', cursor)
        if offset < cursor:
            # find returned -1: there are no further candidates.
            break
        cursor = offset + 2
        ntoffset = mv[offset + 0x3C:offset + 0x3E]
        if len(ntoffset) < 2:
            # The field at 0x3C lies beyond the end of the input, which
            # is also true for every candidate after this one.
            return
        # PE header fields are little-endian; without the explicit '<',
        # struct would decode the value in the byte order of the host.
        ntoffset, = unpack('<H', ntoffset)
        if mv[offset + ntoffset:offset + ntoffset + 2] != B'PE':
            self.log_debug(F'invalid NT header signature for candidate at 0x{offset:08X}')
            continue
        try:
            pe = lief.load_pe_fast(mv[offset:])
        except Exception as err:
            self.log_debug(F'parsing of PE header at 0x{offset:08X} failed:', err)
            continue

        pesize = get_pe_size(pe, memdump=self.args.memdump)
        pedata = mv[offset:offset + pesize]
        info = {}
        if self.args.fileinfo:
            pe_meta_parser = pemeta()
            try:
                info = pe_meta_parser.parse_version(pe) or {}
            except Exception as error:
                self.log_warn(F'Unable to obtain file information: {error!s}')
            try:
                info.update(pe_meta_parser.parse_header(pe) or {})
            except Exception as error:
                # Header information is optional, but the failure should
                # remain visible when debugging.
                self.log_debug(F'Unable to obtain header information: {error!s}')

        # Prefer names from the PE meta information, in this order.
        for key in ('OriginalFilename', 'ExportName'):
            if key in info:
                path = info[key]
                break
        else:
            path = F'carve-0x{offset:08X}.{magic(pedata).extension}'

        if offset > 0 or self.args.keep_root:
            yield UnpackResult(path, pedata, offset=offset)
            self.log_info(F'extracted PE file of size 0x{pesize:08X} from 0x{offset:08X}')
        else:
            self.log_info(F'ignored root file of size 0x{pesize:08X} from 0x{offset:08X}')
            continue

        if not offset or self.args.recursive:
            # Only skip the headers so that PE files embedded in the
            # body of this one are found as well.
            cursor += pe.optional_header.sizeof_headers
        else:
            # Resume after the carved file. The cursor is never moved
            # back before offset + 2, so a computed size below 2 cannot
            # make the loop find the same marker again.
            cursor = max(cursor, offset + pesize)

Inherited members