Module refinery.units.formats.archive.xtpyi

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING, NamedTuple

import marshal
import enum
import io
import re
import uuid
import zlib
import os
import os.path
import contextlib
import dataclasses
import sys

from importlib.util import MAGIC_NUMBER

from refinery.units.formats.archive import Arg, ArchiveUnit
from refinery.units.pattern.carve import carve
from refinery.lib.structures import MemoryFile, StreamDetour, Struct, StructReader
from refinery.lib.tools import NoLogging, normalize_word_separators

from Cryptodome.Cipher import AES


if TYPE_CHECKING:
    from types import CodeType
    from typing import Callable, Dict, List, Tuple, Optional, Set, Union, Generator, Iterable
    from xdis import Instruction
    from refinery.lib.types import ByteStr


class Unmarshal(enum.IntEnum):
    """
    Selects how the PYZ archives embedded in a PyInstaller package are treated.
    """
    No = 0               # PYZ archives are not unpacked
    Yes = 1              # PYZ archives are unpacked and their entries are added to the output
    YesAndDecompile = 2  # like Yes, but unpacked modules and packages are also decompiled


def version2tuple(version: str):
    """
    Convert a version string such as `3.9` or `3.10.4rc1` into a tuple of integers. Only the
    leading two or three dot-separated numbers are converted; any suffix is ignored.

    Raises a `ValueError` when the string does not start with a `major.minor` pair.
    """
    match = re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version)
    if match is None:
        # Without this check, the failure would surface as an AttributeError on None.
        raise ValueError(F'not a valid version string: {version!r}')
    return tuple(int(k, 10) for k in match.group(1).split('.'))


def decompress_peek(buffer, size=512) -> Optional[bytes]:
    """
    Feed the first `size` bytes of `buffer` to a fresh zlib decompressor. The return value is the
    output produced from that prefix, or `None` if zlib rejects the data.
    """
    inflater = zlib.decompressobj()
    try:
        preview = inflater.decompress(buffer[:size])
    except zlib.error:
        return None
    return preview


class Code(NamedTuple):
    """
    One code object loaded from a compiled Python buffer, bundled with the metadata that the xdis
    loader returned alongside it.
    """
    version: Tuple[int]   # Python version as returned by the loader
    timestamp: int        # timestamp as returned by the loader
    magic: int            # integer magic value as returned by the loader
    container: CodeType   # the code object itself
    is_pypi: bool         # populated from the loader's is_pypy result (note the differing spelling)
    code_objects: dict    # the dictionary that was handed to the loader


def extract_code_from_buffer(buffer: ByteStr, file_name: Optional[str] = None) -> Generator[Code, None, None]:
    """
    Load the compiled Python module in `buffer` through xdis and generate one `Code` tuple for
    every code object that the loader returns. The optional `file_name` is forwarded to the loader
    and defaults to `<unknown>`.
    """
    collected = {}
    if not file_name:
        file_name = '<unknown>'
    loader = xtpyi._xdis.load.load_module_from_file_object
    # Anything the loader writes to the standard error stream is suppressed.
    with NoLogging(NoLogging.Mode.STD_ERR):
        version, timestamp, magic_int, containers, is_pypy, _, _ = loader(
            MemoryFile(buffer), file_name, collected)
    # The loader may return a single code object or a list of them.
    if not isinstance(containers, list):
        containers = [containers]
    for container in containers:
        yield Code(version, timestamp, magic_int, container, is_pypy, collected)


def disassemble_code(code: CodeType, version=None) -> Iterable[Instruction]:
    """
    Disassemble `code` with xdis. The optional `version` may be a float, a string, or a version
    tuple; it is used to look up the matching opcode table. When no version is given or no table
    is registered for it, `None` is passed as the opcode table.
    """
    xdis = xtpyi._xdis
    if version is None:
        return xdis.std.Bytecode(code, opc=None)
    if isinstance(version, float):
        version = str(version)
    elif not isinstance(version, str):
        version = xdis.version_info.version_tuple_to_str(version)
    try:
        opcodes = xdis.op_imports.op_imports[version]
    except KeyError:
        opcodes = None
    return xdis.std.Bytecode(code, opc=opcodes)


def decompile_buffer(buffer: Union[Code, ByteStr], file_name: Optional[str] = None) -> ByteStr:
    """
    Convert compiled Python code to text. The input is either an already loaded `Code` tuple or a
    buffer containing a compiled module, in which case the code objects are loaded from it first.

    Every available decompiler (decompyle3, then uncompyle6) is tried for each code object. If one
    succeeds, the decompiled source is returned with its leading comment lines removed. Otherwise,
    if the input was a raw buffer that consists almost entirely of a single printable string, that
    string is returned. As a last resort, the collected error messages are returned as comments,
    followed by a disassembly listing.
    """
    main: xtpyi = xtpyi
    errors = ''
    python = ''
    codes = [buffer]

    if not isinstance(buffer, Code):
        codes = list(extract_code_from_buffer(buffer, file_name))

    def _engines():
        # Yields (name, module) for each decompiler that can be imported and records a comment
        # line for each one that cannot.
        nonlocal errors
        try:
            dc = main._decompyle3
        except ImportError:
            errors += '# The decompiler decompyle3 is not installed.\n'
        else:
            yield 'decompyle3', dc
        try:
            dc = main._uncompyle6
        except ImportError:
            errors += '# The decompiler uncompyle6 is not installed.\n'
        else:
            yield 'uncompyle6', dc

    engines = dict(_engines())

    if not engines:
        # The trailing line break keeps the next comment line from being glued to this one.
        errors += '# (all missing, install one of the above to enable decompilation)\n'

    for code in codes:
        for name, engine in engines.items():
            with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL):
                try:
                    engine.main.decompile(
                        co=code.container,
                        bytecode_version=code.version,
                        out=output,
                        timestamp=code.timestamp,
                        code_objects=code.code_objects,
                        is_pypy=code.is_pypi,
                        magic_int=code.magic,
                    )
                except Exception as E:
                    errors += '\n'.join(F'# {line}' for line in (
                        F'Error while decompiling with {name}:', *str(E).splitlines(True)))
                    errors += '\n'
                else:
                    python = output.getvalue()
                    break
    if python:
        # removes leading comments; the bounds check covers output that is nothing but comments
        lines = python.splitlines(True)
        start = 0
        while start < len(lines) and lines[start].strip().startswith('#'):
            start += 1
        python = ''.join(lines[start:])
        return python.encode(main.codec)
    if not isinstance(buffer, Code):
        embedded = bytes(buffer | carve('printable', single=True))
        # Compare against the size of the input buffer: if all but a few bytes of it are one
        # printable string, then that string is most likely the source code itself.
        if len(buffer) - len(embedded) < 0x20:
            return embedded
    disassembly = MemoryFile()
    with io.TextIOWrapper(disassembly, main.codec, newline='\n') as output:
        output.write(errors)
        output.write('# Generating Disassembly:\n\n')
        for code in codes:
            instructions = list(disassemble_code(code.container, code.version))
            # The default prevents a failure for code objects without any instructions.
            width_offset = max((len(str(i.offset)) for i in instructions), default=1)
            for i in instructions:
                opname = normalize_word_separators(i.opname, '.').lower()
                offset = F'{i.offset:0{width_offset}d}'
                output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n')
        output.write('\n')
    return disassembly.getbuffer()


