Module refinery.units.formats.pe.pemeta

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import itertools
import json
import re

from contextlib import suppress
from importlib import resources
from datetime import timedelta
from dataclasses import dataclass
from enum import Enum

from pefile import (
    DEBUG_TYPE,
    DIRECTORY_ENTRY,
    image_characteristics,
    MACHINE_TYPE,
    SUBSYSTEM_TYPE,
    PE,
)

from refinery.lib.dotnet.header import DotNetHeader
from refinery.units import Arg, Unit
from refinery.units.sinks.ppjson import ppjson
from refinery.units.formats.pe import get_pe_size
from refinery.lib.tools import date_from_timestamp

from refinery import data


class VIT(str, Enum):
    ERR = 'unknown'
    OBJ = 'object file from C'
    CPP = 'object file from C++'
    ASM = 'object file from assembler'
    RES = 'object from CVTRES'
    LNK = 'linker version'
    IMP = 'dll import in library file'
    EXP = 'dll export in library file'

    @property
    def tag(self) -> str:
        if self in (VIT.OBJ, VIT.CPP, VIT.ASM, VIT.RES):
            return 'object'
        if self is VIT.IMP:
            return 'import'
        if self is VIT.EXP:
            return 'export'
        if self is VIT.LNK:
            return 'linker'
        else:
            return 'unknown'


@dataclass
class VersionInfo:
    pid: str
    ver: str
    err: bool

    def __str__(self):
        return F'{self.ver} [{self.pid.upper()}]'

    def __bool__(self):
        return not self.err


with resources.path(data, 'rich.json') as path:
    with path.open('r') as stream:
        RICH = json.load(stream)


class ShortPID(str, Enum):
    UTC = 'STDLIB' # STDLIBC
    RES = 'CVTRES' # Cvt/RES
    OMF = 'CVTOMF' # Cvt/OMF
    PGD = 'CVTPGD' # Cvt/PGD
    LNK = 'LINKER' # Linker
    EXP = 'EXPORT' # Exports
    IMP = 'IMPORT' # Imports
    OBJ = 'OBJECT' # Object
    PHX = 'PHOENX' # Phoenix
    ASM = 'MASM'   # MASM
    MIL = 'MSIL'   # MSIL
    VB6 = 'VB6OBJ' # VB6

    def __str__(self):
        width = max(len(item.value) for item in self.__class__)
        return F'{self.value:>{width}}'


def get_rich_short_pid(pid: str) -> ShortPID:
    pid = pid.upper()
    if pid.startswith('UTC'):
        return ShortPID.UTC
    if pid.startswith('CVTRES'):
        return ShortPID.RES
    if pid.startswith('CVTOMF'):
        return ShortPID.OMF
    if pid.startswith('CVTPGD'):
        return ShortPID.PGD
    if pid.startswith('LINKER'):
        return ShortPID.LNK
    if pid.startswith('EXPORT'):
        return ShortPID.EXP
    if pid.startswith('IMPORT'):
        return ShortPID.IMP
    if pid.startswith('IMPLIB'):
        return ShortPID.IMP
    if pid.startswith('ALIASOBJ'):
        return ShortPID.OBJ
    if pid.startswith('RESOURCE'):
        return ShortPID.RES
    if pid.startswith('PHX'):
        return ShortPID.PHX
    if pid.startswith('PHOENIX'):
        return ShortPID.PHX
    if pid.startswith('MASM'):
        return ShortPID.ASM
    if pid.startswith('ILASM'):
        return ShortPID.MIL
    if pid.startswith('VISUALBASIC'):
        return ShortPID.VB6
    raise LookupError(pid)


def get_rich_info(vid: int) -> VersionInfo:
    pid = vid >> 0x10
    ver = vid & 0xFFFF
    ver = RICH['ver'].get(F'{ver:04X}')
    pid = RICH['pid'].get(F'{pid:04X}')
    err = ver is None and pid is None
    if ver is not None:
        suffix = ver.get('ver')
        ver = ver['ide']
        if suffix:
            ver = F'{ver} {suffix}'
    else:
        ver = 'Unknown Version'
    pid = pid or 'Unknown Type'
    return VersionInfo(pid, ver, err)


