Module refinery.units.formats.pe.pemeta

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

import itertools
import json

from contextlib import suppress
from datetime import datetime, timedelta, timezone
from dataclasses import dataclass
from enum import Enum

from refinery.lib import lief
from refinery.lib.dotnet.header import DotNetHeader
from refinery.units import Arg, Unit
from refinery.units.sinks.ppjson import ppjson
from refinery.units.formats.pe import get_pe_size
from refinery.lib.tools import date_from_timestamp
from refinery.lib.lcid import LCID
from refinery.lib.resources import datapath


def _FILETIME(value: int) -> datetime:
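    # FILETIME counts 100-nanosecond intervals since 1601-01-01 UTC; the constant
    # 116444736000000000 is the number of such intervals between that epoch and the
    # Unix epoch, so the quotient is Unix seconds and the remainder the sub-second part.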
    s, ns100 = divmod(value - 116444736000000000, 10000000)
    return datetime.fromtimestamp(s, timezone.utc).replace(microsecond=(ns100 // 10))


def _STRING(value: str | bytes, dll: bool = False) -> str:
    if not isinstance(value, str):
        value, _, _ = value.partition(B'\0')
        value = value.decode('utf8')
    if dll and value.lower().endswith('.dll'):
        value = value[:~3]
    return value


class VIT(str, Enum):
    ERR = 'unknown'
    OBJ = 'object file from C'
    CPP = 'object file from C++'
    ASM = 'object file from assembler'
    RES = 'object from CVTRES'
    LNK = 'linker version'
    IMP = 'dll import in library file'
    EXP = 'dll export in library file'

    @property
    def tag(self) -> str:
        if self in (VIT.OBJ, VIT.CPP, VIT.ASM, VIT.RES):
            return 'object'
        if self is VIT.IMP:
            return 'import'
        if self is VIT.EXP:
            return 'export'
        if self is VIT.LNK:
            return 'linker'
        else:
            return 'unknown'


@dataclass
class VersionInfo:
    pid: str
    ver: str
    err: bool

    def __str__(self):
        return F'{self.ver} [{self.pid.upper()}]'

    def __bool__(self):
        return not self.err


with datapath('rich.json').open('r') as stream:
    RICH = json.load(stream)


class ShortPID(str, Enum):
    UTC = 'STDLIB' # STDLIBC
    RES = 'CVTRES' # Cvt/RES
    OMF = 'CVTOMF' # Cvt/OMF
    PGD = 'CVTPGD' # Cvt/PGD
    LNK = 'LINKER' # Linker
    EXP = 'EXPORT' # Exports
    IMP = 'IMPORT' # Imports
    OBJ = 'OBJECT' # Object
    PHX = 'PHOENX' # Phoenix
    ASM = 'MASM'   # MASM
    MIL = 'MSIL'   # MSIL
    VB6 = 'VB6OBJ' # VB6

    def __str__(self):
        width = max(len(item.value) for item in self.__class__)
        return F'{self.value:>{width}}'


def get_rich_short_pid(pid: str) -> ShortPID:
    pid = pid.upper()
    if pid.startswith('UTC'):
        return ShortPID.UTC
    if pid.startswith('CVTRES'):
        return ShortPID.RES
    if pid.startswith('CVTOMF'):
        return ShortPID.OMF
    if pid.startswith('CVTPGD'):
        return ShortPID.PGD
    if pid.startswith('LINKER'):
        return ShortPID.LNK
    if pid.startswith('EXPORT'):
        return ShortPID.EXP
    if pid.startswith('IMPORT'):
        return ShortPID.IMP
    if pid.startswith('IMPLIB'):
        return ShortPID.IMP
    if pid.startswith('ALIASOBJ'):
        return ShortPID.OBJ
    if pid.startswith('RESOURCE'):
        return ShortPID.RES
    if pid.startswith('PHX'):
        return ShortPID.PHX
    if pid.startswith('PHOENIX'):
        return ShortPID.PHX
    if pid.startswith('MASM'):
        return ShortPID.ASM
    if pid.startswith('ILASM'):
        return ShortPID.MIL
    if pid.startswith('VISUALBASIC'):
        return ShortPID.VB6
    raise LookupError(pid)


def get_rich_info(vid: int) -> VersionInfo:
    pid = vid >> 0x10
    ver = vid & 0xFFFF
    ver = RICH['ver'].get(F'{ver:04X}')
    pid = RICH['pid'].get(F'{pid:04X}')
    err = ver is None and pid is None
    if ver is not None:
        suffix = ver.get('ver')
        ver = ver['ide']
        if suffix:
            ver = F'{ver} {suffix}'
    else:
        ver = 'Unknown Version'
    pid = pid or 'Unknown Type'
    return VersionInfo(pid, ver, err)


class pemeta(Unit):
    """
    Extract metadata from PE files. By default, all information except for imports and exports is
    extracted.
    """
    def __init__(
        self, custom : Arg('-c', '--custom',
            help='Unless enabled, all default categories will be extracted.') = False,
        debug      : Arg.Switch('-D', help='Parse the PDB path from the debug directory.') = False,
        dotnet     : Arg.Switch('-N', help='Parse the .NET header.') = False,
        signatures : Arg.Switch('-S', help='Parse digital signatures.') = False,
        timestamps : Arg.Counts('-T', help='Extract time stamps. Specify twice for more detail.') = 0,
        version    : Arg.Switch('-V', help='Parse the VERSION resource.') = False,
        header     : Arg.Switch('-H', help='Parse base data from the PE header.') = False,
        exports    : Arg.Counts('-E', help='List all exported functions. Specify twice to include addresses.') = 0,
        imports    : Arg.Counts('-I', help='List all imported functions. Specify twice to include addresses.') = 0,
        tabular    : Arg.Switch('-t', help='Print information in a table rather than as JSON.') = False,
        timeraw    : Arg.Switch('-r', help='Extract time stamps as numbers instead of human-readable format.') = False,
    ):
        if not custom and not any((debug, dotnet, signatures, timestamps, version, header)):
            debug = dotnet = signatures = timestamps = version = header = True
        super().__init__(
            debug=debug,
            dotnet=dotnet,
            signatures=signatures,
            timestamps=timestamps,
            version=version,
            header=header,
            imports=imports,
            exports=exports,
            timeraw=timeraw,
            tabular=tabular,
        )

    @classmethod
    def handles(self, data):
        return data[:2] == B'MZ'

    @classmethod
    def _ensure_string(cls, x):
        if not isinstance(x, str):
            x = repr(x) if not isinstance(x, bytes) else x.decode(cls.codec, 'backslashreplace')
        return x

    @classmethod
    def _parse_pedict(cls, bin):
        return dict((
            cls._ensure_string(key).replace(" ", ""),
            cls._ensure_string(val)
        ) for key, val in bin.items() if val)

    @classmethod
    def parse_signature(cls, data: bytearray) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        time stamp and code signing certificates that are attached to the input PE file.
        """
        from refinery.units.formats.pkcs7 import pkcs7

        try:
            signature = data | pkcs7 | json.loads
        except Exception as E:
            raise ValueError(F'PKCS7 parser failed with error: {E!s}')

        info = {}

        def _value(doc: dict, require_type=None):
            if require_type is not None:
                if doc.get('type', None) != require_type:
                    raise LookupError
            value = doc.get('value', None)
            value = [value] if value else doc.get('values', [])
            if not value:
                raise LookupError
            return value[0]

        def find_timestamps(entry) -> dict:
            if isinstance(entry, dict):
                try:
                    return {'Timestamp': _value(entry, 'signing_time')}
                except LookupError:
                    pass
                for value in entry.values():
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    with suppress(KeyError):
                        result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
                    return result
            elif isinstance(entry, list):
                for value in entry:
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    return result

        timestamp_info = find_timestamps(signature)
        if timestamp_info is not None:
            info.update(timestamp_info)

        try:
            certificates = signature['content']['certificates']
        except KeyError:
            return info

        if len(certificates) == 1:
            main_certificate = certificates[0]
        else:
            certificates_with_extended_use = []
            main_certificate = None
            for certificate in certificates:
                with suppress(Exception):
                    crt = certificate['tbs_certificate']
                    ext = [e for e in crt['extensions'] if e['extn_id'] == 'extended_key_usage' and e['extn_value'] != ['time_stamping']]
                    key = [e for e in crt['extensions'] if e['extn_id'] == 'key_usage']
                    if ext:
                        certificates_with_extended_use.append(certificate)
                    if any('key_cert_sign' in e['extn_value'] for e in key):
                        continue
                    if any('code_signing' in e['extn_value'] for e in ext):
                        main_certificate = certificate
                        break
            if main_certificate is None and len(certificates_with_extended_use) == 1:
                main_certificate = certificates_with_extended_use[0]
        if main_certificate:
            crt = main_certificate['tbs_certificate']
            serial = crt['serial_number']
            if isinstance(serial, int):
                serial = F'{serial:x}'
            if len(serial) % 2 != 0:
                serial = F'0{serial}'
            assert bytes.fromhex(serial) in data
            subject = crt['subject']
            location = [subject.get(t, '') for t in ('locality_name', 'state_or_province_name', 'country_name')]
            info.update(Subject=subject['common_name'])
            if any(location):
                info.update(SubjectLocation=', '.join(filter(None, location)))
            for signer_info in signature['content'].get('signer_infos', ()):
                try:
                    if signer_info['sid']['serial_number'] != crt['serial_number']:
                        continue
                    for attr in signer_info['signed_attrs']:
                        if attr['type'] == 'authenticode_info':
                            auth = _value(attr)
                            info.update(ProgramName=auth['programName'])
                            info.update(MoreInfo=auth['moreInfo'])
                except KeyError:
                    continue
            try:
                valid_from = crt['validity']['not_before']
                valid_until = crt['validity']['not_after']
            except KeyError:
                pass
            else:
                info.update(ValidFrom=valid_from, ValidUntil=valid_until)
            info.update(
                Issuer=crt['issuer']['common_name'], Fingerprint=main_certificate['fingerprint'], Serial=serial)
            return info
        return info

    def _pe_characteristics(self, pe: lief.PE.Binary):
        characteristics = {F'IMAGE_FILE_{flag.name}' for flag in lief.PE.Header.CHARACTERISTICS
            if pe.header.characteristics & flag.value}
        if pe.header.characteristics & 0x40:
            # TODO: Missing from LIEF
            characteristics.add('IMAGE_FILE_16BIT_MACHINE')
        return characteristics

    def _pe_address_width(self, pe: lief.PE.Binary, default=16) -> int:
        # TODO: missing from LIEF
        IMAGE_FILE_16BIT_MACHINE = 0x40
        if pe.header.characteristics & IMAGE_FILE_16BIT_MACHINE:
            return 4
        elif pe.header.machine == lief.PE.Header.MACHINE_TYPES.I386:
            return 8
        elif pe.header.machine in (
            lief.PE.Header.MACHINE_TYPES.AMD64,
            lief.PE.Header.MACHINE_TYPES.IA64,
        ):
            return 16
        else:
            return default

    def _vint(self, pe: lief.PE.Binary, value: int):
        if not self.args.tabular:
            return value
        aw = self._pe_address_width(pe)
        return F'0x{value:0{aw}X}'

    def parse_version(self, pe: lief.PE.Binary, data=None) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the version resource of an input PE file, if available.
        """
        version_info = {}
        if not pe.resources_manager.has_version:
            return None
        version = pe.resources_manager.version

        if info := version.string_file_info:
            for lng in info.langcode_items:
                version_info.update({
                    k.replace(' ', ''): _STRING(v) for k, v in lng.items.items()
                })
                version_info.update(
                    CodePage=lng.code_page.name,
                    LangID=self._vint(pe, lng.lang << 0x10 | lng.sublang),
                    Language=LCID.get(lng.lang, 'Language Neutral'),
                    Charset=self._CHARSET.get(lng.sublang, 'Unknown Charset'),
                )

        def _to_version_string(hi: int, lo: int):
            a = hi >> 0x10
            b = hi & 0xFFFF
            c = lo >> 0x10
            d = lo & 0xFFFF
            return F'{a}.{b}.{c}.{d}'

        # TODO: Missing: Version.CompanyName
        # TODO: Missing: Version.FileDescription
        # TODO: Missing: Version.LegalCopyright
        # TODO: Missing: Version.ProductName

        if info := version.fixed_file_info:
            version_info.update(
                OSName=info.file_os.name,
                FileType=info.file_type.name,
            )
            if (s := info.file_subtype).value:
                version_info.update(FileSubType=s)
            if t := info.file_date_MS << 32 | info.file_date_LS:
                version_info.update(Timestamp=_FILETIME(t))
            version_info.update(
                ProductVersion=_to_version_string(info.product_version_MS, info.product_version_LS),
                FileVersion=_to_version_string(info.file_version_MS, info.file_version_LS),
            )

        if info := version.var_file_info:
            ...

        return version_info or None

    def parse_exports(self, pe: lief.PE.Binary, data=None, include_addresses=False) -> list:
        base = pe.optional_header.imagebase
        info = []
        if not pe.has_exports:
            return None
        for k, exp in enumerate(pe.get_export().entries):
            name = exp.demangled_name
            if not name:
                name = exp.name
            if not name:
                name = F'@{k}'
            if not isinstance(name, str):
                name = name.decode('latin1')
            item = {
                'Name': name, 'Address': self._vint(pe, exp.address + base)
            } if include_addresses else name
            info.append(item)
        return info

    def parse_imports(self, pe: lief.PE.Binary, data=None, include_addresses=False) -> dict:
        info = {}
        for idd in itertools.chain(pe.imports, pe.delay_imports):
            dll = _STRING(idd.name)
            if dll.lower().endswith('.dll'):
                dll = dll[:~3]
            imports: list[str] = info.setdefault(dll, [])
            for imp in idd.entries:
                name = _STRING(imp.name) or F'@{imp.ordinal}'
                imports.append(dict(
                    Name=name, Address=self._vint(pe, imp.value)
                ) if include_addresses else name)
        return info

    def parse_header(self, pe: lief.PE.Binary, data=None) -> dict:
        major = pe.optional_header.major_operating_system_version
        minor = pe.optional_header.minor_operating_system_version
        version = self._WINVER.get(major, {0: 'Unknown'})

        try:
            MinimumOS = version[minor]
        except LookupError:
            MinimumOS = version[0]
        header_information = {
            'Machine': pe.header.machine.name,
            'Subsystem': pe.optional_header.subsystem.name,
            'MinimumOS': MinimumOS,
        }
        if pe.has_exports:
            export_name = _STRING(pe.get_export().name)
            if export_name.isprintable():
                header_information['ExportName'] = export_name

        if pe.has_rich_header:
            rich = []
            if self.args.tabular:
                cw = max(len(F'{entry.count:d}') for entry in pe.rich_header.entries)
            for entry in pe.rich_header.entries:
                idv = entry.build_id | (entry.id << 0x10)
                count = entry.count
                info = get_rich_info(idv)
                if not info:
                    continue
                pid = info.pid.upper()
                if self.args.tabular:
                    short_pid = get_rich_short_pid(pid)
                    rich.append(F'[{idv:08x}] {count:>0{cw}d} {short_pid!s} {info.ver}')
                else:
                    rich.append({
                        'Counter': count,
                        'Encoded': F'{idv:08x}',
                        'Library': pid,
                        'Product': info.ver,
                    })
            header_information['RICH'] = rich

        characteristics = self._pe_characteristics(pe)
        for typespec, flag in {
            'EXE': 'IMAGE_FILE_EXECUTABLE_IMAGE',
            'DLL': 'IMAGE_FILE_DLL',
            'SYS': 'IMAGE_FILE_SYSTEM'
        }.items():
            if flag in characteristics:
                header_information['Type'] = typespec

        base = pe.optional_header.imagebase
        header_information['ImageBase'] = self._vint(pe, base)
        header_information['ImageSize'] = self._vint(pe, pe.optional_header.sizeof_image)
        header_information['ComputedSize'] = get_pe_size(pe)
        header_information['Bits'] = 4 * self._pe_address_width(pe, 16)
        header_information['EntryPoint'] = self._vint(pe, pe.optional_header.addressof_entrypoint + base)
        return header_information

    def parse_time_stamps(self, pe: lief.PE.Binary, raw_time_stamps: bool, more_detail: bool) -> dict:
        """
        Extracts time stamps from the PE header (link time), as well as from the imports,
        exports, debug, and resource directory. The resource time stamp is also parsed as
        a DOS time stamp and returned as the "Delphi" time stamp.
        """
        def _id(x): return x
        dt = _id if raw_time_stamps else date_from_timestamp
        info = {}

        with suppress(AttributeError):
            info.update(Linker=dt(pe.header.time_date_stamps))

        import_timestamps = {}
        for entry in pe.imports:
            ts = entry.timedatestamp
            if ts == 0 or ts == 0xFFFFFFFF:
                continue
            import_timestamps[_STRING(entry.name, True)] = dt(ts)

        symbol_timestamps = {}
        for entry in pe.delay_imports:
            ts = entry.timestamp
            if ts == 0 or ts == 0xFFFFFFFF:
                continue
            symbol_timestamps[_STRING(entry.name, True)] = dt(ts)

        for key, impts in [
            ('Import', import_timestamps),
            ('Symbol', symbol_timestamps),
        ]:
            if not impts:
                continue
            if not more_detail:
                dmin = min(impts.values())
                dmax = max(impts.values())
                small_delta = 2 * 60 * 60
                if not raw_time_stamps:
                    small_delta = timedelta(seconds=small_delta)
                if dmax - dmin < small_delta:
                    impts = dmin
            info[key] = impts

        if pe.has_exports and (ts := pe.get_export().timestamp):
            info.update(Export=dt(ts))

        if pe.has_resources and pe.resources.is_directory:
            rsrc: lief.PE.ResourceDirectory = pe.resources
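            # Delphi compilers store the build time in the resource directory time
            # stamp as a DOS date/time value, hence the extra "Delphi" interpretation.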
            if res_timestamp := rsrc.time_date_stamp:
                with suppress(ValueError):
                    from refinery.units.misc.datefix import datefix
                    dos = datefix.dostime(res_timestamp)
                    info.update(Delphi=dos)
                    info.update(RsrcTS=dt(res_timestamp))

        def norm(value):
            if isinstance(value, list):
                return [norm(v) for v in value]
            if isinstance(value, dict):
                return {k: norm(v) for k, v in value.items()}
            if isinstance(value, int):
                return value
            return str(value)

        return {key: norm(value) for key, value in info.items()}

    def parse_dotnet(self, pe: lief.PE.Binary, data):
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the .NET metadata of an input PE file.
        """
        header = DotNetHeader(data, pe)
        tables = header.meta.Streams.Tables
        info = dict(
            RuntimeVersion=F'{header.head.MajorRuntimeVersion}.{header.head.MinorRuntimeVersion}',
            Version=F'{header.meta.MajorVersion}.{header.meta.MinorVersion}',
            VersionString=header.meta.VersionString
        )

        info['Flags'] = [name for name, check in header.head.KnownFlags.items() if check]

        if len(tables.Assembly) == 1:
            assembly = tables.Assembly[0]
            info.update(
                AssemblyName=assembly.Name,
                Release='{}.{}.{}.{}'.format(
                    assembly.MajorVersion,
                    assembly.MinorVersion,
                    assembly.BuildNumber,
                    assembly.RevisionNumber
                )
            )

        try:
            entry = self._vint(pe, header.head.EntryPointToken + pe.optional_header.imagebase)
            info.update(EntryPoint=entry)
        except AttributeError:
            pass

        if len(tables.Module) == 1:
            module = tables.Module[0]
            info.update(ModuleName=module.Name)

        return info

    def parse_debug(self, pe: lief.PE.Binary, data=None):
        result = []
        if not pe.has_debug:
            return None
        for entry in pe.debug:
            if entry.type != lief.PE.Debug.TYPES.CODEVIEW:
                continue
            try:
                entry: lief.PE.CodeViewPDB
                result.append(dict(
                    PdbPath=_STRING(entry.filename),
                    PdbGUID=entry.guid,
                    PdbAge=entry.age,
                ))
            except AttributeError:
                continue
        if len(result) == 1:
            result = result[0]
        return result

    def process(self, data):
        result = {}

        pe = lief.load_pe(
            data,
            parse_exports=self.args.exports,
            parse_imports=self.args.imports,
            parse_rsrc=self.args.version,
            parse_reloc=False,
            parse_signature=self.args.timestamps or self.args.signatures,
        )

        if pe is None:
            raise ValueError('Input not recognized as a PE file.')

        for switch, resolver, name in [
            (self.args.debug,   self.parse_debug,    'Debug'),    # noqa
            (self.args.dotnet,  self.parse_dotnet,   'DotNet'),   # noqa
            (self.args.header,  self.parse_header,   'Header'),   # noqa
            (self.args.version, self.parse_version,  'Version'),  # noqa
            (self.args.imports, self.parse_imports,  'Imports'),  # noqa
            (self.args.exports, self.parse_exports,  'Exports'),  # noqa
        ]:
            if not switch:
                continue
            self.log_debug(F'parsing: {name}')
            args = pe, data
            if switch > 1:
                args = *args, True
            try:
                info = resolver(*args)
            except Exception as E:
                self.log_info(F'failed to obtain {name}: {E!s}')
                continue
            if info:
                result[name] = info

        signature = {}

        if self.args.timestamps or self.args.signatures:
            with suppress(Exception):
                from refinery.units.formats.pe.pesig import pesig
                signature = self.parse_signature(next(data | pesig))

        if signature:
            try:
                verification = pe.verify_signature()
            except Exception:
                pass
            else:
                from lief.PE import Signature
                if verification == Signature.VERIFICATION_FLAGS.OK:
                    signature['Match'] = True
                else:
                    signature['Flags'] = [
                        vf.name for vf in Signature.VERIFICATION_FLAGS if vf & verification == vf]
                    signature['Match'] = False

        if self.args.timestamps:
            ts = self.parse_time_stamps(pe, self.args.timeraw, self.args.timestamps > 1)
            with suppress(KeyError):
                ts.update(Signed=signature['Timestamp'])
            result.update(TimeStamp=ts)

        if signature and self.args.signatures:
            result['Signature'] = signature

        if result:
            yield from ppjson(tabular=self.args.tabular)._pretty_output(result, indent=4, ensure_ascii=False)

    _CHARSET = {
        0x0000: '7-bit ASCII',
        0x03A4: 'Japan (Shift - JIS X-0208)',
        0x03B5: 'Korea (Shift - KSC 5601)',
        0x03B6: 'Taiwan (Big5)',
        0x04B0: 'Unicode',
        0x04E2: 'Latin-2 (Eastern European)',
        0x04E3: 'Cyrillic',
        0x04E4: 'Multilingual',
        0x04E5: 'Greek',
        0x04E6: 'Turkish',
        0x04E7: 'Hebrew',
        0x04E8: 'Arabic',
    }

    _WINVER = {
        3: {
            0x00: 'Windows NT 3',
            0x0A: 'Windows NT 3.1',
            0x32: 'Windows NT 3.5',
            0x33: 'Windows NT 3.51',
        },
        4: {
            0x00: 'Windows 95',
            0x0A: 'Windows 98',
        },
        5: {
            0x00: 'Windows 2000',
            0x5A: 'Windows Me',
            0x01: 'Windows XP',
            0x02: 'Windows Server 2003',
        },
        6: {
            0x00: 'Windows Vista',
            0x01: 'Windows 7',
            0x02: 'Windows 8',
            0x03: 'Windows 8.1',
        },
        10: {
            0x00: 'Windows 10',
        }
    }

Functions

def get_rich_short_pid(pid)
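
Maps a full Rich header product identifier string to the abbreviated column label used in tabular output by matching well-known prefixes, raising LookupError for unrecognized identifiers. A small illustrative call; the identifier string below is made up:

pid = get_rich_short_pid('UTC1900_CPP')  # hypothetical identifier; the 'UTC' prefix selects ShortPID.UTC
print(pid)                               # prints 'STDLIB', right-aligned to the widest member value
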
def get_rich_short_pid(pid: str) -> ShortPID:
    pid = pid.upper()
    if pid.startswith('UTC'):
        return ShortPID.UTC
    if pid.startswith('CVTRES'):
        return ShortPID.RES
    if pid.startswith('CVTOMF'):
        return ShortPID.OMF
    if pid.startswith('CVTPGD'):
        return ShortPID.PGD
    if pid.startswith('LINKER'):
        return ShortPID.LNK
    if pid.startswith('EXPORT'):
        return ShortPID.EXP
    if pid.startswith('IMPORT'):
        return ShortPID.IMP
    if pid.startswith('IMPLIB'):
        return ShortPID.IMP
    if pid.startswith('ALIASOBJ'):
        return ShortPID.OBJ
    if pid.startswith('RESOURCE'):
        return ShortPID.RES
    if pid.startswith('PHX'):
        return ShortPID.PHX
    if pid.startswith('PHOENIX'):
        return ShortPID.PHX
    if pid.startswith('MASM'):
        return ShortPID.ASM
    if pid.startswith('ILASM'):
        return ShortPID.MIL
    if pid.startswith('VISUALBASIC'):
        return ShortPID.VB6
    raise LookupError(pid)
def get_rich_info(vid)
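
Splits a 32-bit Rich header comp.id value into its product identifier (high word) and build number (low word) and resolves both against the bundled rich.json tables. A short sketch with a hypothetical comp.id value:

vid = (0x0104 << 16) | 0x26A4   # hypothetical comp.id: product id 0x0104, build 0x26A4
info = get_rich_info(vid)       # looks up '0104' in RICH['pid'] and '26A4' in RICH['ver']
print(info)                     # '<tool version> [<PRODUCT NAME>]', with fallbacks for unknown values
print(bool(info))               # False only if both lookups fail
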
def get_rich_info(vid: int) -> VersionInfo:
    pid = vid >> 0x10
    ver = vid & 0xFFFF
    ver = RICH['ver'].get(F'{ver:04X}')
    pid = RICH['pid'].get(F'{pid:04X}')
    err = ver is None and pid is None
    if ver is not None:
        suffix = ver.get('ver')
        ver = ver['ide']
        if suffix:
            ver = F'{ver} {suffix}'
    else:
        ver = 'Unknown Version'
    pid = pid or 'Unknown Type'
    return VersionInfo(pid, ver, err)

Classes

class VIT (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration describing what kind of artifact a Rich header entry refers to (object file, linker version, library import or export).

class VIT(str, Enum):
    ERR = 'unknown'
    OBJ = 'object file from C'
    CPP = 'object file from C++'
    ASM = 'object file from assembler'
    RES = 'object from CVTRES'
    LNK = 'linker version'
    IMP = 'dll import in library file'
    EXP = 'dll export in library file'

    @property
    def tag(self) -> str:
        if self in (VIT.OBJ, VIT.CPP, VIT.ASM, VIT.RES):
            return 'object'
        if self is VIT.IMP:
            return 'import'
        if self is VIT.EXP:
            return 'export'
        if self is VIT.LNK:
            return 'linker'
        else:
            return 'unknown'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var ERR
var OBJ
var CPP
var ASM
var RES
var LNK
var IMP
var EXP

Instance variables

var tag
@property
def tag(self) -> str:
    if self in (VIT.OBJ, VIT.CPP, VIT.ASM, VIT.RES):
        return 'object'
    if self is VIT.IMP:
        return 'import'
    if self is VIT.EXP:
        return 'export'
    if self is VIT.LNK:
        return 'linker'
    else:
        return 'unknown'
class VersionInfo (pid, ver, err)

VersionInfo(pid: 'str', ver: 'str', err: 'bool')

class VersionInfo:
    pid: str
    ver: str
    err: bool

    def __str__(self):
        return F'{self.ver} [{self.pid.upper()}]'

    def __bool__(self):
        return not self.err

Class variables

var pid
var ver
var err
class ShortPID (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration of abbreviated Rich header product identifiers, used as fixed-width column labels in tabular output.

class ShortPID(str, Enum):
    UTC = 'STDLIB' # STDLIBC
    RES = 'CVTRES' # Cvt/RES
    OMF = 'CVTOMF' # Cvt/OMF
    PGD = 'CVTPGD' # Cvt/PGD
    LNK = 'LINKER' # Linker
    EXP = 'EXPORT' # Exports
    IMP = 'IMPORT' # Imports
    OBJ = 'OBJECT' # Object
    PHX = 'PHOENX' # Phoenix
    ASM = 'MASM'   # MASM
    MIL = 'MSIL'   # MSIL
    VB6 = 'VB6OBJ' # VB6

    def __str__(self):
        width = max(len(item.value) for item in self.__class__)
        return F'{self.value:>{width}}'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var UTC
var RES
var OMF
var PGD
var LNK
var EXP
var IMP
var OBJ
var PHX
var ASM
var MIL
var VB6
class pemeta (custom=False, debug=False, dotnet=False, signatures=False, timestamps=0, version=False, header=False, exports=0, imports=0, tabular=False, timeraw=False)

Extract metadata from PE files. By default, all information except for imports and exports is extracted.
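
A minimal usage sketch in Python, assuming the data | unit | json.loads composition used for pkcs7 in parse_signature below also applies to this unit; the file name is a placeholder:

import json
from refinery.units.formats.pe.pemeta import pemeta

with open('sample.exe', 'rb') as f:   # placeholder path
    data = f.read()

# Default run: debug, .NET, signature, time stamp, version, and header data.
meta = data | pemeta | json.loads
print(meta.get('Header', {}).get('Machine'))
print(meta.get('TimeStamp', {}).get('Linker'))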

class pemeta(Unit):
    """
    Extract metadata from PE files. By default, all information except for imports and exports is
    extracted.
    """
    def __init__(
        self, custom : Arg('-c', '--custom',
            help='Unless enabled, all default categories will be extracted.') = False,
        debug      : Arg.Switch('-D', help='Parse the PDB path from the debug directory.') = False,
        dotnet     : Arg.Switch('-N', help='Parse the .NET header.') = False,
        signatures : Arg.Switch('-S', help='Parse digital signatures.') = False,
        timestamps : Arg.Counts('-T', help='Extract time stamps. Specify twice for more detail.') = 0,
        version    : Arg.Switch('-V', help='Parse the VERSION resource.') = False,
        header     : Arg.Switch('-H', help='Parse base data from the PE header.') = False,
        exports    : Arg.Counts('-E', help='List all exported functions. Specify twice to include addresses.') = 0,
        imports    : Arg.Counts('-I', help='List all imported functions. Specify twice to include addresses.') = 0,
        tabular    : Arg.Switch('-t', help='Print information in a table rather than as JSON.') = False,
        timeraw    : Arg.Switch('-r', help='Extract time stamps as numbers instead of human-readable format.') = False,
    ):
        if not custom and not any((debug, dotnet, signatures, timestamps, version, header)):
            debug = dotnet = signatures = timestamps = version = header = True
        super().__init__(
            debug=debug,
            dotnet=dotnet,
            signatures=signatures,
            timestamps=timestamps,
            version=version,
            header=header,
            imports=imports,
            exports=exports,
            timeraw=timeraw,
            tabular=tabular,
        )

    @classmethod
    def handles(self, data):
        return data[:2] == B'MZ'

    @classmethod
    def _ensure_string(cls, x):
        if not isinstance(x, str):
            x = repr(x) if not isinstance(x, bytes) else x.decode(cls.codec, 'backslashreplace')
        return x

    @classmethod
    def _parse_pedict(cls, bin):
        return dict((
            cls._ensure_string(key).replace(" ", ""),
            cls._ensure_string(val)
        ) for key, val in bin.items() if val)

    @classmethod
    def parse_signature(cls, data: bytearray) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        time stamp and code signing certificates that are attached to the input PE file.
        """
        from refinery.units.formats.pkcs7 import pkcs7

        try:
            signature = data | pkcs7 | json.loads
        except Exception as E:
            raise ValueError(F'PKCS7 parser failed with error: {E!s}')

        info = {}

        def _value(doc: dict, require_type=None):
            if require_type is not None:
                if doc.get('type', None) != require_type:
                    raise LookupError
            value = doc.get('value', None)
            value = [value] if value else doc.get('values', [])
            if not value:
                raise LookupError
            return value[0]

        def find_timestamps(entry) -> dict:
            if isinstance(entry, dict):
                try:
                    return {'Timestamp': _value(entry, 'signing_time')}
                except LookupError:
                    pass
                for value in entry.values():
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    with suppress(KeyError):
                        result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
                    return result
            elif isinstance(entry, list):
                for value in entry:
                    result = find_timestamps(value)
                    if result is None:
                        continue
                    return result

        timestamp_info = find_timestamps(signature)
        if timestamp_info is not None:
            info.update(timestamp_info)

        try:
            certificates = signature['content']['certificates']
        except KeyError:
            return info

        if len(certificates) == 1:
            main_certificate = certificates[0]
        else:
            certificates_with_extended_use = []
            main_certificate = None
            for certificate in certificates:
                with suppress(Exception):
                    crt = certificate['tbs_certificate']
                    ext = [e for e in crt['extensions'] if e['extn_id'] == 'extended_key_usage' and e['extn_value'] != ['time_stamping']]
                    key = [e for e in crt['extensions'] if e['extn_id'] == 'key_usage']
                    if ext:
                        certificates_with_extended_use.append(certificate)
                    if any('key_cert_sign' in e['extn_value'] for e in key):
                        continue
                    if any('code_signing' in e['extn_value'] for e in ext):
                        main_certificate = certificate
                        break
            if main_certificate is None and len(certificates_with_extended_use) == 1:
                main_certificate = certificates_with_extended_use[0]
        if main_certificate:
            crt = main_certificate['tbs_certificate']
            serial = crt['serial_number']
            if isinstance(serial, int):
                serial = F'{serial:x}'
            if len(serial) % 2 != 0:
                serial = F'0{serial}'
            assert bytes.fromhex(serial) in data
            subject = crt['subject']
            location = [subject.get(t, '') for t in ('locality_name', 'state_or_province_name', 'country_name')]
            info.update(Subject=subject['common_name'])
            if any(location):
                info.update(SubjectLocation=', '.join(filter(None, location)))
            for signer_info in signature['content'].get('signer_infos', ()):
                try:
                    if signer_info['sid']['serial_number'] != crt['serial_number']:
                        continue
                    for attr in signer_info['signed_attrs']:
                        if attr['type'] == 'authenticode_info':
                            auth = _value(attr)
                            info.update(ProgramName=auth['programName'])
                            info.update(MoreInfo=auth['moreInfo'])
                except KeyError:
                    continue
            try:
                valid_from = crt['validity']['not_before']
                valid_until = crt['validity']['not_after']
            except KeyError:
                pass
            else:
                info.update(ValidFrom=valid_from, ValidUntil=valid_until)
            info.update(
                Issuer=crt['issuer']['common_name'], Fingerprint=main_certificate['fingerprint'], Serial=serial)
            return info
        return info

    def _pe_characteristics(self, pe: lief.PE.Binary):
        characteristics = {F'IMAGE_FILE_{flag.name}' for flag in lief.PE.Header.CHARACTERISTICS
            if pe.header.characteristics & flag.value}
        if pe.header.characteristics & 0x40:
            # TODO: Missing from LIEF
            characteristics.add('IMAGE_FILE_16BIT_MACHINE')
        return characteristics

    def _pe_address_width(self, pe: lief.PE.Binary, default=16) -> int:
        # TODO: missing from LIEF
        IMAGE_FILE_16BIT_MACHINE = 0x40
        if pe.header.characteristics & IMAGE_FILE_16BIT_MACHINE:
            return 4
        elif pe.header.machine == lief.PE.Header.MACHINE_TYPES.I386:
            return 8
        elif pe.header.machine in (
            lief.PE.Header.MACHINE_TYPES.AMD64,
            lief.PE.Header.MACHINE_TYPES.IA64,
        ):
            return 16
        else:
            return default

    def _vint(self, pe: lief.PE.Binary, value: int):
        if not self.args.tabular:
            return value
        aw = self._pe_address_width(pe)
        return F'0x{value:0{aw}X}'

    def parse_version(self, pe: lief.PE.Binary, data=None) -> dict:
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the version resource of an input PE file, if available.
        """
        version_info = {}
        if not pe.resources_manager.has_version:
            return None
        version = pe.resources_manager.version

        if info := version.string_file_info:
            for lng in info.langcode_items:
                version_info.update({
                    k.replace(' ', ''): _STRING(v) for k, v in lng.items.items()
                })
                version_info.update(
                    CodePage=lng.code_page.name,
                    LangID=self._vint(pe, lng.lang << 0x10 | lng.sublang),
                    Language=LCID.get(lng.lang, 'Language Neutral'),
                    Charset=self._CHARSET.get(lng.sublang, 'Unknown Charset'),
                )

        def _to_version_string(hi: int, lo: int):
            a = hi >> 0x10
            b = hi & 0xFFFF
            c = lo >> 0x10
            d = lo & 0xFFFF
            return F'{a}.{b}.{c}.{d}'

        # TODO: Missing: Version.CompanyName
        # TODO: Missing: Version.FileDescription
        # TODO: Missing: Version.LegalCopyright
        # TODO: Missing: Version.ProductName

        if info := version.fixed_file_info:
            version_info.update(
                OSName=info.file_os.name,
                FileType=info.file_type.name,
            )
            if (s := info.file_subtype).value:
                version_info.update(FileSubType=s)
            if t := info.file_date_MS << 32 | info.file_date_LS:
                version_info.update(Timestamp=_FILETIME(t))
            version_info.update(
                ProductVersion=_to_version_string(info.product_version_MS, info.product_version_LS),
                FileVersion=_to_version_string(info.file_version_MS, info.file_version_LS),
            )

        if info := version.var_file_info:
            ...

        return version_info or None

    def parse_exports(self, pe: lief.PE.Binary, data=None, include_addresses=False) -> list:
        base = pe.optional_header.imagebase
        info = []
        if not pe.has_exports:
            return None
        for k, exp in enumerate(pe.get_export().entries):
            name = exp.demangled_name
            if not name:
                name = exp.name
            if not name:
                name = F'@{k}'
            if not isinstance(name, str):
                name = name.decode('latin1')
            item = {
                'Name': name, 'Address': self._vint(pe, exp.address + base)
            } if include_addresses else name
            info.append(item)
        return info

    def parse_imports(self, pe: lief.PE.Binary, data=None, include_addresses=False) -> dict:
        info = {}
        for idd in itertools.chain(pe.imports, pe.delay_imports):
            dll = _STRING(idd.name)
            if dll.lower().endswith('.dll'):
                dll = dll[:~3]
            imports: list[str] = info.setdefault(dll, [])
            for imp in idd.entries:
                name = _STRING(imp.name) or F'@{imp.ordinal}'
                imports.append(dict(
                    Name=name, Address=self._vint(pe, imp.value)
                ) if include_addresses else name)
        return info

    def parse_header(self, pe: lief.PE.Binary, data=None) -> dict:
        major = pe.optional_header.major_operating_system_version
        minor = pe.optional_header.minor_operating_system_version
        version = self._WINVER.get(major, {0: 'Unknown'})

        try:
            MinimumOS = version[minor]
        except LookupError:
            MinimumOS = version[0]
        header_information = {
            'Machine': pe.header.machine.name,
            'Subsystem': pe.optional_header.subsystem.name,
            'MinimumOS': MinimumOS,
        }
        if pe.has_exports:
            export_name = _STRING(pe.get_export().name)
            if export_name.isprintable():
                header_information['ExportName'] = export_name

        if pe.has_rich_header:
            rich = []
            if self.args.tabular:
                cw = max(len(F'{entry.count:d}') for entry in pe.rich_header.entries)
            for entry in pe.rich_header.entries:
                idv = entry.build_id | (entry.id << 0x10)
                count = entry.count
                info = get_rich_info(idv)
                if not info:
                    continue
                pid = info.pid.upper()
                if self.args.tabular:
                    short_pid = get_rich_short_pid(pid)
                    rich.append(F'[{idv:08x}] {count:>0{cw}d} {short_pid!s} {info.ver}')
                else:
                    rich.append({
                        'Counter': count,
                        'Encoded': F'{idv:08x}',
                        'Library': pid,
                        'Product': info.ver,
                    })
            header_information['RICH'] = rich

        characteristics = self._pe_characteristics(pe)
        for typespec, flag in {
            'EXE': 'IMAGE_FILE_EXECUTABLE_IMAGE',
            'DLL': 'IMAGE_FILE_DLL',
            'SYS': 'IMAGE_FILE_SYSTEM'
        }.items():
            if flag in characteristics:
                header_information['Type'] = typespec

        base = pe.optional_header.imagebase
        header_information['ImageBase'] = self._vint(pe, base)
        header_information['ImageSize'] = self._vint(pe, pe.optional_header.sizeof_image)
        header_information['ComputedSize'] = get_pe_size(pe)
        header_information['Bits'] = 4 * self._pe_address_width(pe, 16)
        header_information['EntryPoint'] = self._vint(pe, pe.optional_header.addressof_entrypoint + base)
        return header_information

    def parse_time_stamps(self, pe: lief.PE.Binary, raw_time_stamps: bool, more_detail: bool) -> dict:
        """
        Extracts time stamps from the PE header (link time), as well as from the imports,
        exports, debug, and resource directory. The resource time stamp is also parsed as
        a DOS time stamp and returned as the "Delphi" time stamp.
        """
        def _id(x): return x
        dt = _id if raw_time_stamps else date_from_timestamp
        info = {}

        with suppress(AttributeError):
            info.update(Linker=dt(pe.header.time_date_stamps))

        import_timestamps = {}
        for entry in pe.imports:
            ts = entry.timedatestamp
            if ts == 0 or ts == 0xFFFFFFFF:
                continue
            import_timestamps[_STRING(entry.name, True)] = dt(ts)

        symbol_timestamps = {}
        for entry in pe.delay_imports:
            ts = entry.timestamp
            if ts == 0 or ts == 0xFFFFFFFF:
                continue
            symbol_timestamps[_STRING(entry.name, True)] = dt(ts)

        for key, impts in [
            ('Import', import_timestamps),
            ('Symbol', symbol_timestamps),
        ]:
            if not impts:
                continue
            if not more_detail:
                dmin = min(impts.values())
                dmax = max(impts.values())
                small_delta = 2 * 60 * 60
                if not raw_time_stamps:
                    small_delta = timedelta(seconds=small_delta)
                if dmax - dmin < small_delta:
                    impts = dmin
            info[key] = impts

        if pe.has_exports and (ts := pe.get_export().timestamp):
            info.update(Export=dt(ts))

        if pe.has_resources and pe.resources.is_directory:
            rsrc: lief.PE.ResourceDirectory = pe.resources
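            # Delphi compilers store the build time in the resource directory time
            # stamp as a DOS date/time value, hence the extra "Delphi" interpretation.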
            if res_timestamp := rsrc.time_date_stamp:
                with suppress(ValueError):
                    from refinery.units.misc.datefix import datefix
                    dos = datefix.dostime(res_timestamp)
                    info.update(Delphi=dos)
                    info.update(RsrcTS=dt(res_timestamp))

        def norm(value):
            if isinstance(value, list):
                return [norm(v) for v in value]
            if isinstance(value, dict):
                return {k: norm(v) for k, v in value.items()}
            if isinstance(value, int):
                return value
            return str(value)

        return {key: norm(value) for key, value in info.items()}

    def parse_dotnet(self, pe: lief.PE.Binary, data):
        """
        Extracts a JSON-serializable and human-readable dictionary with information about
        the .NET metadata of an input PE file.
        """
        header = DotNetHeader(data, pe)
        tables = header.meta.Streams.Tables
        info = dict(
            RuntimeVersion=F'{header.head.MajorRuntimeVersion}.{header.head.MinorRuntimeVersion}',
            Version=F'{header.meta.MajorVersion}.{header.meta.MinorVersion}',
            VersionString=header.meta.VersionString
        )

        info['Flags'] = [name for name, check in header.head.KnownFlags.items() if check]

        if len(tables.Assembly) == 1:
            assembly = tables.Assembly[0]
            info.update(
                AssemblyName=assembly.Name,
                Release='{}.{}.{}.{}'.format(
                    assembly.MajorVersion,
                    assembly.MinorVersion,
                    assembly.BuildNumber,
                    assembly.RevisionNumber
                )
            )

        try:
            entry = self._vint(pe, header.head.EntryPointToken + pe.optional_header.imagebase)
            info.update(EntryPoint=entry)
        except AttributeError:
            pass

        if len(tables.Module) == 1:
            module = tables.Module[0]
            info.update(ModuleName=module.Name)

        return info

    def parse_debug(self, pe: lief.PE.Binary, data=None):
        result = []
        if not pe.has_debug:
            return None
        for entry in pe.debug:
            if entry.type != lief.PE.Debug.TYPES.CODEVIEW:
                continue
            try:
                entry: lief.PE.CodeViewPDB
                result.append(dict(
                    PdbPath=_STRING(entry.filename),
                    PdbGUID=entry.guid,
                    PdbAge=entry.age,
                ))
            except AttributeError:
                continue
        if len(result) == 1:
            result = result[0]
        return result

    def process(self, data):
        result = {}

        pe = lief.load_pe(
            data,
            parse_exports=self.args.exports,
            parse_imports=self.args.imports,
            parse_rsrc=self.args.version,
            parse_reloc=False,
            parse_signature=self.args.timestamps or self.args.signatures,
        )

        if pe is None:
            raise ValueError('Input not recognized as a PE file.')

        for switch, resolver, name in [
            (self.args.debug,   self.parse_debug,    'Debug'),    # noqa
            (self.args.dotnet,  self.parse_dotnet,   'DotNet'),   # noqa
            (self.args.header,  self.parse_header,   'Header'),   # noqa
            (self.args.version, self.parse_version,  'Version'),  # noqa
            (self.args.imports, self.parse_imports,  'Imports'),  # noqa
            (self.args.exports, self.parse_exports,  'Exports'),  # noqa
        ]:
            if not switch:
                continue
            self.log_debug(F'parsing: {name}')
            args = pe, data
            if switch > 1:
                args = *args, True
            try:
                info = resolver(*args)
            except Exception as E:
                self.log_info(F'failed to obtain {name}: {E!s}')
                continue
            if info:
                result[name] = info

        signature = {}

        if self.args.timestamps or self.args.signatures:
            with suppress(Exception):
                from refinery.units.formats.pe.pesig import pesig
                signature = self.parse_signature(next(data | pesig))

        if signature:
            try:
                verification = pe.verify_signature()
            except Exception:
                pass
            else:
                from lief.PE import Signature
                if verification == Signature.VERIFICATION_FLAGS.OK:
                    signature['Match'] = True
                else:
                    signature['Flags'] = [
                        vf.name for vf in Signature.VERIFICATION_FLAGS if vf & verification == vf]
                    signature['Match'] = False

        if self.args.timestamps:
            ts = self.parse_time_stamps(pe, self.args.timeraw, self.args.timestamps > 1)
            with suppress(KeyError):
                ts.update(Signed=signature['Timestamp'])
            result.update(TimeStamp=ts)

        if signature and self.args.signatures:
            result['Signature'] = signature

        if result:
            yield from ppjson(tabular=self.args.tabular)._pretty_output(result, indent=4, ensure_ascii=False)

    _CHARSET = {
        0x0000: '7-bit ASCII',
        0x03A4: 'Japan (Shift - JIS X-0208)',
        0x03B5: 'Korea (Shift - KSC 5601)',
        0x03B6: 'Taiwan (Big5)',
        0x04B0: 'Unicode',
        0x04E2: 'Latin-2 (Eastern European)',
        0x04E3: 'Cyrillic',
        0x04E4: 'Multilingual',
        0x04E5: 'Greek',
        0x04E6: 'Turkish',
        0x04E7: 'Hebrew',
        0x04E8: 'Arabic',
    }

    _WINVER = {
        3: {
            0x00: 'Windows NT 3',
            0x0A: 'Windows NT 3.1',
            0x32: 'Windows NT 3.5',
            0x33: 'Windows NT 3.51',
        },
        4: {
            0x00: 'Windows 95',
            0x0A: 'Windows 98',
        },
        5: {
            0x00: 'Windows 2000',
            0x5A: 'Windows Me',
            0x01: 'Windows XP',
            0x02: 'Windows Server 2003',
        },
        6: {
            0x00: 'Windows Vista',
            0x01: 'Windows 7',
            0x02: 'Windows 8',
            0x03: 'Windows 8.1',
        },
        10: {
            0x00: 'Windows 10',
        }
    }

Ancestors

  • refinery.units.Unit

Class variables

var required_dependencies
var optional_dependencies

Static methods

def parse_signature(data)

Extracts a JSON-serializable and human-readable dictionary with information about time stamp and code signing certificates that are attached to the input PE file.
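
The expected input is the extracted Authenticode certificate blob rather than the whole file; process() obtains it with the pesig unit first. A sketch of the same call, assuming data holds the raw PE file bytes:

from refinery.units.formats.pe.pesig import pesig

cert_info = pemeta.parse_signature(next(data | pesig))   # data: raw bytes of the PE file
print(cert_info.get('Subject'), cert_info.get('ValidUntil'))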

@classmethod
def parse_signature(cls, data: bytearray) -> dict:
    """
    Extracts a JSON-serializable and human-readable dictionary with information about
    time stamp and code signing certificates that are attached to the input PE file.
    """
    from refinery.units.formats.pkcs7 import pkcs7

    try:
        signature = data | pkcs7 | json.loads
    except Exception as E:
        raise ValueError(F'PKCS7 parser failed with error: {E!s}')

    info = {}

    def _value(doc: dict, require_type=None):
        if require_type is not None:
            if doc.get('type', None) != require_type:
                raise LookupError
        value = doc.get('value', None)
        value = [value] if value else doc.get('values', [])
        if not value:
            raise LookupError
        return value[0]

    def find_timestamps(entry) -> dict:
        if isinstance(entry, dict):
            try:
                return {'Timestamp': _value(entry, 'signing_time')}
            except LookupError:
                pass
            for value in entry.values():
                result = find_timestamps(value)
                if result is None:
                    continue
                with suppress(KeyError):
                    result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
                return result
        elif isinstance(entry, list):
            for value in entry:
                result = find_timestamps(value)
                if result is None:
                    continue
                return result

    timestamp_info = find_timestamps(signature)
    if timestamp_info is not None:
        info.update(timestamp_info)

    try:
        certificates = signature['content']['certificates']
    except KeyError:
        return info

    if len(certificates) == 1:
        main_certificate = certificates[0]
    else:
        certificates_with_extended_use = []
        main_certificate = None
        for certificate in certificates:
            with suppress(Exception):
                crt = certificate['tbs_certificate']
                ext = [e for e in crt['extensions'] if e['extn_id'] == 'extended_key_usage' and e['extn_value'] != ['time_stamping']]
                key = [e for e in crt['extensions'] if e['extn_id'] == 'key_usage']
                if ext:
                    certificates_with_extended_use.append(certificate)
                if any('key_cert_sign' in e['extn_value'] for e in key):
                    continue
                if any('code_signing' in e['extn_value'] for e in ext):
                    main_certificate = certificate
                    break
        if main_certificate is None and len(certificates_with_extended_use) == 1:
            main_certificate = certificates_with_extended_use[0]
    if main_certificate:
        crt = main_certificate['tbs_certificate']
        serial = crt['serial_number']
        if isinstance(serial, int):
            serial = F'{serial:x}'
        if len(serial) % 2 != 0:
            serial = F'0{serial}'
        assert bytes.fromhex(serial) in data
        subject = crt['subject']
        location = [subject.get(t, '') for t in ('locality_name', 'state_or_province_name', 'country_name')]
        info.update(Subject=subject['common_name'])
        if any(location):
            info.update(SubjectLocation=', '.join(filter(None, location)))
        for signer_info in signature['content'].get('signer_infos', ()):
            try:
                if signer_info['sid']['serial_number'] != crt['serial_number']:
                    continue
                for attr in signer_info['signed_attrs']:
                    if attr['type'] == 'authenticode_info':
                        auth = _value(attr)
                        info.update(ProgramName=auth['programName'])
                        info.update(MoreInfo=auth['moreInfo'])
            except KeyError:
                continue
        try:
            valid_from = crt['validity']['not_before']
            valid_until = crt['validity']['not_after']
        except KeyError:
            pass
        else:
            info.update(ValidFrom=valid_from, ValidUntil=valid_until)
        info.update(
            Issuer=crt['issuer']['common_name'], Fingerprint=main_certificate['fingerprint'], Serial=serial)
        return info
    return info
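
The recursive timestamp search above walks the decoded signature document depth-first until it finds a signing_time attribute and then tries to attach the common name of the enclosing signer's issuer. The following is a self-contained sketch of the same traversal, run against a hypothetical, heavily simplified document rather than real parser output:

from contextlib import suppress

def find_timestamps(entry):
    # Depth-first search for the first node that carries a signing_time value.
    if isinstance(entry, dict):
        if entry.get('type') == 'signing_time' and entry.get('values'):
            return {'Timestamp': entry['values'][0]}
        for value in entry.values():
            result = find_timestamps(value)
            if result is None:
                continue
            # Attach the issuer of the enclosing signer info, if one is present.
            with suppress(KeyError):
                result.setdefault('TimestampIssuer', entry['sid']['issuer']['common_name'])
            return result
    elif isinstance(entry, list):
        for value in entry:
            result = find_timestamps(value)
            if result is not None:
                return result

# Hypothetical, heavily simplified signature document for illustration only:
doc = {'content': {'signer_infos': [{
    'sid': {'issuer': {'common_name': 'Example CA'}},
    'unsigned_attrs': [{'type': 'signing_time', 'values': ['2021-01-01 00:00:00+00:00']}],
}]}}
print(find_timestamps(doc))
# {'Timestamp': '2021-01-01 00:00:00+00:00', 'TimestampIssuer': 'Example CA'}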

Methods

def parse_version(self, pe, data=None)

Extracts a JSON-serializable and human-readable dictionary with information about the version resource of an input PE file, if available.

Expand source code Browse git
def parse_version(self, pe: lief.PE.Binary, data=None) -> dict:
    """
    Extracts a JSON-serializable and human-readable dictionary with information about
    the version resource of an input PE file, if available.
    """
    version_info = {}
    if not pe.resources_manager.has_version:
        return None
    version = pe.resources_manager.version

    if info := version.string_file_info:
        for lng in info.langcode_items:
            version_info.update({
                k.replace(' ', ''): _STRING(v) for k, v in lng.items.items()
            })
            version_info.update(
                CodePage=lng.code_page.name,
                LangID=self._vint(pe, lng.lang << 0x10 | lng.sublang),
                Language=LCID.get(lng.lang, 'Language Neutral'),
                Charset=self._CHARSET.get(lng.sublang, 'Unknown Charset'),
            )

    def _to_version_string(hi: int, lo: int):
        a = hi >> 0x10
        b = hi & 0xFFFF
        c = lo >> 0x10
        d = lo & 0xFFFF
        return F'{a}.{b}.{c}.{d}'

    # TODO: Missing: Version.CompanyName
    # TODO: Missing: Version.FileDescription
    # TODO: Missing: Version.LegalCopyright
    # TODO: Missing: Version.ProductName

    if info := version.fixed_file_info:
        version_info.update(
            OSName=info.file_os.name,
            FileType=info.file_type.name,
        )
        if (s := info.file_subtype).value:
            version_info.update(FileSubType=s)
        if t := info.file_date_MS << 32 | info.file_date_LS:
            version_info.update(Timestamp=_FILETIME(t))
        version_info.update(
            ProductVersion=_to_version_string(info.product_version_MS, info.product_version_LS),
            FileVersion=_to_version_string(info.file_version_MS, info.file_version_LS),
        )

    if info := version.var_file_info:
        ...

    return version_info or None
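
In the fixed file info, both the file and product version are stored as a pair of 32-bit values whose 16-bit halves form the four components of the familiar version string; this is what _to_version_string unpacks. A quick standalone check of the conversion, using made-up numbers:

def to_version_string(hi: int, lo: int) -> str:
    # Each DWORD contributes two 16-bit components: the MS value yields a.b, the LS value c.d
    return F'{hi >> 0x10}.{hi & 0xFFFF}.{lo >> 0x10}.{lo & 0xFFFF}'

# A file version of 6.1.7601.17514 would be encoded as:
ms = (6 << 0x10) | 1          # file_version_MS
ls = (7601 << 0x10) | 17514   # file_version_LS
assert to_version_string(ms, ls) == '6.1.7601.17514'
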
def parse_exports(self, pe, data=None, include_addresses=False)
Expand source code Browse git
def parse_exports(self, pe: lief.PE.Binary, data=None, include_addresses=False) -> list:
    base = pe.optional_header.imagebase
    info = []
    if not pe.has_exports:
        return None
    for k, exp in enumerate(pe.get_export().entries):
        name = exp.demangled_name
        if not name:
            name = exp.name
        if not name:
            name = F'@{k}'
        if not isinstance(name, str):
            name = name.decode('latin1')
        item = {
            'Name': name, 'Address': self._vint(pe, exp.address + base)
        } if include_addresses else name
        info.append(item)
    return info
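
Export names are chosen with a simple fallback chain: the demangled name if available, otherwise the raw name, and finally the ordinal position rendered as @k. A minimal sketch of that selection, with hypothetical entry values:

def export_name(demangled, raw, k: int) -> str:
    # Prefer the demangled name, fall back to the raw name, finally to the index.
    name = demangled or raw or F'@{k}'
    if not isinstance(name, str):
        name = name.decode('latin1')
    return name

assert export_name('Run', b'Run', 0) == 'Run'
assert export_name('', b'DllRegisterServer', 1) == 'DllRegisterServer'
assert export_name('', b'', 2) == '@2'
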
def parse_imports(self, pe, data=None, include_addresses=False)
Expand source code Browse git
def parse_imports(self, pe: lief.PE.Binary, data=None, include_addresses=False) -> dict:
    info = {}
    for idd in itertools.chain(pe.imports, pe.delay_imports):
        dll = _STRING(idd.name)
        if dll.lower().endswith('.dll'):
            dll = dll[:~3]
        imports: list[str] = info.setdefault(dll, [])
        for imp in idd.entries:
            name = _STRING(imp.name) or F'@{imp.ordinal}'
            imports.append(dict(
                Name=name, Address=self._vint(pe, imp.value)
            ) if include_addresses else name)
    return info
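
The result is a mapping from DLL names (with the .dll suffix stripped) to the symbols imported from them; dll[:~3] is equivalent to dll[:-4] and simply drops the four-character extension. A sketch of the resulting shape, using a hypothetical import list in place of the parsed entries:

def strip_dll_suffix(name: str) -> str:
    # name[:~3] is the same as name[:-4], dropping a trailing '.dll'
    return name[:~3] if name.lower().endswith('.dll') else name

info = {}
for dll, symbol in [
    ('KERNEL32.dll', 'CreateFileW'),
    ('KERNEL32.dll', 'ReadFile'),
    ('user32.dll', 'MessageBoxW'),
]:
    info.setdefault(strip_dll_suffix(dll), []).append(symbol)

print(info)
# {'KERNEL32': ['CreateFileW', 'ReadFile'], 'user32': ['MessageBoxW']}
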
def parse_header(self, pe, data=None)
Expand source code Browse git
def parse_header(self, pe: lief.PE.Binary, data=None) -> dict:
    major = pe.optional_header.major_operating_system_version
    minor = pe.optional_header.minor_operating_system_version
    version = self._WINVER.get(major, {0: 'Unknown'})

    try:
        MinimumOS = version[minor]
    except LookupError:
        MinimumOS = version[0]
    header_information = {
        'Machine': pe.header.machine.name,
        'Subsystem': pe.optional_header.subsystem.name,
        'MinimumOS': MinimumOS,
    }
    if pe.has_exports:
        export_name = _STRING(pe.get_export().name)
        if export_name.isprintable():
            header_information['ExportName'] = export_name

    if pe.has_rich_header:
        rich = []
        if self.args.tabular:
            cw = max(len(F'{entry.count:d}') for entry in pe.rich_header.entries)
        for entry in pe.rich_header.entries:
            idv = entry.build_id | (entry.id << 0x10)
            count = entry.count
            info = get_rich_info(idv)
            if not info:
                continue
            pid = info.pid.upper()
            if self.args.tabular:
                short_pid = get_rich_short_pid(pid)
                rich.append(F'[{idv:08x}] {count:>0{cw}d} {short_pid!s} {info.ver}')
            else:
                rich.append({
                    'Counter': count,
                    'Encoded': F'{idv:08x}',
                    'Library': pid,
                    'Product': info.ver,
                })
        header_information['RICH'] = rich

    characteristics = self._pe_characteristics(pe)
    for typespec, flag in {
        'EXE': 'IMAGE_FILE_EXECUTABLE_IMAGE',
        'DLL': 'IMAGE_FILE_DLL',
        'SYS': 'IMAGE_FILE_SYSTEM'
    }.items():
        if flag in characteristics:
            header_information['Type'] = typespec

    base = pe.optional_header.imagebase
    header_information['ImageBase'] = self._vint(pe, base)
    header_information['ImageSize'] = self._vint(pe, pe.optional_header.sizeof_image)
    header_information['ComputedSize'] = get_pe_size(pe)
    header_information['Bits'] = 4 * self._pe_address_width(pe, 16)
    header_information['EntryPoint'] = self._vint(pe, pe.optional_header.addressof_entrypoint + base)
    return header_information
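
Each Rich header entry is identified by a 32-bit value that packs the product id into the upper and the build number into the lower 16 bits, which is what entry.build_id | (entry.id << 0x10) reconstructs before the get_rich_info lookup. A small round-trip sketch with made-up values:

def pack_rich_id(product_id: int, build_id: int) -> int:
    # Product id in the upper 16 bits, build number in the lower 16 bits.
    return (product_id << 0x10) | build_id

def unpack_rich_id(idv: int):
    return idv >> 0x10, idv & 0xFFFF

idv = pack_rich_id(0x0105, 26706)   # hypothetical product/build pair
assert F'{idv:08x}' == '01056852'
assert unpack_rich_id(idv) == (0x0105, 26706)
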
def parse_time_stamps(self, pe, raw_time_stamps, more_detail)

Extracts time stamps from the PE header (link time), as well as from the imports, exports, debug, and resource directory. The resource time stamp is also parsed as a DOS time stamp and returned as the "Delphi" time stamp.

Expand source code Browse git
def parse_time_stamps(self, pe: lief.PE.Binary, raw_time_stamps: bool, more_detail: bool) -> dict:
    """
    Extracts time stamps from the PE header (link time), as well as from the imports,
    exports, debug, and resource directory. The resource time stamp is also parsed as
    a DOS time stamp and returned as the "Delphi" time stamp.
    """
    def _id(x): return x
    dt = _id if raw_time_stamps else date_from_timestamp
    info = {}

    with suppress(AttributeError):
        info.update(Linker=dt(pe.header.time_date_stamps))

    import_timestamps = {}
    for entry in pe.imports:
        ts = entry.timedatestamp
        if ts == 0 or ts == 0xFFFFFFFF:
            continue
        import_timestamps[_STRING(entry.name, True)] = dt(ts)

    symbol_timestamps = {}
    for entry in pe.delay_imports:
        ts = entry.timestamp
        if ts == 0 or ts == 0xFFFFFFFF:
            continue
        symbol_timestamps[_STRING(entry.name, True)] = dt(ts)

    for key, impts in [
        ('Import', import_timestamps),
        ('Symbol', symbol_timestamps),
    ]:
        if not impts:
            continue
        if not more_detail:
            dmin = min(impts.values())
            dmax = max(impts.values())
            small_delta = 2 * 60 * 60
            if not raw_time_stamps:
                small_delta = timedelta(seconds=small_delta)
            if dmax - dmin < small_delta:
                impts = dmin
        info[key] = impts

    if pe.has_exports and (ts := pe.get_export().timestamp):
        info.update(Export=dt(ts))

    if pe.has_resources and pe.resources.is_directory:
        rsrc: lief.PE.ResourceDirectory = pe.resources
        if res_timestamp := rsrc.time_date_stamp:
            with suppress(ValueError):
                from refinery.units.misc.datefix import datefix
                dos = datefix.dostime(res_timestamp)
                info.update(Delphi=dos)
                info.update(RsrcTS=dt(res_timestamp))

    def norm(value):
        if isinstance(value, list):
            return [norm(v) for v in value]
        if isinstance(value, dict):
            return {k: norm(v) for k, v in value.items()}
        if isinstance(value, int):
            return value
        return str(value)

    return {key: norm(value) for key, value in info.items()}
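
Unless more detail was requested, the per-DLL import and delay-import timestamps are each collapsed into a single value whenever they all fall within a two-hour window; otherwise the full mapping is kept. A standalone sketch of that collapsing step, using hypothetical timestamps:

from datetime import datetime, timedelta, timezone

def collapse(timestamps: dict, small_delta=timedelta(hours=2)):
    # Report only the earliest timestamp when all values are close together.
    if not timestamps:
        return None
    dmin = min(timestamps.values())
    dmax = max(timestamps.values())
    return dmin if dmax - dmin < small_delta else timestamps

ts = {
    'KERNEL32': datetime(2020, 5, 1, 12, 0, tzinfo=timezone.utc),
    'ADVAPI32': datetime(2020, 5, 1, 12, 30, tzinfo=timezone.utc),
}
print(collapse(ts))  # both fall within two hours, so a single datetime is returned
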
def parse_dotnet(self, pe, data)

Extracts a JSON-serializable and human-readable dictionary with information about the .NET metadata of an input PE file.

Expand source code Browse git
def parse_dotnet(self, pe: lief.PE.Binary, data):
    """
    Extracts a JSON-serializable and human-readable dictionary with information about
    the .NET metadata of an input PE file.
    """
    header = DotNetHeader(data, pe)
    tables = header.meta.Streams.Tables
    info = dict(
        RuntimeVersion=F'{header.head.MajorRuntimeVersion}.{header.head.MinorRuntimeVersion}',
        Version=F'{header.meta.MajorVersion}.{header.meta.MinorVersion}',
        VersionString=header.meta.VersionString
    )

    info['Flags'] = [name for name, check in header.head.KnownFlags.items() if check]

    if len(tables.Assembly) == 1:
        assembly = tables.Assembly[0]
        info.update(
            AssemblyName=assembly.Name,
            Release='{}.{}.{}.{}'.format(
                assembly.MajorVersion,
                assembly.MinorVersion,
                assembly.BuildNumber,
                assembly.RevisionNumber
            )
        )

    try:
        entry = self._vint(pe, header.head.EntryPointToken + pe.optional_header.imagebase)
        info.update(EntryPoint=entry)
    except AttributeError:
        pass

    if len(tables.Module) == 1:
        module = tables.Module[0]
        info.update(ModuleName=module.Name)

    return info
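
The assembly release string is simply the four version fields of the (single) Assembly table row joined with dots. A trivial standalone illustration, with hypothetical field values:

# Hypothetical Assembly table version fields:
major, minor, build, revision = 1, 0, 7493, 28126
release = '{}.{}.{}.{}'.format(major, minor, build, revision)
assert release == '1.0.7493.28126'
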
def parse_debug(self, pe, data=None)
Expand source code Browse git
def parse_debug(self, pe: lief.PE.Binary, data=None):
    result = []
    if not pe.has_debug:
        return None
    for entry in pe.debug:
        if entry.type != lief.PE.Debug.TYPES.CODEVIEW:
            continue
        try:
            entry: lief.PE.CodeViewPDB
            result.append(dict(
                PdbPath=_STRING(entry.filename),
                PdbGUID=entry.guid,
                PdbAge=entry.age,
            ))
        except AttributeError:
            continue
    if len(result) == 1:
        result = result[0]
    return result

Inherited members