class PiType(bytes, enum.Enum):
    """
    Type codes for the items of a PyInstaller archive. The single-byte values are the type bytes
    that are read from the archive's table of contents; the two-byte values are only assigned by
    the code in this module.
    """
    BINARY          = B'b'  # noqa / binary
    DEPENDENCY      = B'd'  # noqa / dependency
    PYZ             = B'z'  # noqa / zlib (pyz) - frozen Python code
    PACKAGE         = B'M'  # noqa / Python package (__init__.py)
    MODULE          = B'm'  # noqa / Python module
    SOURCE          = B's'  # noqa / Python script (v3)
    DATA            = B'x'  # noqa / data
    RUNTIME_OPTION  = B'o'  # noqa / runtime option
    SPLASH          = B'l'  # noqa / splash resources
    UNKNOWN         = B'uk' # noqa / fallback for unrecognized type bytes
    DECOMPILED      = B'dc' # noqa / generated item that holds decompiled source code
    USERCODE        = B'uc' # noqa / decompiled item that originates from a SOURCE entry
    ENCRYPTED       = B'ec' # noqa


class PzType(enum.IntEnum):
    """
    Type codes for the entries in the table of contents of a PYZ archive.
    """
    MODULE = 0  # a module; eligible for decompilation
    PKG = 1     # a package; its entry is renamed to <name>/__init__ and is eligible for decompilation
    DATA = 2    # anything else; also used as the fallback for unrecognized type values


@dataclasses.dataclass
class PiMeta:
    """
    An extracted archive item. The `data` field holds either the item's contents or a callable
    without arguments that computes them on demand.
    """
    type: PiType
    name: str
    data: Union[Callable[[], ByteStr], ByteStr]

    def unpack(self) -> ByteStr:
        """
        Return the contents of the item. A callable in `data` is invoked once and then replaced
        by its result.
        """
        payload = self.data
        if callable(payload):
            payload = self.data = payload()
        return payload


def make_decompiled_item(name: str, data: ByteStr, *magics) -> PiMeta:
    """
    Create a lazily evaluated `PiMeta` item named `<name>.py` whose contents are the decompiled
    form of `data`. If `data` already starts with the first four bytes of one of the given magic
    headers, it is decompiled as it is. Otherwise, each header is prepended in turn until the
    decompilation succeeds; if none does, the last error is returned as comment lines.
    """
    def extract(data=data, magics=magics):
        for known in magics:
            if data[:4] == known[:4]:
                return decompile_buffer(data, name)
        failure = None
        for prefix in magics:
            try:
                return decompile_buffer(prefix + data, name)
            except Exception as exception:
                failure = exception
        commented = [F'# {line}' for line in str(failure).splitlines(True)]
        return '\n'.join(commented).encode(xtpyi.codec)

    return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)


class PYZ(Struct):
    """
    Parser for a PYZ archive inside a PyInstaller package. The constructor only reads the header;
    the entries are read by calling `unpack`.
    """

    MagicSignature = B'PYZ\0'

    def __init__(self, reader: StructReader, version: str):
        """
        Read the PYZ header at the current position of `reader`. The given `version` is the Python
        version to assume; it is replaced by the version that xdis associates with the magic bytes
        in the header whenever that lookup succeeds. Raises a `ValueError` when the signature does
        not match.
        """
        reader.bigendian = True
        self.base = reader.tell()
        signature = reader.read(4)
        if signature != self.MagicSignature:
            raise ValueError('invalid magic')
        magic = bytes(reader.read(4))
        with contextlib.suppress(KeyError, AttributeError):
            xdis = xtpyi._xdis
            if isinstance(xdis, property):
                xdis = xdis.fget()
            version = xdis.magics.versions[magic]
        vtuple = version2tuple(version)
        # The magic is extended with zero bytes to form a complete header; the number of padding
        # bytes depends on the Python version.
        padding_size = 4
        if vtuple >= (3, 3):
            padding_size += 4
        if vtuple >= (3, 7):
            padding_size += 4
        self.version = version
        self.magic = magic + padding_size * b'\0'
        self.toc_offset = reader.i32()
        self.reader = reader
        self.entries: List[PiMeta] = []

    def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
        """
        Read the table of contents and populate `self.entries` with lazily evaluated items. When
        `decompile` is set, an additional decompiled item is generated for every module and
        package. When `key` is given, every entry is treated as AES-encrypted with that key.

        Returns `False` if the table of contents could not be converted to a dictionary, or if no
        key was given and more than 70% of the entries do not look like zlib streams; returns
        `True` otherwise. An error that occurs while unmarshalling the table is raised.
        """
        with StreamDetour(self.reader, self.base + self.toc_offset):
            toc_data = self.reader.read()
        try:
            toc = marshal.loads(toc_data)
        except Exception as error:
            if MAGIC_NUMBER != self.magic[:4]:
                # The archive was built for a different Python version than the one running this
                # code; attempt to unmarshal the same table of contents with xdis instead.
                xdis = xtpyi._xdis
                if isinstance(xdis, property):
                    xdis = xdis.fget()
                _ord = xdis.marsh.Ord
                xdis.marsh.Ord = ord  # monkey-patch workaround for bug in xdis
                try:
                    toc = xdis.marsh.load(
                        MemoryFile(toc_data), self.version)
                except Exception:
                    pass
                else:
                    error = None
                finally:
                    xdis.marsh.Ord = _ord
            if error is not None:
                raise error

        if isinstance(toc, list):
            try:
                toc = dict(toc)
            except Exception as error:
                self.entries = []
                self.error = error
                return False

        failures = 0
        attempts = len(toc)

        for name, (pzt, offset, length) in toc.items():
            try:
                name: str
                name = name.decode('utf-8')
            except AttributeError:
                pass
            try:
                pzt = PzType(pzt)
            except Exception:
                pzt = PzType.DATA

            name = name.replace('.', '/')
            if pzt is PzType.PKG:
                name = F'{name}/__init__'

            with StreamDetour(self.reader, self.base + offset):
                data = self.reader.read(length)

            if key:
                def decompressed(data=data):
                    # the first 16 bytes of an encrypted entry are used as the IV
                    cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                    return zlib.decompress(cipher.decrypt(data[0x10:]))
            elif decompress_peek(data):
                def decompressed(data=data):
                    return zlib.decompress(data)
            else:
                failures += 1
                continue

            if decompile and pzt in (PzType.MODULE, PzType.PKG):
                def decompiled(data=data, name=name, magic=self.magic):
                    data = decompressed(data)
                    if data[:4] != magic[:4]:
                        data = magic + data
                    return decompile_buffer(data, name)
                self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
                name = F'{name}.pyc'
                type = PiType.SOURCE
            else:
                type = PiType.DATA

            self.entries.append(PiMeta(type, name, decompressed))

        if key:
            if failures >= 6:
                xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
            return True
        elif failures > 0.7 * attempts:
            self.entries.clear()
            return False
        else:
            return True


