Module refinery.units.pattern.carve_pe

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pefile import PE, PEFormatError
from struct import unpack

from refinery.units.formats import Arg, PathExtractorUnit, UnpackResult
from refinery.units.formats.pe import get_pe_size
from refinery.units.formats.pe.pemeta import pemeta


class carve_pe(PathExtractorUnit):
    """
    Extracts anything from the input data that looks like a Portable
    Executable (PE) file.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, path=b'name',
        recursive: Arg.Switch('-r', help='Extract PE files that are contained in already extracted PEs.') = False,
        keep_root: Arg.Switch('-k', help='If the input chunk is itself a PE, include it as an output chunk.') = False,
        memdump  : Arg.Switch('-m', help='Use the virtual memory layout of a PE file to calculate its size.') = False,
        fileinfo : Arg.Switch('-f', help='Use the PE meta information to deduce a file name meta variable.') = False
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            path=path,
            recursive=recursive,
            keep_root=keep_root,
            memdump=memdump,
            fileinfo=fileinfo,
        )

    def unpack(self, data):
        """
        Scan the input for embedded PE files and yield one `UnpackResult` for
        each carved file.

        Every occurrence of the bytes `MZ` is treated as a candidate. A candidate
        is only parsed when the 16-bit little-endian value stored 0x3C bytes into
        it points at the bytes `PE`. A candidate located at offset 0 is only
        emitted when the `keep_root` switch is set. The path of each result is
        taken from the `OriginalFilename` or `ExportName` meta information when
        the `fileinfo` switch provides them, and is otherwise generated from the
        offset of the candidate.
        """
        cursor = 0
        mv = memoryview(data)

        while True:
            offset = data.find(B'MZ', cursor)
            # A failed search returns -1, which is always below the cursor.
            if offset < cursor:
                break
            cursor = offset + 2
            field = mv[offset + 0x3C:offset + 0x3E]
            if len(field) < 2:
                # Every later candidate would be truncated in the same way.
                return
            # The field is stored little-endian in the file; decode it that way
            # regardless of the byte order of the host.
            ntoffset = int.from_bytes(field, 'little')
            if mv[offset + ntoffset:offset + ntoffset + 2] != B'PE':
                self.log_debug(F'invalid NT header signature for candidate at 0x{offset:08X}')
                continue
            try:
                pe = PE(data=data[offset:], fast_load=True)
            except PEFormatError as err:
                self.log_debug(F'parsing of PE header at 0x{offset:08X} failed:', err)
                continue

            pesize = get_pe_size(pe, memdump=self.args.memdump)
            pedata = mv[offset:offset + pesize]
            info = {}
            if self.args.fileinfo:
                pe_meta_parser = pemeta()
                try:
                    info = pe_meta_parser.parse_version(pe) or {}
                except Exception as error:
                    self.log_warn(F'Unable to obtain file information: {error!s}')
                try:
                    info.update(pe_meta_parser.parse_header(pe) or {})
                except Exception as error:
                    # Header information is optional; report the failure
                    # instead of discarding it silently.
                    self.log_debug(F'Unable to obtain header information: {error!s}')

            # Prefer a name from the meta information over a generated one.
            for key in ('OriginalFilename', 'ExportName'):
                if key in info:
                    path = info[key]
                    break
            else:
                extension = 'exe' if pe.is_exe() else 'dll' if pe.is_dll() else 'sys'
                path = F'carve-0x{offset:08X}.{extension}'

            if offset > 0 or self.args.keep_root:
                yield UnpackResult(path, pedata, offset=offset)
                self.log_info(F'extracted PE file of size 0x{pesize:08X} from 0x{offset:08X}')
            else:
                self.log_info(F'ignored root file of size 0x{pesize:08X} from 0x{offset:08X}')
                continue

            if not offset or self.args.recursive:
                # Keep searching inside this file: only step over its headers.
                cursor += pe.OPTIONAL_HEADER.SizeOfHeaders
            else:
                # Step over the whole file. The cursor is already 2 bytes past
                # the candidate; it must never move backwards, because a size
                # below 2 would make the search find the same candidate again.
                cursor += max(pesize - 2, 0)

Classes

class carve_pe (*paths, list=False, join_path=False, drop_path=False, path=b'name', recursive=False, keep_root=False, memdump=False, fileinfo=False)

Extracts anything from the input data that looks like a Portable Executable (PE) file.

Expand source code Browse git
class carve_pe(PathExtractorUnit):
    """
    Extracts anything from the input data that looks like a Portable
    Executable (PE) file.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, path=b'name',
        recursive: Arg.Switch('-r', help='Extract PE files that are contained in already extracted PEs.') = False,
        keep_root: Arg.Switch('-k', help='If the input chunk is itself a PE, include it as an output chunk.') = False,
        memdump  : Arg.Switch('-m', help='Use the virtual memory layout of a PE file to calculate its size.') = False,
        fileinfo : Arg.Switch('-f', help='Use the PE meta information to deduce a file name meta variable.') = False
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            path=path,
            recursive=recursive,
            keep_root=keep_root,
            memdump=memdump,
            fileinfo=fileinfo,
        )

    def unpack(self, data):
        """
        Scan the input for embedded PE files and yield one `UnpackResult` for
        each carved file.

        Every occurrence of the bytes `MZ` is treated as a candidate. A candidate
        is only parsed when the 16-bit little-endian value stored 0x3C bytes into
        it points at the bytes `PE`. A candidate located at offset 0 is only
        emitted when the `keep_root` switch is set. The path of each result is
        taken from the `OriginalFilename` or `ExportName` meta information when
        the `fileinfo` switch provides them, and is otherwise generated from the
        offset of the candidate.
        """
        cursor = 0
        mv = memoryview(data)

        while True:
            offset = data.find(B'MZ', cursor)
            # A failed search returns -1, which is always below the cursor.
            if offset < cursor:
                break
            cursor = offset + 2
            field = mv[offset + 0x3C:offset + 0x3E]
            if len(field) < 2:
                # Every later candidate would be truncated in the same way.
                return
            # The field is stored little-endian in the file; decode it that way
            # regardless of the byte order of the host.
            ntoffset = int.from_bytes(field, 'little')
            if mv[offset + ntoffset:offset + ntoffset + 2] != B'PE':
                self.log_debug(F'invalid NT header signature for candidate at 0x{offset:08X}')
                continue
            try:
                pe = PE(data=data[offset:], fast_load=True)
            except PEFormatError as err:
                self.log_debug(F'parsing of PE header at 0x{offset:08X} failed:', err)
                continue

            pesize = get_pe_size(pe, memdump=self.args.memdump)
            pedata = mv[offset:offset + pesize]
            info = {}
            if self.args.fileinfo:
                pe_meta_parser = pemeta()
                try:
                    info = pe_meta_parser.parse_version(pe) or {}
                except Exception as error:
                    self.log_warn(F'Unable to obtain file information: {error!s}')
                try:
                    info.update(pe_meta_parser.parse_header(pe) or {})
                except Exception as error:
                    # Header information is optional; report the failure
                    # instead of discarding it silently.
                    self.log_debug(F'Unable to obtain header information: {error!s}')

            # Prefer a name from the meta information over a generated one.
            for key in ('OriginalFilename', 'ExportName'):
                if key in info:
                    path = info[key]
                    break
            else:
                extension = 'exe' if pe.is_exe() else 'dll' if pe.is_dll() else 'sys'
                path = F'carve-0x{offset:08X}.{extension}'

            if offset > 0 or self.args.keep_root:
                yield UnpackResult(path, pedata, offset=offset)
                self.log_info(F'extracted PE file of size 0x{pesize:08X} from 0x{offset:08X}')
            else:
                self.log_info(F'ignored root file of size 0x{pesize:08X} from 0x{offset:08X}')
                continue

            if not offset or self.args.recursive:
                # Keep searching inside this file: only step over its headers.
                cursor += pe.OPTIONAL_HEADER.SizeOfHeaders
            else:
                # Step over the whole file. The cursor is already 2 bytes past
                # the candidate; it must never move backwards, because a size
                # below 2 would make the search find the same candidate again.
                cursor += max(pesize - 2, 0)

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    """
    Scan the input for embedded PE files and yield one `UnpackResult` for
    each carved file.

    Every occurrence of the bytes `MZ` is treated as a candidate. A candidate
    is only parsed when the 16-bit little-endian value stored 0x3C bytes into
    it points at the bytes `PE`. A candidate located at offset 0 is only
    emitted when the `keep_root` switch is set. The path of each result is
    taken from the `OriginalFilename` or `ExportName` meta information when
    the `fileinfo` switch provides them, and is otherwise generated from the
    offset of the candidate.
    """
    cursor = 0
    mv = memoryview(data)

    while True:
        offset = data.find(B'MZ', cursor)
        # A failed search returns -1, which is always below the cursor.
        if offset < cursor:
            break
        cursor = offset + 2
        field = mv[offset + 0x3C:offset + 0x3E]
        if len(field) < 2:
            # Every later candidate would be truncated in the same way.
            return
        # The field is stored little-endian in the file; decode it that way
        # regardless of the byte order of the host.
        ntoffset = int.from_bytes(field, 'little')
        if mv[offset + ntoffset:offset + ntoffset + 2] != B'PE':
            self.log_debug(F'invalid NT header signature for candidate at 0x{offset:08X}')
            continue
        try:
            pe = PE(data=data[offset:], fast_load=True)
        except PEFormatError as err:
            self.log_debug(F'parsing of PE header at 0x{offset:08X} failed:', err)
            continue

        pesize = get_pe_size(pe, memdump=self.args.memdump)
        pedata = mv[offset:offset + pesize]
        info = {}
        if self.args.fileinfo:
            pe_meta_parser = pemeta()
            try:
                info = pe_meta_parser.parse_version(pe) or {}
            except Exception as error:
                self.log_warn(F'Unable to obtain file information: {error!s}')
            try:
                info.update(pe_meta_parser.parse_header(pe) or {})
            except Exception as error:
                # Header information is optional; report the failure
                # instead of discarding it silently.
                self.log_debug(F'Unable to obtain header information: {error!s}')

        # Prefer a name from the meta information over a generated one.
        for key in ('OriginalFilename', 'ExportName'):
            if key in info:
                path = info[key]
                break
        else:
            extension = 'exe' if pe.is_exe() else 'dll' if pe.is_dll() else 'sys'
            path = F'carve-0x{offset:08X}.{extension}'

        if offset > 0 or self.args.keep_root:
            yield UnpackResult(path, pedata, offset=offset)
            self.log_info(F'extracted PE file of size 0x{pesize:08X} from 0x{offset:08X}')
        else:
            self.log_info(F'ignored root file of size 0x{pesize:08X} from 0x{offset:08X}')
            continue

        if not offset or self.args.recursive:
            # Keep searching inside this file: only step over its headers.
            cursor += pe.OPTIONAL_HEADER.SizeOfHeaders
        else:
            # Step over the whole file. The cursor is already 2 bytes past
            # the candidate; it must never move backwards, because a size
            # below 2 would make the search find the same candidate again.
            cursor += max(pesize - 2, 0)

Inherited members