class pemeta(Unit):
    """
    Extract metadata from PE files. By default, all information except for imports and exports are
    extracted.
    """
    def __init__(
        self, custom : Arg('-c', '--custom',
            help='Unless enabled, all default categories will be extracted.') = False,
        debug      : Arg.Switch('-D', help='Parse the PDB path from the debug directory.') = False,
        dotnet     : Arg.Switch('-N', help='Parse the .NET header.') = False,
        signatures : Arg.Switch('-S', help='Parse digital signatures.') = False,
        timestamps : Arg.Counts('-T', help='Extract time stamps. Specify twice for more detail.') = 0,
        version    : Arg.Switch('-V', help='Parse the VERSION resource.') = False,
        header     : Arg.Switch('-H', help='Parse base data from the PE header.') = False,
        exports    : Arg.Counts('-E', help='List all exported functions. Specify twice to include addresses.') = 0,
        imports    : Arg.Counts('-I', help='List all imported functions. Specify twice to include addresses.') = 0,
        tabular    : Arg.Switch('-t', help='Print information in a table rather than as JSON') = False,
        timeraw    : Arg.Switch('-r', help='Extract time stamps as numbers instead of human-readable format.') = False,
    ):
        if not custom and not any((debug, dotnet, signatures, timestamps, version, header)):
            debug = dotnet = signatures = timestamps = version = header = True
        super().__init__(
            debug=debug,
            dotnet=dotnet,
            signatures=signatures,
            timestamps=timestamps,
            version=version,
            header=header,
            imports=imports,
            exports=exports,
            timeraw=timeraw,
            tabular=tabular,
        )

    @classmethod
    def _ensure_string(cls, x):
        if not isinstance(x, str):
            x = repr(x) if not isinstance(x, bytes) else x.decode(cls.codec, 'backslashreplace')
        return x

    @classmethod
    def _parse_pedict(cls, bin):
        return dict((
            cls._ensure_string(key),
            cls._ensure_string(val)
        ) for key, val in bin.items() if val)

    @classmethod
    def parse_signature(cls, data: bytearray) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        time stamp and code signing certificates that are attached to the input PE file.
        """
        from refinery.units.formats.pkcs7 import pkcs7

        try:
            signature = data | pkcs7 | json.loads
        except Exception as E:
            raise ValueError(F'PKCS7 parser failed with error: {E!s}')

        info = {}

        def find_timestamps(entry):
            if isinstance(entry, dict):
                if set(entry.keys()) == {'type', 'value'}:
                    if entry['type'] == 'signing_time':
                        return {'Timestamp': entry['value']}
                for value in entry.values():
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    with suppress(KeyError):
                        result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
                    return result
            elif isinstance(entry, list):
                for value in entry:
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    return result

        timestamp_info = find_timestamps(signature)
        if timestamp_info is not None:
            info.update(timestamp_info)

        try:
            certificates = signature['content']['certificates']
        except KeyError:
            return info

        if len(certificates) == 1:
            main_certificate = certificates[0]
        else:
            certificates_with_extended_use = []
            main_certificate = None
            for certificate in certificates:
                with suppress(Exception):
                    crt = certificate['tbs_certificate']
                    ext = [e for e in crt['extensions'] if e['extn_id'] == 'extended_key_usage' and e['extn_value'] != ['time_stamping']]
                    key = [e for e in crt['extensions'] if e['extn_id'] == 'key_usage']
                    if ext:
                        certificates_with_extended_use.append(certificate)
                    if any('key_cert_sign' in e['extn_value'] for e in key):
                        continue
                    if any('code_signing' in e['extn_value'] for e in ext):
                        main_certificate = certificate
                        break
            if main_certificate is None and len(certificates_with_extended_use) == 1:
                main_certificate = certificates_with_extended_use[0]
        if main_certificate:
            crt = main_certificate['tbs_certificate']
            serial = crt['serial_number']
            if isinstance(serial, int):
                serial = F'{serial:x}'
            if len(serial) % 2 != 0:
                serial = F'0{serial}'
            assert bytes.fromhex(serial) in data
            subject = crt['subject']
            location = [subject.get(t, '') for t in ('locality_name', 'state_or_province_name', 'country_name')]
            info.update(Subject=subject['common_name'])
            if any(location):
                info.update(SubjectLocation=', '.join(filter(None, location)))
            for signer_info in signature['content'].get('signer_infos', ()):
                try:
                    if signer_info['sid']['serial_number'] != crt['serial_number']:
                        continue
                    for attr in signer_info['signed_attrs']:
                        if attr['type'] == 'authenticode_info':
                            info.update(ProgramName=attr['value']['programName'])
                            info.update(MoreInfo=attr['value']['moreInfo'])
                except KeyError:
                    continue
            try:
                valid_from = crt['validity']['not_before']
                valid_until = crt['validity']['not_after']
            except KeyError:
                pass
            else:
                info.update(ValidFrom=valid_from, ValidUntil=valid_until)
            info.update(
                Issuer=crt['issuer']['common_name'], Fingerprint=main_certificate['fingerprint'], Serial=serial)
            return info
        return info

    def _pe_characteristics(self, pe: PE):
        return {name for name, mask in image_characteristics
            if pe.FILE_HEADER.Characteristics & mask}

    def _pe_address_width(self, pe: PE, default=16) -> int:
        if 'IMAGE_FILE_16BIT_MACHINE' in self._pe_characteristics(pe):
            return 4
        elif MACHINE_TYPE[pe.FILE_HEADER.Machine] in ['IMAGE_FILE_MACHINE_I386']:
            return 8
        elif MACHINE_TYPE[pe.FILE_HEADER.Machine] in [
            'IMAGE_FILE_MACHINE_AMD64',
            'IMAGE_FILE_MACHINE_IA64',
        ]:
            return 16
        else:
            return default

    def _vint(self, pe: PE, value: int):
        if not self.args.tabular:
            return value
        aw = self._pe_address_width(pe)
        return F'0x{value:0{aw}X}'

    def parse_version(self, pe: PE, data=None) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the version resource of an input PE file, if available.
        """
        pe.parse_data_directories(directories=[DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
        string_table_entries = []
        for FileInfo in pe.FileInfo:
            for FileInfoEntry in FileInfo:
                with suppress(AttributeError):
                    for StringTableEntry in FileInfoEntry.StringTable:
                        StringTableEntryParsed = self._parse_pedict(StringTableEntry.entries)
                        with suppress(AttributeError):
                            LangID = StringTableEntry.entries.get('LangID', None) or StringTableEntry.LangID
                            LangID = int(LangID, 0x10) if not isinstance(LangID, int) else LangID
                            LangHi = LangID >> 0x10
                            LangLo = LangID & 0xFFFF
                            Language = self._LCID.get(LangHi, 'Language Neutral')
                            Charset = self._CHARSET.get(LangLo, 'Unknown Charset')
                            StringTableEntryParsed.update(
                                LangID=F'{LangID:08X}',
                                Charset=Charset,
                                Language=Language
                            )
                        for key in StringTableEntryParsed:
                            if key.endswith('Version'):
                                value = StringTableEntryParsed[key]
                                separator = ', '
                                if re.match(F'\\d+({re.escape(separator)}\\d+){{3}}', value):
                                    StringTableEntryParsed[key] = '.'.join(value.split(separator))
                        string_table_entries.append(StringTableEntryParsed)
        if not string_table_entries:
            return None
        elif len(string_table_entries) == 1:
            return string_table_entries[0]
        else:
            return string_table_entries

    def parse_exports(self, pe: PE, data=None, include_addresses=False) -> list:
        pe.parse_data_directories(directories=[DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']])
        base = pe.OPTIONAL_HEADER.ImageBase
        info = []
        for k, exp in enumerate(pe.DIRECTORY_ENTRY_EXPORT.symbols):
            if not exp.name:
                name = F'@{k}'
            else:
                name = exp.name.decode('ascii')
            item = {'Name': name, 'Address': self._vint(pe, exp.address + base)} if include_addresses else name
            info.append(item)
        return info

    def parse_imports(self, pe: PE, data=None, include_addresses=False) -> list:
        info = {}
        dirs = []
        for name in [
            'DIRECTORY_ENTRY_IMPORT',
            'DIRECTORY_ENTRY_DELAY_IMPORT',
        ]:
            pe.parse_data_directories(directories=[DIRECTORY_ENTRY[F'IMAGE_{name}']])
            with suppress(AttributeError):
                dirs.append(getattr(pe, name))
        self.log_warn(dirs)
        for idd in itertools.chain(*dirs):
            dll: bytes = idd.dll
            dll = dll.decode('ascii')
            if dll.lower().endswith('.dll'):
                dll = dll[:~3]
            imports: list[str] = info.setdefault(dll, [])
            with suppress(AttributeError):
                symbols = idd.imports
            with suppress(AttributeError):
                symbols = idd.entries
            try:
                for imp in symbols:
                    name: bytes = imp.name
                    name = name and name.decode('ascii') or F'@{imp.ordinal}'
                    if not include_addresses:
                        imports.append(name)
                    else:
                        imports.append(dict(Name=name, Address=self._vint(pe, imp.address)))
            except Exception as e:
                self.log_warn(F'error parsing {name}: {e!s}')
        return info

    def parse_header(self, pe: PE, data=None) -> dict:
        def format_macro_name(name: str, prefix, convert=True):
            name = name.split('_')[prefix:]
            if convert:
                for k, part in enumerate(name):
                    name[k] = part.upper() if len(part) <= 3 else part.capitalize()
            return ' '.join(name)

        major = pe.OPTIONAL_HEADER.MajorOperatingSystemVersion
        minor = pe.OPTIONAL_HEADER.MinorOperatingSystemVersion
        version = self._WINVER.get(major, {0: 'Unknown'})

        try:
            MinimumOS = version[minor]
        except LookupError:
            MinimumOS = version[0]
        header_information = {
            'Machine': format_macro_name(MACHINE_TYPE[pe.FILE_HEADER.Machine], 3, False),
            'Subsystem': format_macro_name(SUBSYSTEM_TYPE[pe.OPTIONAL_HEADER.Subsystem], 2),
            'MinimumOS': MinimumOS,
        }

        pe.parse_data_directories(directories=[
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
        ])

        try:
            export_name = pe.DIRECTORY_ENTRY_EXPORT.name
            if isinstance(export_name, bytes):
                export_name = export_name.decode('utf8')
            if not export_name.isprintable():
                export_name = None
        except Exception:
            export_name = None
        if export_name:
            header_information['ExportName'] = export_name

        rich_header = pe.parse_rich_header()
        rich = []

        if rich_header:
            it = rich_header.get('values', [])
            if self.args.tabular:
                cw = max(len(F'{c:d}') for c in it[1::2])
            for idv, count in zip(it[0::2], it[1::2]):
                info = get_rich_info(idv)
                if not info:
                    continue
                pid = info.pid.upper()
                if self.args.tabular:
                    short_pid = get_rich_short_pid(pid)
                    rich.append(F'[{idv:08x}] {count:>0{cw}d} {short_pid!s} {info.ver}')
                else:
                    rich.append({
                        'Counter': count,
                        'Encoded': F'{idv:08x}',
                        'Library': pid,
                        'Product': info.ver,
                    })
            header_information['RICH'] = rich

        characteristics = self._pe_characteristics(pe)
        for typespec, flag in {
            'EXE': 'IMAGE_FILE_EXECUTABLE_IMAGE',
            'DLL': 'IMAGE_FILE_DLL',
            'SYS': 'IMAGE_FILE_SYSTEM'
        }.items():
            if flag in characteristics:
                header_information['Type'] = typespec

        base = pe.OPTIONAL_HEADER.ImageBase
        header_information['ImageBase'] = self._vint(pe, base)
        header_information['ImageSize'] = get_pe_size(pe)
        header_information['Bits'] = 4 * self._pe_address_width(pe, 16)
        header_information['EntryPoint'] = self._vint(pe, pe.OPTIONAL_HEADER.AddressOfEntryPoint + base)
        return header_information

    def parse_time_stamps(self, pe: PE, raw_time_stamps: bool, more_detail: bool) -> dict:
        """
        Extracts time stamps from the PE header (link time), as well as from the imports,
        exports, debug, and resource directory. The resource time stamp is also parsed as
        a DOS time stamp and returned as the "Delphi" time stamp.
        """
        if raw_time_stamps:
            def dt(ts): return ts
        else:
            dt = date_from_timestamp

        pe.parse_data_directories(directories=[
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BOUND_IMPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
        ])

        info = {}

        with suppress(AttributeError):
            info.update(Linker=dt(pe.FILE_HEADER.TimeDateStamp))

        for dir_name, _dll, info_key in [
            ('DIRECTORY_ENTRY_IMPORT',       'dll',  'Import'), # noqa
            ('DIRECTORY_ENTRY_DELAY_IMPORT', 'dll',  'Symbol'), # noqa
            ('DIRECTORY_ENTRY_BOUND_IMPORT', 'name', 'Module'), # noqa
        ]:
            impts = {}
            for entry in getattr(pe, dir_name, []):
                ts = 0
                with suppress(AttributeError):
                    ts = entry.struct.dwTimeDateStamp
                with suppress(AttributeError):
                    ts = entry.struct.TimeDateStamp
                if ts == 0 or ts == 0xFFFFFFFF:
                    continue
                name = getattr(entry, _dll, B'').decode()
                if name.lower().endswith('.dll'):
                    name = name[:-4]
                impts[name] = dt(ts)
            if not impts:
                continue
            if not more_detail:
                dmin = min(impts.values())
                dmax = max(impts.values())
                small_delta = 2 * 60 * 60
                if not raw_time_stamps:
                    small_delta = timedelta(seconds=small_delta)
                if dmax - dmin < small_delta:
                    impts = dmin
            info[info_key] = impts

        with suppress(AttributeError):
            Export = pe.DIRECTORY_ENTRY_EXPORT.struct.TimeDateStamp
            if Export: info.update(Export=dt(Export))

        with suppress(AttributeError):
            res_timestamp = pe.DIRECTORY_ENTRY_RESOURCE.struct.TimeDateStamp
            if res_timestamp:
                with suppress(ValueError):
                    from refinery.units.misc.datefix import datefix
                    dos = datefix.dostime(res_timestamp)
                    info.update(Delphi=dos)
                    info.update(RsrcTS=dt(res_timestamp))

        def norm(value):
            if isinstance(value, list):
                return [norm(v) for v in value]
            if isinstance(value, dict):
                return {k: norm(v) for k, v in value.items()}
            if isinstance(value, int):
                return value
            return str(value)

        return {key: norm(value) for key, value in info.items()}

    def parse_dotnet(self, pe: PE, data):
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the .NET metadata of an input PE file.
        """
        header = DotNetHeader(data, pe=pe)
        tables = header.meta.Streams.Tables
        info = dict(
            RuntimeVersion=F'{header.head.MajorRuntimeVersion}.{header.head.MinorRuntimeVersion}',
            Version=F'{header.meta.MajorVersion}.{header.meta.MinorVersion}',
            VersionString=header.meta.VersionString
        )

        info['Flags'] = [name for name, check in header.head.KnownFlags.items() if check]

        if len(tables.Assembly) == 1:
            assembly = tables.Assembly[0]
            info.update(
                AssemblyName=assembly.Name,
                Release='{}.{}.{}.{}'.format(
                    assembly.MajorVersion,
                    assembly.MinorVersion,
                    assembly.BuildNumber,
                    assembly.RevisionNumber
                )
            )

        try:
            entry = self._vint(pe, header.head.EntryPointToken + pe.OPTIONAL_HEADER.ImageBase)
            info.update(EntryPoint=entry)
        except AttributeError:
            pass

        if len(tables.Module) == 1:
            module = tables.Module[0]
            info.update(ModuleName=module.Name)

        return info

    def parse_debug(self, pe: PE, data=None):
        result = {}
        pe.parse_data_directories(directories=[
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])
        for dbg in pe.DIRECTORY_ENTRY_DEBUG:
            if DEBUG_TYPE.get(dbg.struct.Type, None) != 'IMAGE_DEBUG_TYPE_CODEVIEW':
                continue
            with suppress(Exception):
                pdb = dbg.entry.PdbFileName
                if 0 in pdb:
                    pdb = pdb[:pdb.index(0)]
                result.update(
                    PdbPath=pdb.decode(self.codec),
                    PdbAge=dbg.entry.Age
                )
        return result

    def process(self, data):
        result = {}
        pe = PE(data=data, fast_load=True)

        for switch, resolver, name in [
            (self.args.debug,   self.parse_debug,    'Debug'),    # noqa
            (self.args.dotnet,  self.parse_dotnet,   'DotNet'),   # noqa
            (self.args.header,  self.parse_header,   'Header'),   # noqa
            (self.args.version, self.parse_version,  'Version'),  # noqa
            (self.args.imports, self.parse_imports,  'Imports'),  # noqa
            (self.args.exports, self.parse_exports,  'Exports'),  # noqa
        ]:
            if not switch:
                continue
            self.log_debug(F'parsing: {name}')
            args = pe, data
            if switch > 1:
                args = *args, True
            try:
                info = resolver(*args)
            except Exception as E:
                self.log_info(F'failed to obtain {name}: {E!s}')
                continue
            if info:
                result[name] = info

        signature = {}

        if self.args.timestamps or self.args.signatures:
            with suppress(Exception):
                from refinery.units.formats.pe.pesig import pesig
                signature = self.parse_signature(next(data | pesig))

        if self.args.timestamps:
            ts = self.parse_time_stamps(pe, self.args.timeraw, self.args.timestamps > 1)
            with suppress(KeyError):
                ts.update(Signed=signature['Timestamp'])
            result.update(TimeStamp=ts)

        if signature and self.args.signatures:
            result['Signature'] = signature

        if result:
            yield from ppjson(tabular=self.args.tabular)._pretty_output(result, indent=4, ensure_ascii=False)

    _LCID = {
        0x0C00: 'Default Custom Locale Language',
        0x1400: 'Default Custom MUI Locale Language',
        0x007F: 'Invariant Locale Language',
        0x0000: 'Neutral Locale Language',
        0x0800: 'System Default Locale Language',
        0x1000: 'Unspecified Custom Locale Language',
        0x0400: 'User Default Locale Language',
        0x0436: 'Afrikaans-South Africa',
        0x041c: 'Albanian-Albania',
        0x045e: 'Amharic-Ethiopia',
        0x0401: 'Arabic (Saudi Arabia)',
        0x1401: 'Arabic (Algeria)',
        0x3c01: 'Arabic (Bahrain)',
        0x0c01: 'Arabic (Egypt)',
        0x0801: 'Arabic (Iraq)',
        0x2c01: 'Arabic (Jordan)',
        0x3401: 'Arabic (Kuwait)',
        0x3001: 'Arabic (Lebanon)',
        0x1001: 'Arabic (Libya)',
        0x1801: 'Arabic (Morocco)',
        0x2001: 'Arabic (Oman)',
        0x4001: 'Arabic (Qatar)',
        0x2801: 'Arabic (Syria)',
        0x1c01: 'Arabic (Tunisia)',
        0x3801: 'Arabic (U.A.E.)',
        0x2401: 'Arabic (Yemen)',
        0x042b: 'Armenian-Armenia',
        0x044d: 'Assamese',
        0x082c: 'Azeri (Cyrillic)',
        0x042c: 'Azeri (Latin)',
        0x042d: 'Basque',
        0x0423: 'Belarusian',
        0x0445: 'Bengali (India)',
        0x0845: 'Bengali (Bangladesh)',
        0x141A: 'Bosnian (Bosnia/Herzegovina)',
        0x0402: 'Bulgarian',
        0x0455: 'Burmese',
        0x0403: 'Catalan',
        0x045c: 'Cherokee-United States',
        0x0804: 'Chinese (People\'s Republic of China)',
        0x1004: 'Chinese (Singapore)',
        0x0404: 'Chinese (Taiwan)',
        0x0c04: 'Chinese (Hong Kong SAR)',
        0x1404: 'Chinese (Macao SAR)',
        0x041a: 'Croatian',
        0x101a: 'Croatian (Bosnia/Herzegovina)',
        0x0405: 'Czech',
        0x0406: 'Danish',
        0x0465: 'Divehi',
        0x0413: 'Dutch-Netherlands',
        0x0813: 'Dutch-Belgium',
        0x0466: 'Edo',
        0x0409: 'English (United States)',
        0x0809: 'English (United Kingdom)',
        0x0c09: 'English (Australia)',
        0x2809: 'English (Belize)',
        0x1009: 'English (Canada)',
        0x2409: 'English (Caribbean)',
        0x3c09: 'English (Hong Kong SAR)',
        0x4009: 'English (India)',
        0x3809: 'English (Indonesia)',
        0x1809: 'English (Ireland)',
        0x2009: 'English (Jamaica)',
        0x4409: 'English (Malaysia)',
        0x1409: 'English (New Zealand)',
        0x3409: 'English (Philippines)',
        0x4809: 'English (Singapore)',
        0x1c09: 'English (South Africa)',
        0x2c09: 'English (Trinidad)',
        0x3009: 'English (Zimbabwe)',
        0x0425: 'Estonian',
        0x0438: 'Faroese',
        0x0429: 'Farsi',
        0x0464: 'Filipino',
        0x040b: 'Finnish',
        0x040c: 'French (France)',
        0x080c: 'French (Belgium)',
        0x2c0c: 'French (Cameroon)',
        0x0c0c: 'French (Canada)',
        0x240c: 'French (Democratic Rep. of Congo)',
        0x300c: 'French (Cote d\'Ivoire)',
        0x3c0c: 'French (Haiti)',
        0x140c: 'French (Luxembourg)',
        0x340c: 'French (Mali)',
        0x180c: 'French (Monaco)',
        0x380c: 'French (Morocco)',
        0xe40c: 'French (North Africa)',
        0x200c: 'French (Reunion)',
        0x280c: 'French (Senegal)',
        0x100c: 'French (Switzerland)',
        0x1c0c: 'French (West Indies)',
        0x0462: 'Frisian-Netherlands',
        0x0467: 'Fulfulde-Nigeria',
        0x042f: 'FYRO Macedonian',
        0x083c: 'Gaelic (Ireland)',
        0x043c: 'Gaelic (Scotland)',
        0x0456: 'Galician',
        0x0437: 'Georgian',
        0x0407: 'German (Germany)',
        0x0c07: 'German (Austria)',
        0x1407: 'German (Liechtenstein)',
        0x1007: 'German (Luxembourg)',
        0x0807: 'German (Switzerland)',
        0x0408: 'Greek',
        0x0474: 'Guarani-Paraguay',
        0x0447: 'Gujarati',
        0x0468: 'Hausa-Nigeria',
        0x0475: 'Hawaiian (United States)',
        0x040d: 'Hebrew',
        0x0439: 'Hindi',
        0x040e: 'Hungarian',
        0x0469: 'Ibibio-Nigeria',
        0x040f: 'Icelandic',
        0x0470: 'Igbo-Nigeria',
        0x0421: 'Indonesian',
        0x045d: 'Inuktitut',
        0x0410: 'Italian (Italy)',
        0x0810: 'Italian (Switzerland)',
        0x0411: 'Japanese',
        0x044b: 'Kannada',
        0x0471: 'Kanuri-Nigeria',
        0x0860: 'Kashmiri',
        0x0460: 'Kashmiri (Arabic)',
        0x043f: 'Kazakh',
        0x0453: 'Khmer',
        0x0457: 'Konkani',
        0x0412: 'Korean',
        0x0440: 'Kyrgyz (Cyrillic)',
        0x0454: 'Lao',
        0x0476: 'Latin',
        0x0426: 'Latvian',
        0x0427: 'Lithuanian',
        0x043e: 'Malay-Malaysia',
        0x083e: 'Malay-Brunei Darussalam',
        0x044c: 'Malayalam',
        0x043a: 'Maltese',
        0x0458: 'Manipuri',
        0x0481: 'Maori-New Zealand',
        0x044e: 'Marathi',
        0x0450: 'Mongolian (Cyrillic)',
        0x0850: 'Mongolian (Mongolian)',
        0x0461: 'Nepali',
        0x0861: 'Nepali-India',
        0x0414: 'Norwegian (Bokmål)',
        0x0814: 'Norwegian (Nynorsk)',
        0x0448: 'Oriya',
        0x0472: 'Oromo',
        0x0479: 'Papiamentu',
        0x0463: 'Pashto',
        0x0415: 'Polish',
        0x0416: 'Portuguese-Brazil',
        0x0816: 'Portuguese-Portugal',
        0x0446: 'Punjabi',
        0x0846: 'Punjabi (Pakistan)',
        0x046B: 'Quecha (Bolivia)',
        0x086B: 'Quecha (Ecuador)',
        0x0C6B: 'Quecha (Peru)',
        0x0417: 'Rhaeto-Romanic',
        0x0418: 'Romanian',
        0x0818: 'Romanian (Moldava)',
        0x0419: 'Russian',
        0x0819: 'Russian (Moldava)',
        0x043b: 'Sami (Lappish)',
        0x044f: 'Sanskrit',
        0x046c: 'Sepedi',
        0x0c1a: 'Serbian (Cyrillic)',
        0x081a: 'Serbian (Latin)',
        0x0459: 'Sindhi (India)',
        0x0859: 'Sindhi (Pakistan)',
        0x045b: 'Sinhalese-Sri Lanka',
        0x041b: 'Slovak',
        0x0424: 'Slovenian',
        0x0477: 'Somali',
        0x042e: 'Sorbian',
        0x0c0a: 'Spanish (Modern Sort)',
        0x040a: 'Spanish (Traditional Sort)',
        0x2c0a: 'Spanish (Argentina)',
        0x400a: 'Spanish (Bolivia)',
        0x340a: 'Spanish (Chile)',
        0x240a: 'Spanish (Colombia)',
        0x140a: 'Spanish (Costa Rica)',
        0x1c0a: 'Spanish (Dominican Republic)',
        0x300a: 'Spanish (Ecuador)',
        0x440a: 'Spanish (El Salvador)',
        0x100a: 'Spanish (Guatemala)',
        0x480a: 'Spanish (Honduras)',
        0x580a: 'Spanish (Latin America)',
        0x080a: 'Spanish (Mexico)',
        0x4c0a: 'Spanish (Nicaragua)',
        0x180a: 'Spanish (Panama)',
        0x3c0a: 'Spanish (Paraguay)',
        0x280a: 'Spanish (Peru)',
        0x500a: 'Spanish (Puerto Rico)',
        0x540a: 'Spanish (United States)',
        0x380a: 'Spanish (Uruguay)',
        0x200a: 'Spanish (Venezuela)',
        0x0430: 'Sutu',
        0x0441: 'Swahili',
        0x041d: 'Swedish',
        0x081d: 'Swedish-Finland',
        0x045a: 'Syriac',
        0x0428: 'Tajik',
        0x045f: 'Tamazight (Arabic)',
        0x085f: 'Tamazight (Latin)',
        0x0449: 'Tamil',
        0x0444: 'Tatar',
        0x044a: 'Telugu',
        0x041e: 'Thai',
        0x0851: 'Tibetan (Bhutan)',
        0x0451: 'Tibetan (People\'s Republic of China)',
        0x0873: 'Tigrigna (Eritrea)',
        0x0473: 'Tigrigna (Ethiopia)',
        0x0431: 'Tsonga',
        0x0432: 'Tswana',
        0x041f: 'Turkish',
        0x0442: 'Turkmen',
        0x0480: 'Uighur-China',
        0x0422: 'Ukrainian',
        0x0420: 'Urdu',
        0x0820: 'Urdu-India',
        0x0843: 'Uzbek (Cyrillic)',
        0x0443: 'Uzbek (Latin)',
        0x0433: 'Venda',
        0x042a: 'Vietnamese',
        0x0452: 'Welsh',
        0x0434: 'Xhosa',
        0x0478: 'Yi',
        0x043d: 'Yiddish',
        0x046a: 'Yoruba',
        0x0435: 'Zulu',
        0x04ff: 'HID (Human Interface DeVITe)'
    }

    _CHARSET = {
        0x0000: '7-bit ASCII',
        0x03A4: 'Japan (Shift ? JIS X-0208)',
        0x03B5: 'Korea (Shift ? KSC 5601)',
        0x03B6: 'Taiwan (Big5)',
        0x04B0: 'Unicode',
        0x04E2: 'Latin-2 (Eastern European)',
        0x04E3: 'Cyrillic',
        0x04E4: 'Multilingual',
        0x04E5: 'Greek',
        0x04E6: 'Turkish',
        0x04E7: 'Hebrew',
        0x04E8: 'Arabic',
    }

    _WINVER = {
        3: {
            0x00: 'Windows NT 3',
            0x0A: 'Windows NT 3.1',
            0x32: 'Windows NT 3.5',
            0x33: 'Windows NT 3.51',
        },
        4: {
            0x00: 'Windows 95',
            0x0A: 'Windows 98',
        },
        5: {
            0x00: 'Windows 2000',
            0x5A: 'Windows Me',
            0x01: 'Windows XP',
            0x02: 'Windows Server 2003',
        },
        6: {
            0x00: 'Windows Vista',
            0x01: 'Windows 7',
            0x02: 'Windows 8',
            0x03: 'Windows 8.1',
        },
        10: {
            0x00: 'Windows 10',
        }
    }

Functions

def get_rich_short_pid(pid)
Expand source code Browse git
def get_rich_short_pid(pid: str) -> ShortPID:
    pid = pid.upper()
    if pid.startswith('UTC'):
        return ShortPID.UTC
    if pid.startswith('CVTRES'):
        return ShortPID.RES
    if pid.startswith('CVTOMF'):
        return ShortPID.OMF
    if pid.startswith('CVTPGD'):
        return ShortPID.PGD
    if pid.startswith('LINKER'):
        return ShortPID.LNK
    if pid.startswith('EXPORT'):
        return ShortPID.EXP
    if pid.startswith('IMPORT'):
        return ShortPID.IMP
    if pid.startswith('IMPLIB'):
        return ShortPID.IMP
    if pid.startswith('ALIASOBJ'):
        return ShortPID.OBJ
    if pid.startswith('RESOURCE'):
        return ShortPID.RES
    if pid.startswith('PHX'):
        return ShortPID.PHX
    if pid.startswith('PHOENIX'):
        return ShortPID.PHX
    if pid.startswith('MASM'):
        return ShortPID.ASM
    if pid.startswith('ILASM'):
        return ShortPID.MIL
    if pid.startswith('VISUALBASIC'):
        return ShortPID.VB6
    raise LookupError(pid)
def get_rich_info(vid)
Expand source code Browse git
def get_rich_info(vid: int) -> VersionInfo:
    pid = vid >> 0x10
    ver = vid & 0xFFFF
    ver = RICH['ver'].get(F'{ver:04X}')
    pid = RICH['pid'].get(F'{pid:04X}')
    err = ver is None and pid is None
    if ver is not None:
        suffix = ver.get('ver')
        ver = ver['ide']
        if suffix:
            ver = F'{ver} {suffix}'
    else:
        ver = 'Unknown Version'
    pid = pid or 'Unknown Type'
    return VersionInfo(pid, ver, err)

Classes

class VIT (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class VIT(str, Enum):
    ERR = 'unknown'
    OBJ = 'object file from C'
    CPP = 'object file from C++'
    ASM = 'object file from assembler'
    RES = 'object from CVTRES'
    LNK = 'linker version'
    IMP = 'dll import in library file'
    EXP = 'dll export in library file'

    @property
    def tag(self) -> str:
        if self in (VIT.OBJ, VIT.CPP, VIT.ASM, VIT.RES):
            return 'object'
        if self is VIT.IMP:
            return 'import'
        if self is VIT.EXP:
            return 'export'
        if self is VIT.LNK:
            return 'linker'
        else:
            return 'unknown'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var ERR
var OBJ
var CPP
var ASM
var RES
var LNK
var IMP
var EXP

Instance variables

var tag
Expand source code Browse git
@property
def tag(self) -> str:
    if self in (VIT.OBJ, VIT.CPP, VIT.ASM, VIT.RES):
        return 'object'
    if self is VIT.IMP:
        return 'import'
    if self is VIT.EXP:
        return 'export'
    if self is VIT.LNK:
        return 'linker'
    else:
        return 'unknown'
class VersionInfo (pid, ver, err)

VersionInfo(pid: str, ver: str, err: bool)

Expand source code Browse git
class VersionInfo:
    pid: str
    ver: str
    err: bool

    def __str__(self):
        return F'{self.ver} [{self.pid.upper()}]'

    def __bool__(self):
        return not self.err

Class variables

var pid
var ver
var err
class ShortPID (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class ShortPID(str, Enum):
    UTC = 'STDLIB' # STDLIBC
    RES = 'CVTRES' # Cvt/RES
    OMF = 'CVTOMF' # Cvt/OMF
    PGD = 'CVTPGD' # Cvt/PGD
    LNK = 'LINKER' # Linker
    EXP = 'EXPORT' # Exports
    IMP = 'IMPORT' # Imports
    OBJ = 'OBJECT' # Object
    PHX = 'PHOENX' # Phoenix
    ASM = 'MASM'   # MASM
    MIL = 'MSIL'   # MSIL
    VB6 = 'VB6OBJ' # VB6

    def __str__(self):
        width = max(len(item.value) for item in self.__class__)
        return F'{self.value:>{width}}'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var UTC
var RES
var OMF
var PGD
var LNK
var EXP
var IMP
var OBJ
var PHX
var ASM
var MIL
var VB6
class pemeta (custom=False, debug=False, dotnet=False, signatures=False, timestamps=0, version=False, header=False, exports=0, imports=0, tabular=False, timeraw=False)

Extract metadata from PE files. By default, all information except for imports and exports are extracted.

Expand source code Browse git
class pemeta(Unit):
    """
    Extract metadata from PE files. By default, all information except for imports and exports are
    extracted.
    """
    def __init__(
        self, custom : Arg('-c', '--custom',
            help='Unless enabled, all default categories will be extracted.') = False,
        debug      : Arg.Switch('-D', help='Parse the PDB path from the debug directory.') = False,
        dotnet     : Arg.Switch('-N', help='Parse the .NET header.') = False,
        signatures : Arg.Switch('-S', help='Parse digital signatures.') = False,
        timestamps : Arg.Counts('-T', help='Extract time stamps. Specify twice for more detail.') = 0,
        version    : Arg.Switch('-V', help='Parse the VERSION resource.') = False,
        header     : Arg.Switch('-H', help='Parse base data from the PE header.') = False,
        exports    : Arg.Counts('-E', help='List all exported functions. Specify twice to include addresses.') = 0,
        imports    : Arg.Counts('-I', help='List all imported functions. Specify twice to include addresses.') = 0,
        tabular    : Arg.Switch('-t', help='Print information in a table rather than as JSON') = False,
        timeraw    : Arg.Switch('-r', help='Extract time stamps as numbers instead of human-readable format.') = False,
    ):
        if not custom and not any((debug, dotnet, signatures, timestamps, version, header)):
            debug = dotnet = signatures = timestamps = version = header = True
        super().__init__(
            debug=debug,
            dotnet=dotnet,
            signatures=signatures,
            timestamps=timestamps,
            version=version,
            header=header,
            imports=imports,
            exports=exports,
            timeraw=timeraw,
            tabular=tabular,
        )

    @classmethod
    def _ensure_string(cls, x):
        if not isinstance(x, str):
            x = repr(x) if not isinstance(x, bytes) else x.decode(cls.codec, 'backslashreplace')
        return x

    @classmethod
    def _parse_pedict(cls, bin):
        return dict((
            cls._ensure_string(key),
            cls._ensure_string(val)
        ) for key, val in bin.items() if val)

    @classmethod
    def parse_signature(cls, data: bytearray) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        time stamp and code signing certificates that are attached to the input PE file.
        """
        from refinery.units.formats.pkcs7 import pkcs7

        try:
            signature = data | pkcs7 | json.loads
        except Exception as E:
            raise ValueError(F'PKCS7 parser failed with error: {E!s}')

        info = {}

        def find_timestamps(entry):
            if isinstance(entry, dict):
                if set(entry.keys()) == {'type', 'value'}:
                    if entry['type'] == 'signing_time':
                        return {'Timestamp': entry['value']}
                for value in entry.values():
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    with suppress(KeyError):
                        result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
                    return result
            elif isinstance(entry, list):
                for value in entry:
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    return result

        timestamp_info = find_timestamps(signature)
        if timestamp_info is not None:
            info.update(timestamp_info)

        try:
            certificates = signature['content']['certificates']
        except KeyError:
            return info

        if len(certificates) == 1:
            main_certificate = certificates[0]
        else:
            certificates_with_extended_use = []
            main_certificate = None
            for certificate in certificates:
                with suppress(Exception):
                    crt = certificate['tbs_certificate']
                    ext = [e for e in crt['extensions'] if e['extn_id'] == 'extended_key_usage' and e['extn_value'] != ['time_stamping']]
                    key = [e for e in crt['extensions'] if e['extn_id'] == 'key_usage']
                    if ext:
                        certificates_with_extended_use.append(certificate)
                    if any('key_cert_sign' in e['extn_value'] for e in key):
                        continue
                    if any('code_signing' in e['extn_value'] for e in ext):
                        main_certificate = certificate
                        break
            if main_certificate is None and len(certificates_with_extended_use) == 1:
                main_certificate = certificates_with_extended_use[0]
        if main_certificate:
            crt = main_certificate['tbs_certificate']
            serial = crt['serial_number']
            if isinstance(serial, int):
                serial = F'{serial:x}'
            if len(serial) % 2 != 0:
                serial = F'0{serial}'
            assert bytes.fromhex(serial) in data
            subject = crt['subject']
            location = [subject.get(t, '') for t in ('locality_name', 'state_or_province_name', 'country_name')]
            info.update(Subject=subject['common_name'])
            if any(location):
                info.update(SubjectLocation=', '.join(filter(None, location)))
            for signer_info in signature['content'].get('signer_infos', ()):
                try:
                    if signer_info['sid']['serial_number'] != crt['serial_number']:
                        continue
                    for attr in signer_info['signed_attrs']:
                        if attr['type'] == 'authenticode_info':
                            info.update(ProgramName=attr['value']['programName'])
                            info.update(MoreInfo=attr['value']['moreInfo'])
                except KeyError:
                    continue
            try:
                valid_from = crt['validity']['not_before']
                valid_until = crt['validity']['not_after']
            except KeyError:
                pass
            else:
                info.update(ValidFrom=valid_from, ValidUntil=valid_until)
            info.update(
                Issuer=crt['issuer']['common_name'], Fingerprint=main_certificate['fingerprint'], Serial=serial)
            return info
        return info

    def _pe_characteristics(self, pe: PE):
        return {name for name, mask in image_characteristics
            if pe.FILE_HEADER.Characteristics & mask}

    def _pe_address_width(self, pe: PE, default=16) -> int:
        if 'IMAGE_FILE_16BIT_MACHINE' in self._pe_characteristics(pe):
            return 4
        elif MACHINE_TYPE[pe.FILE_HEADER.Machine] in ['IMAGE_FILE_MACHINE_I386']:
            return 8
        elif MACHINE_TYPE[pe.FILE_HEADER.Machine] in [
            'IMAGE_FILE_MACHINE_AMD64',
            'IMAGE_FILE_MACHINE_IA64',
        ]:
            return 16
        else:
            return default

    def _vint(self, pe: PE, value: int):
        if not self.args.tabular:
            return value
        aw = self._pe_address_width(pe)
        return F'0x{value:0{aw}X}'

    def parse_version(self, pe: PE, data=None) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the version resource of an input PE file, if available.
        """
        pe.parse_data_directories(directories=[DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
        string_table_entries = []
        for FileInfo in pe.FileInfo:
            for FileInfoEntry in FileInfo:
                with suppress(AttributeError):
                    for StringTableEntry in FileInfoEntry.StringTable:
                        StringTableEntryParsed = self._parse_pedict(StringTableEntry.entries)
                        with suppress(AttributeError):
                            LangID = StringTableEntry.entries.get('LangID', None) or StringTableEntry.LangID
                            LangID = int(LangID, 0x10) if not isinstance(LangID, int) else LangID
                            LangHi = LangID >> 0x10
                            LangLo = LangID & 0xFFFF
                            Language = self._LCID.get(LangHi, 'Language Neutral')
                            Charset = self._CHARSET.get(LangLo, 'Unknown Charset')
                            StringTableEntryParsed.update(
                                LangID=F'{LangID:08X}',
                                Charset=Charset,
                                Language=Language
                            )
                        for key in StringTableEntryParsed:
                            if key.endswith('Version'):
                                value = StringTableEntryParsed[key]
                                separator = ', '
                                if re.match(F'\\d+({re.escape(separator)}\\d+){{3}}', value):
                                    StringTableEntryParsed[key] = '.'.join(value.split(separator))
                        string_table_entries.append(StringTableEntryParsed)
        if not string_table_entries:
            return None
        elif len(string_table_entries) == 1:
            return string_table_entries[0]
        else:
            return string_table_entries

    def parse_exports(self, pe: PE, data=None, include_addresses=False) -> list:
        pe.parse_data_directories(directories=[DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']])
        base = pe.OPTIONAL_HEADER.ImageBase
        info = []
        for k, exp in enumerate(pe.DIRECTORY_ENTRY_EXPORT.symbols):
            if not exp.name:
                name = F'@{k}'
            else:
                name = exp.name.decode('ascii')
            item = {'Name': name, 'Address': self._vint(pe, exp.address + base)} if include_addresses else name
            info.append(item)
        return info

    def parse_imports(self, pe: PE, data=None, include_addresses=False) -> list:
        info = {}
        dirs = []
        for name in [
            'DIRECTORY_ENTRY_IMPORT',
            'DIRECTORY_ENTRY_DELAY_IMPORT',
        ]:
            pe.parse_data_directories(directories=[DIRECTORY_ENTRY[F'IMAGE_{name}']])
            with suppress(AttributeError):
                dirs.append(getattr(pe, name))
        self.log_warn(dirs)
        for idd in itertools.chain(*dirs):
            dll: bytes = idd.dll
            dll = dll.decode('ascii')
            if dll.lower().endswith('.dll'):
                dll = dll[:~3]
            imports: list[str] = info.setdefault(dll, [])
            with suppress(AttributeError):
                symbols = idd.imports
            with suppress(AttributeError):
                symbols = idd.entries
            try:
                for imp in symbols:
                    name: bytes = imp.name
                    name = name and name.decode('ascii') or F'@{imp.ordinal}'
                    if not include_addresses:
                        imports.append(name)
                    else:
                        imports.append(dict(Name=name, Address=self._vint(pe, imp.address)))
            except Exception as e:
                self.log_warn(F'error parsing {name}: {e!s}')
        return info

    def parse_header(self, pe: PE, data=None) -> dict:
        def format_macro_name(name: str, prefix, convert=True):
            name = name.split('_')[prefix:]
            if convert:
                for k, part in enumerate(name):
                    name[k] = part.upper() if len(part) <= 3 else part.capitalize()
            return ' '.join(name)

        major = pe.OPTIONAL_HEADER.MajorOperatingSystemVersion
        minor = pe.OPTIONAL_HEADER.MinorOperatingSystemVersion
        version = self._WINVER.get(major, {0: 'Unknown'})

        try:
            MinimumOS = version[minor]
        except LookupError:
            MinimumOS = version[0]
        header_information = {
            'Machine': format_macro_name(MACHINE_TYPE[pe.FILE_HEADER.Machine], 3, False),
            'Subsystem': format_macro_name(SUBSYSTEM_TYPE[pe.OPTIONAL_HEADER.Subsystem], 2),
            'MinimumOS': MinimumOS,
        }

        pe.parse_data_directories(directories=[
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
        ])

        try:
            export_name = pe.DIRECTORY_ENTRY_EXPORT.name
            if isinstance(export_name, bytes):
                export_name = export_name.decode('utf8')
            if not export_name.isprintable():
                export_name = None
        except Exception:
            export_name = None
        if export_name:
            header_information['ExportName'] = export_name

        rich_header = pe.parse_rich_header()
        rich = []

        if rich_header:
            it = rich_header.get('values', [])
            if self.args.tabular:
                cw = max(len(F'{c:d}') for c in it[1::2])
            for idv, count in zip(it[0::2], it[1::2]):
                info = get_rich_info(idv)
                if not info:
                    continue
                pid = info.pid.upper()
                if self.args.tabular:
                    short_pid = get_rich_short_pid(pid)
                    rich.append(F'[{idv:08x}] {count:>0{cw}d} {short_pid!s} {info.ver}')
                else:
                    rich.append({
                        'Counter': count,
                        'Encoded': F'{idv:08x}',
                        'Library': pid,
                        'Product': info.ver,
                    })
            header_information['RICH'] = rich

        characteristics = self._pe_characteristics(pe)
        for typespec, flag in {
            'EXE': 'IMAGE_FILE_EXECUTABLE_IMAGE',
            'DLL': 'IMAGE_FILE_DLL',
            'SYS': 'IMAGE_FILE_SYSTEM'
        }.items():
            if flag in characteristics:
                header_information['Type'] = typespec

        base = pe.OPTIONAL_HEADER.ImageBase
        header_information['ImageBase'] = self._vint(pe, base)
        header_information['ImageSize'] = get_pe_size(pe)
        header_information['Bits'] = 4 * self._pe_address_width(pe, 16)
        header_information['EntryPoint'] = self._vint(pe, pe.OPTIONAL_HEADER.AddressOfEntryPoint + base)
        return header_information

    def parse_time_stamps(self, pe: PE, raw_time_stamps: bool, more_detail: bool) -> dict:
        """
        Extracts time stamps from the PE header (link time), as well as from the imports,
        exports, debug, and resource directory. The resource time stamp is also parsed as
        a DOS time stamp and returned as the "Delphi" time stamp.
        """
        if raw_time_stamps:
            def dt(ts): return ts
        else:
            dt = date_from_timestamp

        pe.parse_data_directories(directories=[
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BOUND_IMPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG'],
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
        ])

        info = {}

        with suppress(AttributeError):
            info.update(Linker=dt(pe.FILE_HEADER.TimeDateStamp))

        for dir_name, _dll, info_key in [
            ('DIRECTORY_ENTRY_IMPORT',       'dll',  'Import'), # noqa
            ('DIRECTORY_ENTRY_DELAY_IMPORT', 'dll',  'Symbol'), # noqa
            ('DIRECTORY_ENTRY_BOUND_IMPORT', 'name', 'Module'), # noqa
        ]:
            impts = {}
            for entry in getattr(pe, dir_name, []):
                ts = 0
                with suppress(AttributeError):
                    ts = entry.struct.dwTimeDateStamp
                with suppress(AttributeError):
                    ts = entry.struct.TimeDateStamp
                if ts == 0 or ts == 0xFFFFFFFF:
                    continue
                name = getattr(entry, _dll, B'').decode()
                if name.lower().endswith('.dll'):
                    name = name[:-4]
                impts[name] = dt(ts)
            if not impts:
                continue
            if not more_detail:
                dmin = min(impts.values())
                dmax = max(impts.values())
                small_delta = 2 * 60 * 60
                if not raw_time_stamps:
                    small_delta = timedelta(seconds=small_delta)
                if dmax - dmin < small_delta:
                    impts = dmin
            info[info_key] = impts

        with suppress(AttributeError):
            Export = pe.DIRECTORY_ENTRY_EXPORT.struct.TimeDateStamp
            if Export: info.update(Export=dt(Export))

        with suppress(AttributeError):
            res_timestamp = pe.DIRECTORY_ENTRY_RESOURCE.struct.TimeDateStamp
            if res_timestamp:
                with suppress(ValueError):
                    from refinery.units.misc.datefix import datefix
                    dos = datefix.dostime(res_timestamp)
                    info.update(Delphi=dos)
                    info.update(RsrcTS=dt(res_timestamp))

        def norm(value):
            if isinstance(value, list):
                return [norm(v) for v in value]
            if isinstance(value, dict):
                return {k: norm(v) for k, v in value.items()}
            if isinstance(value, int):
                return value
            return str(value)

        return {key: norm(value) for key, value in info.items()}

    def parse_dotnet(self, pe: PE, data):
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the .NET metadata of an input PE file.
        """
        header = DotNetHeader(data, pe=pe)
        tables = header.meta.Streams.Tables
        info = dict(
            RuntimeVersion=F'{header.head.MajorRuntimeVersion}.{header.head.MinorRuntimeVersion}',
            Version=F'{header.meta.MajorVersion}.{header.meta.MinorVersion}',
            VersionString=header.meta.VersionString
        )

        info['Flags'] = [name for name, check in header.head.KnownFlags.items() if check]

        if len(tables.Assembly) == 1:
            assembly = tables.Assembly[0]
            info.update(
                AssemblyName=assembly.Name,
                Release='{}.{}.{}.{}'.format(
                    assembly.MajorVersion,
                    assembly.MinorVersion,
                    assembly.BuildNumber,
                    assembly.RevisionNumber
                )
            )

        try:
            entry = self._vint(pe, header.head.EntryPointToken + pe.OPTIONAL_HEADER.ImageBase)
            info.update(EntryPoint=entry)
        except AttributeError:
            pass

        if len(tables.Module) == 1:
            module = tables.Module[0]
            info.update(ModuleName=module.Name)

        return info

    def parse_debug(self, pe: PE, data=None):
        result = {}
        pe.parse_data_directories(directories=[
            DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])
        for dbg in pe.DIRECTORY_ENTRY_DEBUG:
            if DEBUG_TYPE.get(dbg.struct.Type, None) != 'IMAGE_DEBUG_TYPE_CODEVIEW':
                continue
            with suppress(Exception):
                pdb = dbg.entry.PdbFileName
                if 0 in pdb:
                    pdb = pdb[:pdb.index(0)]
                result.update(
                    PdbPath=pdb.decode(self.codec),
                    PdbAge=dbg.entry.Age
                )
        return result

    def process(self, data):
        result = {}
        pe = PE(data=data, fast_load=True)

        for switch, resolver, name in [
            (self.args.debug,   self.parse_debug,    'Debug'),    # noqa
            (self.args.dotnet,  self.parse_dotnet,   'DotNet'),   # noqa
            (self.args.header,  self.parse_header,   'Header'),   # noqa
            (self.args.version, self.parse_version,  'Version'),  # noqa
            (self.args.imports, self.parse_imports,  'Imports'),  # noqa
            (self.args.exports, self.parse_exports,  'Exports'),  # noqa
        ]:
            if not switch:
                continue
            self.log_debug(F'parsing: {name}')
            args = pe, data
            if switch > 1:
                args = *args, True
            try:
                info = resolver(*args)
            except Exception as E:
                self.log_info(F'failed to obtain {name}: {E!s}')
                continue
            if info:
                result[name] = info

        signature = {}

        if self.args.timestamps or self.args.signatures:
            with suppress(Exception):
                from refinery.units.formats.pe.pesig import pesig
                signature = self.parse_signature(next(data | pesig))

        if self.args.timestamps:
            ts = self.parse_time_stamps(pe, self.args.timeraw, self.args.timestamps > 1)
            with suppress(KeyError):
                ts.update(Signed=signature['Timestamp'])
            result.update(TimeStamp=ts)

        if signature and self.args.signatures:
            result['Signature'] = signature

        if result:
            yield from ppjson(tabular=self.args.tabular)._pretty_output(result, indent=4, ensure_ascii=False)

    _LCID = {
        0x0C00: 'Default Custom Locale Language',
        0x1400: 'Default Custom MUI Locale Language',
        0x007F: 'Invariant Locale Language',
        0x0000: 'Neutral Locale Language',
        0x0800: 'System Default Locale Language',
        0x1000: 'Unspecified Custom Locale Language',
        0x0400: 'User Default Locale Language',
        0x0436: 'Afrikaans-South Africa',
        0x041c: 'Albanian-Albania',
        0x045e: 'Amharic-Ethiopia',
        0x0401: 'Arabic (Saudi Arabia)',
        0x1401: 'Arabic (Algeria)',
        0x3c01: 'Arabic (Bahrain)',
        0x0c01: 'Arabic (Egypt)',
        0x0801: 'Arabic (Iraq)',
        0x2c01: 'Arabic (Jordan)',
        0x3401: 'Arabic (Kuwait)',
        0x3001: 'Arabic (Lebanon)',
        0x1001: 'Arabic (Libya)',
        0x1801: 'Arabic (Morocco)',
        0x2001: 'Arabic (Oman)',
        0x4001: 'Arabic (Qatar)',
        0x2801: 'Arabic (Syria)',
        0x1c01: 'Arabic (Tunisia)',
        0x3801: 'Arabic (U.A.E.)',
        0x2401: 'Arabic (Yemen)',
        0x042b: 'Armenian-Armenia',
        0x044d: 'Assamese',
        0x082c: 'Azeri (Cyrillic)',
        0x042c: 'Azeri (Latin)',
        0x042d: 'Basque',
        0x0423: 'Belarusian',
        0x0445: 'Bengali (India)',
        0x0845: 'Bengali (Bangladesh)',
        0x141A: 'Bosnian (Bosnia/Herzegovina)',
        0x0402: 'Bulgarian',
        0x0455: 'Burmese',
        0x0403: 'Catalan',
        0x045c: 'Cherokee-United States',
        0x0804: 'Chinese (People\'s Republic of China)',
        0x1004: 'Chinese (Singapore)',
        0x0404: 'Chinese (Taiwan)',
        0x0c04: 'Chinese (Hong Kong SAR)',
        0x1404: 'Chinese (Macao SAR)',
        0x041a: 'Croatian',
        0x101a: 'Croatian (Bosnia/Herzegovina)',
        0x0405: 'Czech',
        0x0406: 'Danish',
        0x0465: 'Divehi',
        0x0413: 'Dutch-Netherlands',
        0x0813: 'Dutch-Belgium',
        0x0466: 'Edo',
        0x0409: 'English (United States)',
        0x0809: 'English (United Kingdom)',
        0x0c09: 'English (Australia)',
        0x2809: 'English (Belize)',
        0x1009: 'English (Canada)',
        0x2409: 'English (Caribbean)',
        0x3c09: 'English (Hong Kong SAR)',
        0x4009: 'English (India)',
        0x3809: 'English (Indonesia)',
        0x1809: 'English (Ireland)',
        0x2009: 'English (Jamaica)',
        0x4409: 'English (Malaysia)',
        0x1409: 'English (New Zealand)',
        0x3409: 'English (Philippines)',
        0x4809: 'English (Singapore)',
        0x1c09: 'English (South Africa)',
        0x2c09: 'English (Trinidad)',
        0x3009: 'English (Zimbabwe)',
        0x0425: 'Estonian',
        0x0438: 'Faroese',
        0x0429: 'Farsi',
        0x0464: 'Filipino',
        0x040b: 'Finnish',
        0x040c: 'French (France)',
        0x080c: 'French (Belgium)',
        0x2c0c: 'French (Cameroon)',
        0x0c0c: 'French (Canada)',
        0x240c: 'French (Democratic Rep. of Congo)',
        0x300c: 'French (Cote d\'Ivoire)',
        0x3c0c: 'French (Haiti)',
        0x140c: 'French (Luxembourg)',
        0x340c: 'French (Mali)',
        0x180c: 'French (Monaco)',
        0x380c: 'French (Morocco)',
        0xe40c: 'French (North Africa)',
        0x200c: 'French (Reunion)',
        0x280c: 'French (Senegal)',
        0x100c: 'French (Switzerland)',
        0x1c0c: 'French (West Indies)',
        0x0462: 'Frisian-Netherlands',
        0x0467: 'Fulfulde-Nigeria',
        0x042f: 'FYRO Macedonian',
        0x083c: 'Gaelic (Ireland)',
        0x043c: 'Gaelic (Scotland)',
        0x0456: 'Galician',
        0x0437: 'Georgian',
        0x0407: 'German (Germany)',
        0x0c07: 'German (Austria)',
        0x1407: 'German (Liechtenstein)',
        0x1007: 'German (Luxembourg)',
        0x0807: 'German (Switzerland)',
        0x0408: 'Greek',
        0x0474: 'Guarani-Paraguay',
        0x0447: 'Gujarati',
        0x0468: 'Hausa-Nigeria',
        0x0475: 'Hawaiian (United States)',
        0x040d: 'Hebrew',
        0x0439: 'Hindi',
        0x040e: 'Hungarian',
        0x0469: 'Ibibio-Nigeria',
        0x040f: 'Icelandic',
        0x0470: 'Igbo-Nigeria',
        0x0421: 'Indonesian',
        0x045d: 'Inuktitut',
        0x0410: 'Italian (Italy)',
        0x0810: 'Italian (Switzerland)',
        0x0411: 'Japanese',
        0x044b: 'Kannada',
        0x0471: 'Kanuri-Nigeria',
        0x0860: 'Kashmiri',
        0x0460: 'Kashmiri (Arabic)',
        0x043f: 'Kazakh',
        0x0453: 'Khmer',
        0x0457: 'Konkani',
        0x0412: 'Korean',
        0x0440: 'Kyrgyz (Cyrillic)',
        0x0454: 'Lao',
        0x0476: 'Latin',
        0x0426: 'Latvian',
        0x0427: 'Lithuanian',
        0x043e: 'Malay-Malaysia',
        0x083e: 'Malay-Brunei Darussalam',
        0x044c: 'Malayalam',
        0x043a: 'Maltese',
        0x0458: 'Manipuri',
        0x0481: 'Maori-New Zealand',
        0x044e: 'Marathi',
        0x0450: 'Mongolian (Cyrillic)',
        0x0850: 'Mongolian (Mongolian)',
        0x0461: 'Nepali',
        0x0861: 'Nepali-India',
        0x0414: 'Norwegian (Bokmål)',
        0x0814: 'Norwegian (Nynorsk)',
        0x0448: 'Oriya',
        0x0472: 'Oromo',
        0x0479: 'Papiamentu',
        0x0463: 'Pashto',
        0x0415: 'Polish',
        0x0416: 'Portuguese-Brazil',
        0x0816: 'Portuguese-Portugal',
        0x0446: 'Punjabi',
        0x0846: 'Punjabi (Pakistan)',
        0x046B: 'Quecha (Bolivia)',
        0x086B: 'Quecha (Ecuador)',
        0x0C6B: 'Quecha (Peru)',
        0x0417: 'Rhaeto-Romanic',
        0x0418: 'Romanian',
        0x0818: 'Romanian (Moldava)',
        0x0419: 'Russian',
        0x0819: 'Russian (Moldava)',
        0x043b: 'Sami (Lappish)',
        0x044f: 'Sanskrit',
        0x046c: 'Sepedi',
        0x0c1a: 'Serbian (Cyrillic)',
        0x081a: 'Serbian (Latin)',
        0x0459: 'Sindhi (India)',
        0x0859: 'Sindhi (Pakistan)',
        0x045b: 'Sinhalese-Sri Lanka',
        0x041b: 'Slovak',
        0x0424: 'Slovenian',
        0x0477: 'Somali',
        0x042e: 'Sorbian',
        0x0c0a: 'Spanish (Modern Sort)',
        0x040a: 'Spanish (Traditional Sort)',
        0x2c0a: 'Spanish (Argentina)',
        0x400a: 'Spanish (Bolivia)',
        0x340a: 'Spanish (Chile)',
        0x240a: 'Spanish (Colombia)',
        0x140a: 'Spanish (Costa Rica)',
        0x1c0a: 'Spanish (Dominican Republic)',
        0x300a: 'Spanish (Ecuador)',
        0x440a: 'Spanish (El Salvador)',
        0x100a: 'Spanish (Guatemala)',
        0x480a: 'Spanish (Honduras)',
        0x580a: 'Spanish (Latin America)',
        0x080a: 'Spanish (Mexico)',
        0x4c0a: 'Spanish (Nicaragua)',
        0x180a: 'Spanish (Panama)',
        0x3c0a: 'Spanish (Paraguay)',
        0x280a: 'Spanish (Peru)',
        0x500a: 'Spanish (Puerto Rico)',
        0x540a: 'Spanish (United States)',
        0x380a: 'Spanish (Uruguay)',
        0x200a: 'Spanish (Venezuela)',
        0x0430: 'Sutu',
        0x0441: 'Swahili',
        0x041d: 'Swedish',
        0x081d: 'Swedish-Finland',
        0x045a: 'Syriac',
        0x0428: 'Tajik',
        0x045f: 'Tamazight (Arabic)',
        0x085f: 'Tamazight (Latin)',
        0x0449: 'Tamil',
        0x0444: 'Tatar',
        0x044a: 'Telugu',
        0x041e: 'Thai',
        0x0851: 'Tibetan (Bhutan)',
        0x0451: 'Tibetan (People\'s Republic of China)',
        0x0873: 'Tigrigna (Eritrea)',
        0x0473: 'Tigrigna (Ethiopia)',
        0x0431: 'Tsonga',
        0x0432: 'Tswana',
        0x041f: 'Turkish',
        0x0442: 'Turkmen',
        0x0480: 'Uighur-China',
        0x0422: 'Ukrainian',
        0x0420: 'Urdu',
        0x0820: 'Urdu-India',
        0x0843: 'Uzbek (Cyrillic)',
        0x0443: 'Uzbek (Latin)',
        0x0433: 'Venda',
        0x042a: 'Vietnamese',
        0x0452: 'Welsh',
        0x0434: 'Xhosa',
        0x0478: 'Yi',
        0x043d: 'Yiddish',
        0x046a: 'Yoruba',
        0x0435: 'Zulu',
        0x04ff: 'HID (Human Interface DeVITe)'
    }

    _CHARSET = {
        0x0000: '7-bit ASCII',
        0x03A4: 'Japan (Shift ? JIS X-0208)',
        0x03B5: 'Korea (Shift ? KSC 5601)',
        0x03B6: 'Taiwan (Big5)',
        0x04B0: 'Unicode',
        0x04E2: 'Latin-2 (Eastern European)',
        0x04E3: 'Cyrillic',
        0x04E4: 'Multilingual',
        0x04E5: 'Greek',
        0x04E6: 'Turkish',
        0x04E7: 'Hebrew',
        0x04E8: 'Arabic',
    }

    _WINVER = {
        3: {
            0x00: 'Windows NT 3',
            0x0A: 'Windows NT 3.1',
            0x32: 'Windows NT 3.5',
            0x33: 'Windows NT 3.51',
        },
        4: {
            0x00: 'Windows 95',
            0x0A: 'Windows 98',
        },
        5: {
            0x00: 'Windows 2000',
            0x5A: 'Windows Me',
            0x01: 'Windows XP',
            0x02: 'Windows Server 2003',
        },
        6: {
            0x00: 'Windows Vista',
            0x01: 'Windows 7',
            0x02: 'Windows 8',
            0x03: 'Windows 8.1',
        },
        10: {
            0x00: 'Windows 10',
        }
    }

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Static methods

def parse_signature(data)

Extracts a JSON-serializable and human-readable dictionary with information about time stamp and code signing certificates that are attached to the input PE file.

Expand source code Browse git
@classmethod
def parse_signature(cls, data: bytearray) -> dict:
    """
    Extracts a JSON-serializable and human-readable dictionary with information about
    time stamp and code signing certificates that are attached to the input PE file.
    """
    from refinery.units.formats.pkcs7 import pkcs7

    try:
        signature = data | pkcs7 | json.loads
    except Exception as E:
        raise ValueError(F'PKCS7 parser failed with error: {E!s}')

    info = {}

    def find_timestamps(entry):
        if isinstance(entry, dict):
            if set(entry.keys()) == {'type', 'value'}:
                if entry['type'] == 'signing_time':
                    return {'Timestamp': entry['value']}
            for value in entry.values():
                result = find_timestamps(value)
                if result is None:
                    continue
                with suppress(KeyError):
                    result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
                return result
        elif isinstance(entry, list):
            for value in entry:
                result = find_timestamps(value)
                if result is None:
                    continue
                return result

    timestamp_info = find_timestamps(signature)
    if timestamp_info is not None:
        info.update(timestamp_info)

    try:
        certificates = signature['content']['certificates']
    except KeyError:
        return info

    if len(certificates) == 1:
        main_certificate = certificates[0]
    else:
        certificates_with_extended_use = []
        main_certificate = None
        for certificate in certificates:
            with suppress(Exception):
                crt = certificate['tbs_certificate']
                ext = [e for e in crt['extensions'] if e['extn_id'] == 'extended_key_usage' and e['extn_value'] != ['time_stamping']]
                key = [e for e in crt['extensions'] if e['extn_id'] == 'key_usage']
                if ext:
                    certificates_with_extended_use.append(certificate)
                if any('key_cert_sign' in e['extn_value'] for e in key):
                    continue
                if any('code_signing' in e['extn_value'] for e in ext):
                    main_certificate = certificate
                    break
        if main_certificate is None and len(certificates_with_extended_use) == 1:
            main_certificate = certificates_with_extended_use[0]
    if main_certificate:
        crt = main_certificate['tbs_certificate']
        serial = crt['serial_number']
        if isinstance(serial, int):
            serial = F'{serial:x}'
        if len(serial) % 2 != 0:
            serial = F'0{serial}'
        assert bytes.fromhex(serial) in data
        subject = crt['subject']
        location = [subject.get(t, '') for t in ('locality_name', 'state_or_province_name', 'country_name')]
        info.update(Subject=subject['common_name'])
        if any(location):
            info.update(SubjectLocation=', '.join(filter(None, location)))
        for signer_info in signature['content'].get('signer_infos', ()):
            try:
                if signer_info['sid']['serial_number'] != crt['serial_number']:
                    continue
                for attr in signer_info['signed_attrs']:
                    if attr['type'] == 'authenticode_info':
                        info.update(ProgramName=attr['value']['programName'])
                        info.update(MoreInfo=attr['value']['moreInfo'])
            except KeyError:
                continue
        try:
            valid_from = crt['validity']['not_before']
            valid_until = crt['validity']['not_after']
        except KeyError:
            pass
        else:
            info.update(ValidFrom=valid_from, ValidUntil=valid_until)
        info.update(
            Issuer=crt['issuer']['common_name'], Fingerprint=main_certificate['fingerprint'], Serial=serial)
        return info
    return info

Methods

def parse_version(self, pe, data=None)

Extracts a JSON-serializable and human-readable dictionary with information about the version resource of an input PE file, if available.

Expand source code Browse git
def parse_version(self, pe: PE, data=None) -> dict:
    """
    Extracts a JSON-serializable and human-readable dictionary with information about
    the version resource of an input PE file, if available.
    """
    pe.parse_data_directories(directories=[DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']])
    string_table_entries = []
    for FileInfo in pe.FileInfo:
        for FileInfoEntry in FileInfo:
            with suppress(AttributeError):
                for StringTableEntry in FileInfoEntry.StringTable:
                    StringTableEntryParsed = self._parse_pedict(StringTableEntry.entries)
                    with suppress(AttributeError):
                        LangID = StringTableEntry.entries.get('LangID', None) or StringTableEntry.LangID
                        LangID = int(LangID, 0x10) if not isinstance(LangID, int) else LangID
                        LangHi = LangID >> 0x10
                        LangLo = LangID & 0xFFFF
                        Language = self._LCID.get(LangHi, 'Language Neutral')
                        Charset = self._CHARSET.get(LangLo, 'Unknown Charset')
                        StringTableEntryParsed.update(
                            LangID=F'{LangID:08X}',
                            Charset=Charset,
                            Language=Language
                        )
                    for key in StringTableEntryParsed:
                        if key.endswith('Version'):
                            value = StringTableEntryParsed[key]
                            separator = ', '
                            if re.match(F'\\d+({re.escape(separator)}\\d+){{3}}', value):
                                StringTableEntryParsed[key] = '.'.join(value.split(separator))
                    string_table_entries.append(StringTableEntryParsed)
    if not string_table_entries:
        return None
    elif len(string_table_entries) == 1:
        return string_table_entries[0]
    else:
        return string_table_entries
def parse_exports(self, pe, data=None, include_addresses=False)
Expand source code Browse git
def parse_exports(self, pe: PE, data=None, include_addresses=False) -> list:
    pe.parse_data_directories(directories=[DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT']])
    base = pe.OPTIONAL_HEADER.ImageBase
    info = []
    for k, exp in enumerate(pe.DIRECTORY_ENTRY_EXPORT.symbols):
        if not exp.name:
            name = F'@{k}'
        else:
            name = exp.name.decode('ascii')
        item = {'Name': name, 'Address': self._vint(pe, exp.address + base)} if include_addresses else name
        info.append(item)
    return info
def parse_imports(self, pe, data=None, include_addresses=False)
Expand source code Browse git
def parse_imports(self, pe: PE, data=None, include_addresses=False) -> list:
    info = {}
    dirs = []
    for name in [
        'DIRECTORY_ENTRY_IMPORT',
        'DIRECTORY_ENTRY_DELAY_IMPORT',
    ]:
        pe.parse_data_directories(directories=[DIRECTORY_ENTRY[F'IMAGE_{name}']])
        with suppress(AttributeError):
            dirs.append(getattr(pe, name))
    self.log_warn(dirs)
    for idd in itertools.chain(*dirs):
        dll: bytes = idd.dll
        dll = dll.decode('ascii')
        if dll.lower().endswith('.dll'):
            dll = dll[:~3]
        imports: list[str] = info.setdefault(dll, [])
        with suppress(AttributeError):
            symbols = idd.imports
        with suppress(AttributeError):
            symbols = idd.entries
        try:
            for imp in symbols:
                name: bytes = imp.name
                name = name and name.decode('ascii') or F'@{imp.ordinal}'
                if not include_addresses:
                    imports.append(name)
                else:
                    imports.append(dict(Name=name, Address=self._vint(pe, imp.address)))
        except Exception as e:
            self.log_warn(F'error parsing {name}: {e!s}')
    return info
def parse_header(self, pe, data=None)
Expand source code Browse git
def parse_header(self, pe: PE, data=None) -> dict:
    def format_macro_name(name: str, prefix, convert=True):
        name = name.split('_')[prefix:]
        if convert:
            for k, part in enumerate(name):
                name[k] = part.upper() if len(part) <= 3 else part.capitalize()
        return ' '.join(name)

    major = pe.OPTIONAL_HEADER.MajorOperatingSystemVersion
    minor = pe.OPTIONAL_HEADER.MinorOperatingSystemVersion
    version = self._WINVER.get(major, {0: 'Unknown'})

    try:
        MinimumOS = version[minor]
    except LookupError:
        MinimumOS = version[0]
    header_information = {
        'Machine': format_macro_name(MACHINE_TYPE[pe.FILE_HEADER.Machine], 3, False),
        'Subsystem': format_macro_name(SUBSYSTEM_TYPE[pe.OPTIONAL_HEADER.Subsystem], 2),
        'MinimumOS': MinimumOS,
    }

    pe.parse_data_directories(directories=[
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
    ])

    try:
        export_name = pe.DIRECTORY_ENTRY_EXPORT.name
        if isinstance(export_name, bytes):
            export_name = export_name.decode('utf8')
        if not export_name.isprintable():
            export_name = None
    except Exception:
        export_name = None
    if export_name:
        header_information['ExportName'] = export_name

    rich_header = pe.parse_rich_header()
    rich = []

    if rich_header:
        it = rich_header.get('values', [])
        if self.args.tabular:
            cw = max(len(F'{c:d}') for c in it[1::2])
        for idv, count in zip(it[0::2], it[1::2]):
            info = get_rich_info(idv)
            if not info:
                continue
            pid = info.pid.upper()
            if self.args.tabular:
                short_pid = get_rich_short_pid(pid)
                rich.append(F'[{idv:08x}] {count:>0{cw}d} {short_pid!s} {info.ver}')
            else:
                rich.append({
                    'Counter': count,
                    'Encoded': F'{idv:08x}',
                    'Library': pid,
                    'Product': info.ver,
                })
        header_information['RICH'] = rich

    characteristics = self._pe_characteristics(pe)
    for typespec, flag in {
        'EXE': 'IMAGE_FILE_EXECUTABLE_IMAGE',
        'DLL': 'IMAGE_FILE_DLL',
        'SYS': 'IMAGE_FILE_SYSTEM'
    }.items():
        if flag in characteristics:
            header_information['Type'] = typespec

    base = pe.OPTIONAL_HEADER.ImageBase
    header_information['ImageBase'] = self._vint(pe, base)
    header_information['ImageSize'] = get_pe_size(pe)
    header_information['Bits'] = 4 * self._pe_address_width(pe, 16)
    header_information['EntryPoint'] = self._vint(pe, pe.OPTIONAL_HEADER.AddressOfEntryPoint + base)
    return header_information
def parse_time_stamps(self, pe, raw_time_stamps, more_detail)

Extracts time stamps from the PE header (link time), as well as from the imports, exports, debug, and resource directory. The resource time stamp is also parsed as a DOS time stamp and returned as the "Delphi" time stamp.

Expand source code Browse git
def parse_time_stamps(self, pe: PE, raw_time_stamps: bool, more_detail: bool) -> dict:
    """
    Extracts time stamps from the PE header (link time), as well as from the imports,
    exports, debug, and resource directory. The resource time stamp is also parsed as
    a DOS time stamp and returned as the "Delphi" time stamp.
    """
    if raw_time_stamps:
        def dt(ts): return ts
    else:
        dt = date_from_timestamp

    pe.parse_data_directories(directories=[
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_IMPORT'],
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_EXPORT'],
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_BOUND_IMPORT'],
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DELAY_IMPORT'],
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG'],
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
    ])

    info = {}

    with suppress(AttributeError):
        info.update(Linker=dt(pe.FILE_HEADER.TimeDateStamp))

    for dir_name, _dll, info_key in [
        ('DIRECTORY_ENTRY_IMPORT',       'dll',  'Import'), # noqa
        ('DIRECTORY_ENTRY_DELAY_IMPORT', 'dll',  'Symbol'), # noqa
        ('DIRECTORY_ENTRY_BOUND_IMPORT', 'name', 'Module'), # noqa
    ]:
        impts = {}
        for entry in getattr(pe, dir_name, []):
            ts = 0
            with suppress(AttributeError):
                ts = entry.struct.dwTimeDateStamp
            with suppress(AttributeError):
                ts = entry.struct.TimeDateStamp
            if ts == 0 or ts == 0xFFFFFFFF:
                continue
            name = getattr(entry, _dll, B'').decode()
            if name.lower().endswith('.dll'):
                name = name[:-4]
            impts[name] = dt(ts)
        if not impts:
            continue
        if not more_detail:
            dmin = min(impts.values())
            dmax = max(impts.values())
            small_delta = 2 * 60 * 60
            if not raw_time_stamps:
                small_delta = timedelta(seconds=small_delta)
            if dmax - dmin < small_delta:
                impts = dmin
        info[info_key] = impts

    with suppress(AttributeError):
        Export = pe.DIRECTORY_ENTRY_EXPORT.struct.TimeDateStamp
        if Export: info.update(Export=dt(Export))

    with suppress(AttributeError):
        res_timestamp = pe.DIRECTORY_ENTRY_RESOURCE.struct.TimeDateStamp
        if res_timestamp:
            with suppress(ValueError):
                from refinery.units.misc.datefix import datefix
                dos = datefix.dostime(res_timestamp)
                info.update(Delphi=dos)
                info.update(RsrcTS=dt(res_timestamp))

    def norm(value):
        if isinstance(value, list):
            return [norm(v) for v in value]
        if isinstance(value, dict):
            return {k: norm(v) for k, v in value.items()}
        if isinstance(value, int):
            return value
        return str(value)

    return {key: norm(value) for key, value in info.items()}
def parse_dotnet(self, pe, data)

Extracts a JSON-serializable and human-readable dictionary with information about the .NET metadata of an input PE file.

Expand source code Browse git
def parse_dotnet(self, pe: PE, data):
    """
    Extracts a JSON-serializable and human-readable dictionary with information about
    the .NET metadata of an input PE file.
    """
    header = DotNetHeader(data, pe=pe)
    tables = header.meta.Streams.Tables
    info = dict(
        RuntimeVersion=F'{header.head.MajorRuntimeVersion}.{header.head.MinorRuntimeVersion}',
        Version=F'{header.meta.MajorVersion}.{header.meta.MinorVersion}',
        VersionString=header.meta.VersionString
    )

    info['Flags'] = [name for name, check in header.head.KnownFlags.items() if check]

    if len(tables.Assembly) == 1:
        assembly = tables.Assembly[0]
        info.update(
            AssemblyName=assembly.Name,
            Release='{}.{}.{}.{}'.format(
                assembly.MajorVersion,
                assembly.MinorVersion,
                assembly.BuildNumber,
                assembly.RevisionNumber
            )
        )

    try:
        entry = self._vint(pe, header.head.EntryPointToken + pe.OPTIONAL_HEADER.ImageBase)
        info.update(EntryPoint=entry)
    except AttributeError:
        pass

    if len(tables.Module) == 1:
        module = tables.Module[0]
        info.update(ModuleName=module.Name)

    return info
def parse_debug(self, pe, data=None)
Expand source code Browse git
def parse_debug(self, pe: PE, data=None):
    result = {}
    pe.parse_data_directories(directories=[
        DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_DEBUG']])
    for dbg in pe.DIRECTORY_ENTRY_DEBUG:
        if DEBUG_TYPE.get(dbg.struct.Type, None) != 'IMAGE_DEBUG_TYPE_CODEVIEW':
            continue
        with suppress(Exception):
            pdb = dbg.entry.PdbFileName
            if 0 in pdb:
                pdb = pdb[:pdb.index(0)]
            result.update(
                PdbPath=pdb.decode(self.codec),
                PdbAge=dbg.entry.Age
            )
    return result

Inherited members