class PiTOCEntry(Struct):
    """
    One entry in the table of contents of a PyInstaller archive. It consists of a fixed-size
    header followed by a null-terminated name that fills the remainder of the entry.
    """

    def __init__(self, reader: StructReader):
        """
        Parse one entry at the current position of `reader`. Raises a `RuntimeError` when the name
        has an implausible size or contains non-printable characters.
        """
        reader.bigendian = True
        entry_start_offset = reader.tell()
        self.size_of_entry = reader.i32()
        self.offset = reader.i32()
        self.size_of_compressed_data = reader.i32()
        self.size_od_uncompressed_data = reader.i32()
        self.is_compressed = bool(reader.read_byte())
        entry_type = bytes(reader.read(1))
        # The name occupies whatever remains of the entry after the header fields.
        name_length = self.size_of_entry - reader.tell() + entry_start_offset
        if not 0 <= name_length <= 0x1000:
            # A negative length results from an entry size that is smaller than the header.
            raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.')
        name, *_ = bytes(reader.read(name_length)).partition(B'\0')
        try:
            name = name.decode('utf8', 'backslashreplace')
        except Exception:
            # An empty name passes the check below and is then replaced by a random one.
            name = ''
        if not all(part.isprintable() for part in re.split('\\s+', name)):
            raise RuntimeError('Refusing to process TOC entry with non-printable name.')
        name = name or str(uuid.uuid4())
        if entry_type == B'Z':
            entry_type = B'z'
        try:
            self.type = PiType(entry_type)
        except ValueError:
            xtpyi.logger.warning(F'unknown type {entry_type!r} in field {name}')
            self.type = PiType.UNKNOWN
        self.name = name

    def __hash__(self):
        return hash(self.name)


class PyInstallerArchiveEpilogue(Struct):
    """
    Parser for a PyInstaller archive, starting from the structure that begins with the magic
    signature. After construction, `toc` maps entry names to their table of contents entries and
    `files` maps output paths to the extracted items.
    """

    MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')

    @staticmethod
    def _format_python_version(pyvers: int) -> str:
        """
        Convert the numeric Python version field to a `major.minor` string. Values of 100 and
        above are decoded as `100 * major + minor`, smaller values as `10 * major + minor`;
        this way, 39 becomes `3.9` and 310 becomes `3.10`.
        """
        if pyvers >= 100:
            major, minor = divmod(pyvers, 100)
        else:
            major, minor = divmod(pyvers, 10)
        return F'{major}.{minor}'

    def _read_libname(self, reader: StructReader) -> Optional[str]:
        """
        Attempt to read a 64 byte field that contains a null-terminated, printable library name
        followed by nothing but null bytes. Returns the name on success; otherwise, the reader is
        moved back to where it was and `None` is returned.
        """
        position = reader.tell()
        try:
            libname, t, rest = reader.read_bytes(64).partition(B'\0')
        except EOFError:
            reader.seekset(position)
            return None
        try:
            libname = libname.decode('utf8')
        except Exception:
            reader.seekset(position)
            return None
        if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
            reader.seekset(position)
            return None
        return libname

    def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No, decompile: bool = False):
        """
        Parse the archive whose magic signature is located at `offset`. The `unmarshal` mode
        controls whether embedded PYZ archives are unpacked and decompiled, and `decompile`
        controls whether decompiled items are generated for top-level scripts and modules.
        Raises a `ValueError` when the signature at `offset` does not match.
        """
        self.decompile = decompile
        reader.bigendian = True
        reader.seekset(offset)
        self.reader = reader
        signature = reader.read_bytes(8)
        if signature != self.MagicSignature:
            raise ValueError(
                F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
                F'should be {self.MagicSignature.hex().upper()}')
        self.size = reader.i32()
        toc_offset = reader.i32()
        toc_length = reader.i32()
        self.py_version = self._format_python_version(reader.u32())
        self.py_libname = self._read_libname(reader)
        # The archive starts `size` bytes before the end of the structure that was just read.
        self.offset = reader.tell() - self.size

        self.toc: Dict[str, PiTOCEntry] = {}
        toc_end = self.offset + toc_offset + toc_length
        reader.seekset(self.offset + toc_offset)
        while reader.tell() < toc_end:
            try:
                entry = PiTOCEntry(reader)
            except EOFError:
                xtpyi.logger.warning('end of file while reading TOC')
                break
            except Exception as error:
                xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
                break
            if entry.name in self.toc:
                raise KeyError(F'duplicate name {entry.name}')
            self.toc[entry.name] = entry

        self.files: Dict[str, PiMeta] = {}
        no_pyz_found = True
        pyz_entries: Dict[str, PYZ] = {}

        for entry in list(self.toc.values()):
            if entry.type is not PiType.PYZ:
                continue
            no_pyz_found = False
            name, xt = os.path.splitext(entry.name)
            name_pyz = F'{name}.pyz'
            if name == entry.name:
                # The entry has no file extension: register it under a name ending in .pyz so
                # that the plain name remains available as the directory for its contents.
                del self.toc[name]
                self.toc[name_pyz] = entry
                entry.name = name_pyz
            reader.seekset(self.offset + entry.offset)
            if entry.is_compressed:
                data = self.extract(entry.name).unpack()
            else:
                data = reader
            pyz_entries[name] = PYZ(data, self.py_version)

        magics = {pyz.magic for pyz in pyz_entries.values()}

        if not magics:
            # NOTE(review): returning here leaves `files` without the regular entries whenever
            # the archive contains no PYZ at all - confirm that this is intended.
            if not no_pyz_found:
                xtpyi.logger.warning(
                    'no magic signature could be recovered from embedded pyzip archives; this is '
                    'unsual and means that there is no way to guess the missing magic for source '
                    'file entries and it will likely not be possible to decompile them.')
            return
        elif len(magics) > 1:
            xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')

        magics = list(magics)
        keys: Set[bytes] = set()

        for entry in self.toc.values():
            extracted = self.extract(entry.name)
            if entry.type not in (PiType.SOURCE, PiType.MODULE):
                self.files[entry.name] = extracted
                continue
            data = extracted.unpack()
            name, _ = os.path.splitext(extracted.name)
            del self.files[extracted.name]
            extracted.name = F'{name}.pyc'
            self.files[extracted.name] = extracted
            is_crypto_key = name.endswith('crypto_key')

            # Only the first four bytes of the recovered header are the magic number; the rest is
            # padding. The header is added only if the data does not already start with the magic.
            if len(magics) == 1 and data[:4] != magics[0][:4]:
                extracted.data = magics[0] + data

            if is_crypto_key or self.decompile:
                decompiled = make_decompiled_item(name, data, *magics)
                if entry.type is PiType.SOURCE:
                    decompiled.type = PiType.USERCODE
                self.files[decompiled.name] = decompiled

            if is_crypto_key:
                # Every 16 byte string in the decompiled key module is collected as a candidate.
                for key in decompiled.unpack() | carve('string', decode=True):
                    if len(key) != 0x10:
                        continue
                    xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
                    keys.add(key)

        if unmarshal is Unmarshal.No:
            return

        if not keys:
            key = None
        else:
            key = next(iter(keys))

        for name, pyz in pyz_entries.items():
            pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
            for unpacked in pyz.entries:
                unpacked.name = path = F'{name}/{unpacked.name}'
                if path in self.files:
                    raise ValueError(F'duplicate file name: {path}')
                self.files[path] = unpacked

    def extract(self, name: str) -> PiMeta:
        """
        Return the item for the entry with the given name. The item is created on first use and
        stored in `files`; the data of compressed entries is decompressed lazily. Raises a
        `KeyError` when there is no entry with this name.
        """
        try:
            return self.files[name]
        except KeyError:
            pass
        entry = self.toc[name]
        with StreamDetour(self.reader, self.offset + entry.offset):
            data = self.reader.read(entry.size_of_compressed_data)
        if entry.is_compressed:
            def extracted(d=data): return zlib.decompress(d)
        else:
            extracted = data
        result = PiMeta(entry.type, name, extracted)
        self.files[name] = result
        return result


class xtpyi(ArchiveUnit):
    """
    Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        decompile: Arg.Switch('-c', help='Attempt to decompile PYC files.') = False,
        user_code: Arg.Switch('-u', group='FILTER', help=(
            'Extract only source code files from the root of the archive. These usually implement '
            'the actual domain logic. This implies the --decompile option.')) = False,
        unmarshal: Arg('-y', action='count', group='FILTER', help=(
            '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
            'potentially exploit this to execute code. It is advised to only use this option inside '
            'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
        )) = 0
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            decompile=decompile,
            unmarshal=unmarshal,
            user_code=user_code,
        )

    @ArchiveUnit.Requires('xdis', 'arc', 'python', 'extended')
    def _xdis():
        import xdis.load
        import xdis.magics
        import xdis.marsh
        import xdis.op_imports
        import xdis.version_info
        import xdis
        A, B, C, *_ = sys.version_info
        version = F'{A}.{B}.{C}'
        canonic = F'{A}.{B}'
        if version not in xdis.magics.canonic_python_version:
            # The version of the running interpreter is not known to xdis: register its magic
            # number and a placeholder opcode table so that lookups for this version succeed.
            class opcode_dummy:
                version = float(canonic)
                def __init__(self, name): self.name = name
                def __getattr__(self, key): return opcode_dummy(F'{self.name}.{key}')
                def __call__(self, *a, **k): return None
                def __str__(self): return self.name
                def __repr__(self): return self.name
            import importlib
            magic = importlib.util.MAGIC_NUMBER
            xdis.magics.add_magic_from_int(xdis.magics.magic2int(magic), version)
            xdis.magics.by_magic.setdefault(magic, set()).add(version)
            xdis.magics.by_version[version] = magic
            xdis.magics.magics[canonic] = magic
            xdis.magics.canonic_python_version[canonic] = canonic
            xdis.magics.add_canonic_versions(version, canonic)
            xdis.op_imports.op_imports.setdefault(canonic, opcode_dummy('dummy'))
        del A, B, C, version
        import xdis.std
        return xdis

    @ArchiveUnit.Requires('uncompyle6', 'arc', 'python', 'extended')
    def _uncompyle6():
        import uncompyle6
        import uncompyle6.main
        return uncompyle6

    @ArchiveUnit.Requires('decompyle3', 'arc', 'python')
    def _decompyle3():
        import decompyle3
        import decompyle3.main
        return decompyle3

    def unpack(self, data):
        """
        Locate the last PyInstaller magic signature in `data`, parse the archive at that offset,
        and generate one packed item for each extracted file. With the user code option, only
        items of the user code type whose names do not start with `pyiboot` are generated.
        Raises a `LookupError` when the signature does not occur in `data`.
        """
        view = memoryview(data)
        positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)]
        # The option is a counter; any count above two is treated the same as two.
        mode = Unmarshal(min(2, int(self.args.unmarshal)))
        self.log_debug(F'unmarshal mode: {mode.name}')
        if not positions:
            raise LookupError('unable to find PyInstaller signature')
        if len(positions) > 2:
            # first position is expected to be the sentinel value in the unpacker stub
            width = max(len(F'{p:X}') for p in positions)
            for position in positions:
                self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
            self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')
        decompile = self.args.decompile
        uc_target = PiType.USERCODE if decompile else PiType.SOURCE
        archive = PyInstallerArchiveEpilogue(view, positions[-1], mode, decompile)
        for name, file in archive.files.items():
            if self.args.user_code:
                if file.type != uc_target:
                    continue
                if name.startswith('pyiboot'):
                    continue
            yield self._pack(name, None, file.data, type=file.type.name)

    @classmethod
    def handles(cls, data: ByteStr) -> Optional[bool]:
        """
        Return whether `data` contains the PyInstaller magic signature.
        """
        return PyInstallerArchiveEpilogue.MagicSignature in data

Functions

def version2tuple(version)
Expand source code Browse git
def version2tuple(version: str):
    """
    Convert a version string such as `3.9` or `3.10.4rc1` into a tuple of integers. Only the
    leading two or three dot-separated numbers are converted; any suffix is ignored.

    Raises a `ValueError` when the string does not start with a `major.minor` pair.
    """
    match = re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version)
    if match is None:
        # Without this check, the failure would surface as an AttributeError on None.
        raise ValueError(F'not a valid version string: {version!r}')
    return tuple(int(k, 10) for k in match.group(1).split('.'))
def decompress_peek(buffer, size=512)
Expand source code Browse git
def decompress_peek(buffer, size=512) -> Optional[bytes]:
    """
    Feed the first `size` bytes of `buffer` to a fresh zlib decompressor. The return value is the
    output produced from that prefix, or `None` if zlib rejects the data.
    """
    inflater = zlib.decompressobj()
    try:
        preview = inflater.decompress(buffer[:size])
    except zlib.error:
        return None
    return preview
def extract_code_from_buffer(buffer, file_name=None)
Expand source code Browse git
def extract_code_from_buffer(buffer: ByteStr, file_name: Optional[str] = None) -> Generator[Code, None, None]:
    """
    Load the compiled Python module in `buffer` through xdis and generate one `Code` tuple for
    every code object that the loader returns. The optional `file_name` is forwarded to the loader
    and defaults to `<unknown>`.
    """
    collected = {}
    if not file_name:
        file_name = '<unknown>'
    loader = xtpyi._xdis.load.load_module_from_file_object
    # Anything the loader writes to the standard error stream is suppressed.
    with NoLogging(NoLogging.Mode.STD_ERR):
        version, timestamp, magic_int, containers, is_pypy, _, _ = loader(
            MemoryFile(buffer), file_name, collected)
    # The loader may return a single code object or a list of them.
    if not isinstance(containers, list):
        containers = [containers]
    for container in containers:
        yield Code(version, timestamp, magic_int, container, is_pypy, collected)
def disassemble_code(code, version=None)
Expand source code Browse git
def disassemble_code(code: CodeType, version=None) -> Iterable[Instruction]:
    """
    Disassemble `code` with xdis. The optional `version` may be a float, a string, or a version
    tuple; it is used to look up the matching opcode table. When no version is given or no table
    is registered for it, `None` is passed as the opcode table.
    """
    xdis = xtpyi._xdis
    if version is None:
        return xdis.std.Bytecode(code, opc=None)
    if isinstance(version, float):
        version = str(version)
    elif not isinstance(version, str):
        version = xdis.version_info.version_tuple_to_str(version)
    try:
        opcodes = xdis.op_imports.op_imports[version]
    except KeyError:
        opcodes = None
    return xdis.std.Bytecode(code, opc=opcodes)
def decompile_buffer(buffer, file_name=None)
Expand source code Browse git
def decompile_buffer(buffer: Union[Code, ByteStr], file_name: Optional[str] = None) -> ByteStr:
    """
    Convert compiled Python code to text. The input is either an already loaded `Code` tuple or a
    buffer containing a compiled module, in which case the code objects are loaded from it first.

    Every available decompiler (decompyle3, then uncompyle6) is tried for each code object. If one
    succeeds, the decompiled source is returned with its leading comment lines removed. Otherwise,
    if the input was a raw buffer that consists almost entirely of a single printable string, that
    string is returned. As a last resort, the collected error messages are returned as comments,
    followed by a disassembly listing.
    """
    main: xtpyi = xtpyi
    errors = ''
    python = ''
    codes = [buffer]

    if not isinstance(buffer, Code):
        codes = list(extract_code_from_buffer(buffer, file_name))

    def _engines():
        # Yields (name, module) for each decompiler that can be imported and records a comment
        # line for each one that cannot.
        nonlocal errors
        try:
            dc = main._decompyle3
        except ImportError:
            errors += '# The decompiler decompyle3 is not installed.\n'
        else:
            yield 'decompyle3', dc
        try:
            dc = main._uncompyle6
        except ImportError:
            errors += '# The decompiler uncompyle6 is not installed.\n'
        else:
            yield 'uncompyle6', dc

    engines = dict(_engines())

    if not engines:
        # The trailing line break keeps the next comment line from being glued to this one.
        errors += '# (all missing, install one of the above to enable decompilation)\n'

    for code in codes:
        for name, engine in engines.items():
            with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL):
                try:
                    engine.main.decompile(
                        co=code.container,
                        bytecode_version=code.version,
                        out=output,
                        timestamp=code.timestamp,
                        code_objects=code.code_objects,
                        is_pypy=code.is_pypi,
                        magic_int=code.magic,
                    )
                except Exception as E:
                    errors += '\n'.join(F'# {line}' for line in (
                        F'Error while decompiling with {name}:', *str(E).splitlines(True)))
                    errors += '\n'
                else:
                    python = output.getvalue()
                    break
    if python:
        # removes leading comments; the bounds check covers output that is nothing but comments
        lines = python.splitlines(True)
        start = 0
        while start < len(lines) and lines[start].strip().startswith('#'):
            start += 1
        python = ''.join(lines[start:])
        return python.encode(main.codec)
    if not isinstance(buffer, Code):
        embedded = bytes(buffer | carve('printable', single=True))
        # Compare against the size of the input buffer: if all but a few bytes of it are one
        # printable string, then that string is most likely the source code itself.
        if len(buffer) - len(embedded) < 0x20:
            return embedded
    disassembly = MemoryFile()
    with io.TextIOWrapper(disassembly, main.codec, newline='\n') as output:
        output.write(errors)
        output.write('# Generating Disassembly:\n\n')
        for code in codes:
            instructions = list(disassemble_code(code.container, code.version))
            # The default prevents a failure for code objects without any instructions.
            width_offset = max((len(str(i.offset)) for i in instructions), default=1)
            for i in instructions:
                opname = normalize_word_separators(i.opname, '.').lower()
                offset = F'{i.offset:0{width_offset}d}'
                output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n')
        output.write('\n')
    return disassembly.getbuffer()
def make_decompiled_item(name, data, *magics)
Expand source code Browse git
def make_decompiled_item(name: str, data: ByteStr, *magics) -> PiMeta:
    """
    Create a lazily evaluated `PiMeta` item named `<name>.py` whose contents are the decompiled
    form of `data`. If `data` already starts with the first four bytes of one of the given magic
    headers, it is decompiled as it is. Otherwise, each header is prepended in turn until the
    decompilation succeeds; if none does, the last error is returned as comment lines.
    """
    def extract(data=data, magics=magics):
        for known in magics:
            if data[:4] == known[:4]:
                return decompile_buffer(data, name)
        failure = None
        for prefix in magics:
            try:
                return decompile_buffer(prefix + data, name)
            except Exception as exception:
                failure = exception
        commented = [F'# {line}' for line in str(failure).splitlines(True)]
        return '\n'.join(commented).encode(xtpyi.codec)

    return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)

Classes

class Unmarshal (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class Unmarshal(enum.IntEnum):
    """
    Selects how the PYZ archives embedded in a PyInstaller package are treated.
    """
    No = 0               # PYZ archives are not unpacked
    Yes = 1              # PYZ archives are unpacked and their entries are added to the output
    YesAndDecompile = 2  # like Yes, but unpacked modules and packages are also decompiled

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var No
var Yes
var YesAndDecompile
class Code (version, timestamp, magic, container, is_pypi, code_objects)

Code(version, timestamp, magic, container, is_pypi, code_objects)

Expand source code Browse git
class Code(NamedTuple):
    """
    One code object loaded from a compiled Python buffer, bundled with the metadata that the xdis
    loader returned alongside it.
    """
    version: Tuple[int]   # Python version as returned by the loader
    timestamp: int        # timestamp as returned by the loader
    magic: int            # integer magic value as returned by the loader
    container: CodeType   # the code object itself
    is_pypi: bool         # populated from the loader's is_pypy result (note the differing spelling)
    code_objects: dict    # the dictionary that was handed to the loader

Ancestors

  • builtins.tuple

Instance variables

var version

Alias for field number 0

var timestamp

Alias for field number 1

var magic

Alias for field number 2

var container

Alias for field number 3

var is_pypi

Alias for field number 4

var code_objects

Alias for field number 5

class PiType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class PiType(bytes, enum.Enum):
    """
    Type codes for the items of a PyInstaller archive. The single-byte values are the type bytes
    that are read from the archive's table of contents; the two-byte values are only assigned by
    the code in this module.
    """
    BINARY          = B'b'  # noqa / binary
    DEPENDENCY      = B'd'  # noqa / dependency
    PYZ             = B'z'  # noqa / zlib (pyz) - frozen Python code
    PACKAGE         = B'M'  # noqa / Python package (__init__.py)
    MODULE          = B'm'  # noqa / Python module
    SOURCE          = B's'  # noqa / Python script (v3)
    DATA            = B'x'  # noqa / data
    RUNTIME_OPTION  = B'o'  # noqa / runtime option
    SPLASH          = B'l'  # noqa / splash resources
    UNKNOWN         = B'uk' # noqa / fallback for unrecognized type bytes
    DECOMPILED      = B'dc' # noqa / generated item that holds decompiled source code
    USERCODE        = B'uc' # noqa / decompiled item that originates from a SOURCE entry
    ENCRYPTED       = B'ec' # noqa

Ancestors

  • builtins.bytes
  • enum.Enum

Class variables

var BINARY
var DEPENDENCY
var PYZ
var PACKAGE
var MODULE
var SOURCE
var DATA
var RUNTIME_OPTION
var SPLASH
var UNKNOWN
var DECOMPILED
var USERCODE
var ENCRYPTED
class PzType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class PzType(enum.IntEnum):
    """
    Type codes for the entries in the table of contents of a PYZ archive.
    """
    MODULE = 0  # a module; eligible for decompilation
    PKG = 1     # a package; its entry is renamed to <name>/__init__ and is eligible for decompilation
    DATA = 2    # anything else; also used as the fallback for unrecognized type values

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var MODULE
var PKG
var DATA
class PiMeta (type, name, data)

PiMeta(type: 'PiType', name: 'str', data: 'Union[Callable[[], ByteStr], ByteStr]')

Expand source code Browse git
class PiMeta:
    """
    An extracted archive item. The `data` field holds either the item's contents or a callable
    without arguments that computes them on demand.
    """
    type: PiType
    name: str
    data: Union[Callable[[], ByteStr], ByteStr]

    def unpack(self) -> ByteStr:
        """
        Return the contents of the item. A callable in `data` is invoked once and then replaced
        by its result.
        """
        payload = self.data
        if callable(payload):
            payload = self.data = payload()
        return payload

Class variables

var type
var name
var data

Methods

def unpack(self)
Expand source code Browse git
def unpack(self) -> ByteStr:
    """
    Return the contents of the item. A callable in `data` is invoked once and then replaced by
    its result.
    """
    payload = self.data
    if callable(payload):
        payload = self.data = payload()
    return payload
class PYZ (reader, version)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class PYZ(Struct):
    """
    Parser for a PYZ archive. The constructor only reads the header, i.e. the signature, the
    Python magic, and the offset of the table of contents. The list of entries remains empty
    until `PYZ.unpack` is called.
    """

    MagicSignature = B'PYZ\0'

    def __init__(self, reader: StructReader, version: str):
        """
        Read the PYZ header from the current position of the reader. The given version string
        is only used when the embedded magic cannot be mapped to a Python version via xdis.
        Raises a ValueError if the data does not start with the PYZ signature.
        """
        reader.bigendian = True
        self.base = reader.tell()
        signature = reader.read(4)
        if signature != self.MagicSignature:
            raise ValueError('invalid magic')
        magic = bytes(reader.read(4))
        # Prefer the version that xdis associates with the embedded magic; when the lookup
        # fails, the version that was passed in by the caller is kept.
        with contextlib.suppress(KeyError, AttributeError):
            xdis = xtpyi._xdis
            if isinstance(xdis, property):
                xdis = xdis.fget()
            version = xdis.magics.versions[magic]
        vtuple = version2tuple(version)
        # The stored magic is zero-padded to 8 bytes, to 12 bytes from Python 3.3 on, and to
        # 16 bytes from Python 3.7 on. It is later prepended to unpacked code that lacks it.
        padding_size = 4
        if vtuple >= (3, 3):
            padding_size += 4
        if vtuple >= (3, 7):
            padding_size += 4
        self.version = version
        self.magic = magic + padding_size * b'\0'
        self.toc_offset = reader.i32()
        self.reader = reader
        self.entries: List[PiMeta] = []

    def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
        """
        Unmarshal the table of contents and populate `entries` with one lazily unpacked item
        for each archive member. When `decompile` is set, modules and packages additionally
        get an item that decompiles them on demand. When a `key` is given, every member is
        assumed to be AES-CFB encrypted with a 16-byte IV prefix before compression.

        Returns False if the table of contents cannot be converted to a dictionary, or if no
        key was given and more than 70% of the members do not start like a zlib stream. In
        the latter case, all entries are discarded. Returns True otherwise. If the table of
        contents cannot be unmarshalled at all, the original error is raised.
        """
        with StreamDetour(self.reader, self.base + self.toc_offset):
            toc_data = self.reader.read()
        try:
            toc = marshal.loads(toc_data)
        except Exception as error:
            # The native marshal module failed; if the archive was built for a different
            # Python version, retry with the version-aware loader from xdis.
            if MAGIC_NUMBER != self.magic[:4]:
                xdis = xtpyi._xdis
                if isinstance(xdis, property):
                    xdis = xdis.fget()
                _ord = xdis.marsh.Ord
                xdis.marsh.Ord = ord  # monkey-patch workaround for bug in xdis
                try:
                    # The fallback has to parse the same TOC bytes that marshal rejected.
                    toc = xdis.marsh.load(
                        MemoryFile(toc_data), self.version)
                except Exception:
                    pass
                else:
                    error = None
                finally:
                    xdis.marsh.Ord = _ord
            if error is not None:
                raise error

        if isinstance(toc, list):
            try:
                toc = dict(toc)
            except Exception as error:
                self.entries = []
                self.error = error
                return False

        failures = 0
        attempts = len(toc)

        for name, (pzt, offset, length) in toc.items():
            try:
                name: str
                name = name.decode('utf-8')
            except AttributeError:
                # The name already is a string.
                pass
            try:
                pzt = PzType(pzt)
            except Exception:
                pzt = PzType.DATA

            # Dotted module names are converted to archive paths.
            name = name.replace('.', '/')
            if pzt is PzType.PKG:
                name = F'{name}/__init__'

            with StreamDetour(self.reader, self.base + offset):
                data = self.reader.read(length)

            if key:
                def decompressed(data=data):
                    cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                    return zlib.decompress(cipher.decrypt(data[0x10:]))
            elif decompress_peek(data):
                def decompressed(data=data):
                    return zlib.decompress(data)
            else:
                failures += 1
                continue

            if decompile and pzt in (PzType.MODULE, PzType.PKG):
                # The unpacker is bound as a default argument because this function is only
                # called after the loop has finished and rebound the name `decompressed`.
                def decompiled(data=data, name=name, magic=self.magic, decompress=decompressed):
                    data = decompress(data)
                    if data[:4] != magic[:4]:
                        data = magic + data
                    return decompile_buffer(data, name)
                self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
                name = F'{name}.pyc'
                entry_type = PiType.SOURCE
            else:
                entry_type = PiType.DATA

            self.entries.append(PiMeta(entry_type, name, decompressed))

        if key:
            if failures >= 6:
                xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
            return True
        elif failures > 0.7 * attempts:
            self.entries.clear()
            return False
        else:
            return True

Ancestors

Class variables

var MagicSignature

Methods

def unpack(self, decompile, key=None)
Expand source code Browse git
def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
    """
    Unmarshal the table of contents and populate `entries` with one lazily unpacked item for
    each archive member. When `decompile` is set, modules and packages additionally get an
    item that decompiles them on demand. When a `key` is given, every member is assumed to be
    AES-CFB encrypted with a 16-byte IV prefix before compression.

    Returns False if the table of contents cannot be converted to a dictionary, or if no key
    was given and more than 70% of the members do not start like a zlib stream. In the latter
    case, all entries are discarded. Returns True otherwise. If the table of contents cannot
    be unmarshalled at all, the original error is raised.
    """
    with StreamDetour(self.reader, self.base + self.toc_offset):
        toc_data = self.reader.read()
    try:
        toc = marshal.loads(toc_data)
    except Exception as error:
        # The native marshal module failed; if the archive was built for a different Python
        # version, retry with the version-aware loader from xdis.
        if MAGIC_NUMBER != self.magic[:4]:
            xdis = xtpyi._xdis
            if isinstance(xdis, property):
                xdis = xdis.fget()
            _ord = xdis.marsh.Ord
            xdis.marsh.Ord = ord  # monkey-patch workaround for bug in xdis
            try:
                # The fallback has to parse the same TOC bytes that marshal rejected.
                toc = xdis.marsh.load(
                    MemoryFile(toc_data), self.version)
            except Exception:
                pass
            else:
                error = None
            finally:
                xdis.marsh.Ord = _ord
        if error is not None:
            raise error

    if isinstance(toc, list):
        try:
            toc = dict(toc)
        except Exception as error:
            self.entries = []
            self.error = error
            return False

    failures = 0
    attempts = len(toc)

    for name, (pzt, offset, length) in toc.items():
        try:
            name: str
            name = name.decode('utf-8')
        except AttributeError:
            # The name already is a string.
            pass
        try:
            pzt = PzType(pzt)
        except Exception:
            pzt = PzType.DATA

        # Dotted module names are converted to archive paths.
        name = name.replace('.', '/')
        if pzt is PzType.PKG:
            name = F'{name}/__init__'

        with StreamDetour(self.reader, self.base + offset):
            data = self.reader.read(length)

        if key:
            def decompressed(data=data):
                cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                return zlib.decompress(cipher.decrypt(data[0x10:]))
        elif decompress_peek(data):
            def decompressed(data=data):
                return zlib.decompress(data)
        else:
            failures += 1
            continue

        if decompile and pzt in (PzType.MODULE, PzType.PKG):
            # The unpacker is bound as a default argument because this function is only called
            # after the loop has finished and rebound the name `decompressed`.
            def decompiled(data=data, name=name, magic=self.magic, decompress=decompressed):
                data = decompress(data)
                if data[:4] != magic[:4]:
                    data = magic + data
                return decompile_buffer(data, name)
            self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
            name = F'{name}.pyc'
            entry_type = PiType.SOURCE
        else:
            entry_type = PiType.DATA

        self.entries.append(PiMeta(entry_type, name, decompressed))

    if key:
        if failures >= 6:
            xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
        return True
    elif failures > 0.7 * attempts:
        self.entries.clear()
        return False
    else:
        return True
class PiTOCEntry (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class PiTOCEntry(Struct):
    """
    A single entry in the table of contents of a PyInstaller archive. It consists of four
    big-endian 32-bit integers (entry size, data offset, compressed size, uncompressed size),
    a compression flag byte, a type byte, and a null-terminated name that fills the remainder
    of the entry.
    """

    def __init__(self, reader: StructReader):
        """
        Parse one entry at the current position of the reader. Raises a RuntimeError when the
        name has an implausible size or contains non-printable characters.
        """
        reader.bigendian = True
        entry_start_offset = reader.tell()
        self.size_of_entry = reader.i32()
        self.offset = reader.i32()
        self.size_of_compressed_data = reader.i32()
        # NOTE: the misspelled attribute name is kept since other code may depend on it.
        self.size_od_uncompressed_data = reader.i32()
        self.is_compressed = bool(reader.read_byte())
        entry_type = bytes(reader.read(1))
        # The name occupies whatever remains of the entry after the fixed-size fields.
        name_length = self.size_of_entry - reader.tell() + entry_start_offset
        if name_length < 0 or name_length > 0x1000:
            raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.')
        name, *_ = bytes(reader.read(name_length)).partition(B'\0')
        try:
            name = name.decode('utf8', 'backslashreplace')
        except Exception:
            # Treat an undecodable name like a missing one; it is replaced by a UUID below.
            name = ''
        # Whitespace is permitted in names, every other character has to be printable.
        if not all(part.isprintable() for part in re.split(R'\s+', name)):
            raise RuntimeError('Refusing to process TOC entry with non-printable name.')
        name = name or str(uuid.uuid4())
        if entry_type == B'Z':
            entry_type = B'z'
        try:
            self.type = PiType(entry_type)
        except ValueError:
            # There is no unit instance available here, so the class-level logger is used.
            xtpyi.logger.warning(F'unknown type {entry_type!r} in field {name}')
            self.type = PiType.UNKNOWN
        self.name = name

    def __hash__(self):
        return hash(self.name)

Ancestors

class PyInstallerArchiveEpilogue (reader, offset, unmarshal=Unmarshal.No, decompile=False)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class PyInstallerArchiveEpilogue(Struct):
    """
    Parser for the epilogue of a PyInstaller archive. The epilogue starts with a magic
    signature and contains the archive size, the location of the table of contents, and the
    Python version. The constructor parses the table of contents and populates `files` with
    one `PiMeta` item per extracted file.
    """

    MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')

    @staticmethod
    def _format_py_version(pyvers: int) -> str:
        """
        Convert the version integer from the epilogue to a dotted version string. Values from
        100 on are decoded as `major * 100 + minor`, so that 310 becomes 3.10. Smaller values
        are decoded as `major * 10 + minor`, so that 27 becomes 2.7.
        """
        major, minor = divmod(pyvers, 100 if pyvers >= 100 else 10)
        return F'{major}.{minor}'

    def _read_libname(self, reader: StructReader) -> Optional[str]:
        """
        Attempt to read the 64-byte, null-padded name of the Python library. Returns None and
        restores the reader position if the field is missing or does not look like a name.
        """
        position = reader.tell()
        try:
            libname, t, rest = reader.read_bytes(64).partition(B'\0')
        except EOFError:
            reader.seekset(position)
            return None
        try:
            libname = libname.decode('utf8')
        except Exception:
            reader.seekset(position)
            return None
        # A valid field has a terminator followed by at least ten bytes of zero padding.
        if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
            reader.seekset(position)
            return None
        return libname

    def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No, decompile: bool = False):
        """
        Parse the archive whose epilogue starts at the given offset. PYZ archives are only
        unpacked when `unmarshal` is not `Unmarshal.No`, and their contents are decompiled for
        `Unmarshal.YesAndDecompile`. The `decompile` flag controls whether decompiled items
        are added for the source entries of the archive itself. Raises a ValueError when the
        signature at the given offset is invalid.
        """
        self.decompile = decompile
        reader.bigendian = True
        reader.seekset(offset)
        self.reader = reader
        signature = reader.read_bytes(8)
        if signature != self.MagicSignature:
            raise ValueError(
                F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
                F'should be {self.MagicSignature.hex().upper()}')
        self.size = reader.i32()
        toc_offset = reader.i32()
        toc_length = reader.i32()
        self.py_version = self._format_py_version(reader.u32())
        self.py_libname = self._read_libname(reader)
        # All offsets in the archive are relative to its start, which is computed by
        # subtracting the stored size from the end of the epilogue.
        self.offset = reader.tell() - self.size

        self.toc: Dict[str, PiTOCEntry] = {}
        toc_end = self.offset + toc_offset + toc_length
        reader.seekset(self.offset + toc_offset)
        while reader.tell() < toc_end:
            try:
                entry = PiTOCEntry(reader)
            except EOFError:
                xtpyi.logger.warning('end of file while reading TOC')
                break
            except Exception as error:
                xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
                break
            if entry.name in self.toc:
                raise KeyError(F'duplicate name {entry.name}')
            self.toc[entry.name] = entry

        self.files: Dict[str, PiMeta] = {}
        no_pyz_found = True
        pyz_entries: Dict[str, PYZ] = {}

        # Iterate over a copy because entries without extension are renamed in the TOC.
        for entry in list(self.toc.values()):
            if entry.type is not PiType.PYZ:
                continue
            no_pyz_found = False
            name, xt = os.path.splitext(entry.name)
            name_pyz = F'{name}.pyz'
            if name == entry.name:
                del self.toc[name]
                self.toc[name_pyz] = entry
                entry.name = name_pyz
            reader.seekset(self.offset + entry.offset)
            if entry.is_compressed:
                data = self.extract(entry.name).unpack()
            else:
                data = reader
            pyz_entries[name] = PYZ(data, self.py_version)

        magics = {pyz.magic for pyz in pyz_entries.values()}

        if not magics:
            # NOTE(review): this returns before any TOC entry is extracted, so an archive
            # without PYZ entries produces no files - confirm that this is intended.
            if not no_pyz_found:
                xtpyi.logger.warning(
                    'no magic signature could be recovered from embedded pyzip archives; this is '
                    'unsual and means that there is no way to guess the missing magic for source '
                    'file entries and it will likely not be possible to decompile them.')
            return
        elif len(magics) > 1:
            xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')

        magics = list(magics)
        keys: Set[bytes] = set()

        for entry in self.toc.values():
            extracted = self.extract(entry.name)
            if entry.type not in (PiType.SOURCE, PiType.MODULE):
                self.files[entry.name] = extracted
                continue
            data = extracted.unpack()
            # Code entries are stored under their name with a pyc extension.
            name, _ = os.path.splitext(extracted.name)
            del self.files[extracted.name]
            extracted.name = F'{name}.pyc'
            self.files[extracted.name] = extracted
            is_crypto_key = name.endswith('crypto_key')

            # The recovered magic is zero-padded; only its first four bytes can be compared
            # to the start of the data to determine whether the header is missing.
            if len(magics) == 1 and data[:4] != magics[0][:4]:
                extracted.data = magics[0] + data

            if is_crypto_key or self.decompile:
                decompiled = make_decompiled_item(name, data, *magics)
                if entry.type is PiType.SOURCE:
                    decompiled.type = PiType.USERCODE
                self.files[decompiled.name] = decompiled

            if is_crypto_key:
                # Every 16-byte string literal in the key module is a key candidate.
                for key in decompiled.unpack() | carve('string', decode=True):
                    if len(key) != 0x10:
                        continue
                    xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
                    keys.add(key)

        if unmarshal is Unmarshal.No:
            return

        if not keys:
            key = None
        else:
            key = next(iter(keys))

        for name, pyz in pyz_entries.items():
            pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
            for unpacked in pyz.entries:
                unpacked.name = path = F'{name}/{unpacked.name}'
                if path in self.files:
                    raise ValueError(F'duplicate file name: {path}')
                self.files[path] = unpacked

    def extract(self, name: str) -> PiMeta:
        """
        Return the item for the TOC entry with the given name. The item is created and stored
        in `files` on first access; compressed entries are decompressed on demand. Raises a
        KeyError when the name is not part of the table of contents.
        """
        try:
            return self.files[name]
        except KeyError:
            pass
        entry = self.toc[name]
        with StreamDetour(self.reader, self.offset + entry.offset):
            data = self.reader.read(entry.size_of_compressed_data)
        if entry.is_compressed:
            def extracted(d=data): return zlib.decompress(d)
        else:
            extracted = data
        result = PiMeta(entry.type, name, extracted)
        self.files[name] = result
        return result

Ancestors

Class variables

var MagicSignature

Methods

def extract(self, name)
Expand source code Browse git
def extract(self, name: str) -> PiMeta:
    """
    Return the item for the TOC entry with the given name. Items that were already created
    are served from `files`. Otherwise, the raw data of the entry is read, wrapped in a new
    item, and stored in `files`. Compressed entries are only decompressed on demand.
    """
    if name in self.files:
        return self.files[name]
    entry = self.toc[name]
    with StreamDetour(self.reader, self.offset + entry.offset):
        raw = self.reader.read(entry.size_of_compressed_data)
    if entry.is_compressed:
        def payload(d=raw):
            return zlib.decompress(d)
    else:
        payload = raw
    item = self.files[name] = PiMeta(entry.type, name, payload)
    return item
class xtpyi (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', decompile, user_code=False, unmarshal=0)

Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.

Expand source code Browse git
class xtpyi(ArchiveUnit):
    """
    Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        decompile: Arg.Switch('-c', help='Attempt to decompile PYC files.') = False,
        user_code: Arg.Switch('-u', group='FILTER', help=(
            'Extract only source code files from the root of the archive. These usually implement '
            'the actual domain logic. This implies the --decompile option.')) = False,
        unmarshal: Arg('-y', action='count', group='FILTER', help=(
            '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
            'potentially exploit this to execute code. It is advised to only use this option inside '
            'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
        )) = 0
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            decompile=decompile,
            unmarshal=unmarshal,
            user_code=user_code,
        )

    @ArchiveUnit.Requires('xdis', 'arc', 'python', 'extended')
    def _xdis():
        import xdis.load
        import xdis.magics
        import xdis.marsh
        import xdis.op_imports
        import xdis.version_info
        import xdis
        A, B, C, *_ = sys.version_info
        version = F'{A}.{B}.{C}'
        canonic = F'{A}.{B}'
        # If xdis does not know the running interpreter, register its magic number and a
        # placeholder opcode module for it before xdis.std is imported below.
        if version not in xdis.magics.canonic_python_version:
            class opcode_dummy:
                version = float(canonic)
                def __init__(self, name): self.name = name
                def __getattr__(self, key): return opcode_dummy(F'{self.name}.{key}')
                def __call__(self, *a, **k): return None
                def __str__(self): return self.name
                def __repr__(self): return self.name
            # Import the submodule explicitly; importing the package alone does not
            # guarantee that the attribute importlib.util exists.
            import importlib.util
            magic = importlib.util.MAGIC_NUMBER
            xdis.magics.add_magic_from_int(xdis.magics.magic2int(magic), version)
            xdis.magics.by_magic.setdefault(magic, set()).add(version)
            xdis.magics.by_version[version] = magic
            xdis.magics.magics[canonic] = magic
            xdis.magics.canonic_python_version[canonic] = canonic
            xdis.magics.add_canonic_versions(version, canonic)
            xdis.op_imports.op_imports.setdefault(canonic, opcode_dummy('dummy'))
        del A, B, C, version
        import xdis.std
        return xdis

    @ArchiveUnit.Requires('uncompyle6', 'arc', 'python', 'extended')
    def _uncompyle6():
        import uncompyle6
        import uncompyle6.main
        return uncompyle6

    @ArchiveUnit.Requires('decompyle3', 'arc', 'python')
    def _decompyle3():
        import decompyle3
        import decompyle3.main
        return decompyle3

    def unpack(self, data):
        """
        Locate the last PyInstaller epilogue signature in the input, parse the archive that
        it belongs to, and yield one packed item for each extracted file. Raises a LookupError
        when the input does not contain the signature.
        """
        view = memoryview(data)
        positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)]
        # The switch can be specified any number of times; everything above two means the same.
        mode = Unmarshal(min(2, int(self.args.unmarshal)))
        self.log_debug(F'unmarshal mode: {mode.name}')
        if not positions:
            raise LookupError('unable to find PyInstaller signature')
        if len(positions) > 2:
            # first position is expected to be the sentinel value in the unpacker stub
            width = max(len(F'{p:X}') for p in positions)
            for position in positions:
                self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
            self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')
        decompile = self.args.decompile
        # The user code filter selects decompiled sources if decompilation is enabled, and
        # the compiled source entries otherwise.
        uc_target = PiType.USERCODE if decompile else PiType.SOURCE
        archive = PyInstallerArchiveEpilogue(view, positions[-1], mode, decompile)
        for name, file in archive.files.items():
            if self.args.user_code:
                if file.type != uc_target:
                    continue
                if name.startswith('pyiboot'):
                    continue
            yield self._pack(name, None, file.data, type=file.type.name)

    @classmethod
    def handles(cls, data: ByteStr) -> Optional[bool]:
        """
        Return whether the data contains the PyInstaller epilogue signature.
        """
        return PyInstallerArchiveEpilogue.MagicSignature in data

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    """
    Locate the last PyInstaller epilogue signature in the input, parse the archive that it
    belongs to, and yield one packed item for each extracted file. Raises a LookupError when
    the input does not contain the signature.
    """
    view = memoryview(data)
    pattern = re.escape(PyInstallerArchiveEpilogue.MagicSignature)
    positions = [match.start() for match in re.finditer(pattern, view)]
    mode = Unmarshal(min(2, int(self.args.unmarshal)))
    self.log_debug(F'unmarshal mode: {mode.name}')
    if not positions:
        raise LookupError('unable to find PyInstaller signature')
    if len(positions) > 2:
        # first position is expected to be the sentinel value in the unpacker stub
        width = max(len(F'{p:X}') for p in positions)
        for position in positions:
            self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
        self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')
    decompile = self.args.decompile
    if decompile:
        uc_target = PiType.USERCODE
    else:
        uc_target = PiType.SOURCE
    archive = PyInstallerArchiveEpilogue(view, positions[-1], mode, decompile)
    for name, file in archive.files.items():
        # With the user code filter, skip everything but the target type and the bootstrap.
        if self.args.user_code and (file.type != uc_target or name.startswith('pyiboot')):
            continue
        yield self._pack(name, None, file.data, type=file.type.name)

Inherited members