Module refinery.units.formats.archive.xtpyi

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import ByteString, Callable, Dict, List, Optional, Set, Union, NamedTuple, Generator
from types import CodeType

import marshal
import enum
import io
import re
import uuid
import zlib
import os
import os.path
import contextlib
import dataclasses
import sys

from importlib.util import MAGIC_NUMBER

from refinery.units.formats.archive import Arg, ArchiveUnit
from refinery.units.pattern.carve import carve
from refinery.lib.structures import EOF, MemoryFile, StreamDetour, Struct, StructReader
from refinery.lib.tools import NoLogging, normalize_word_separators

from Cryptodome.Cipher import AES


class Unmarshal(enum.IntEnum):
    """
    Selects how embedded PYZ archives are treated during extraction.
    """
    No = 0  # embedded PYZ archives are not unpacked
    Yes = 1  # embedded PYZ archives are unmarshalled and their entries extracted
    YesAndDecompile = 2  # as above, and extracted Python bytecode is also decompiled


def version2tuple(version: str):
    """
    Convert a version string to a tuple of integers. Only the leading `major.minor` or
    `major.minor.patch` part is parsed; whatever follows it on the same line is ignored.
    """
    parsed = re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version)
    numeric = parsed.group(1)
    return tuple(int(component, 10) for component in numeric.split('.'))


def decompress_peek(buffer, size=512) -> Optional[bytes]:
    """
    Attempt to zlib-decompress the first `size` bytes of `buffer`. Returns whatever output
    the decompressor produced for that prefix, or None if the prefix is not valid zlib data.
    """
    prefix = buffer[:size]
    stream = zlib.decompressobj()
    try:
        preview = stream.decompress(prefix)
    except zlib.error:
        return None
    return preview


class Code(NamedTuple):
    """
    A single code object together with the module metadata that was loaded alongside it.
    Instances are produced by extract_code_from_buffer and consumed by decompile_buffer.
    """
    version: float  # bytecode version; passed to the decompiler as bytecode_version
    timestamp: int  # timestamp value returned by the module loader
    magic: int  # magic number as an integer; passed to the decompiler as magic_int
    container: CodeType  # the code object itself
    is_pypi: bool  # passed to the decompiler as is_pypy; note the different spelling of the field
    code_objects: dict  # dictionary handed to the module loader and forwarded to the decompiler


def extract_code_from_buffer(buffer: ByteString, file_name: Optional[str] = None) -> Generator[Code, None, None]:
    """
    Load a compiled Python module from `buffer` using xdis and generate one `Code` tuple
    for each code object that the loader returns. Output written to the standard error
    stream while loading is discarded. If no file name is given, `<unknown>` is used.
    """
    unit: xtpyi = xtpyi
    registry = {}
    label = file_name or '<unknown>'
    # Anything written to stderr by the loader goes to the null device.
    with open(os.devnull, 'w') as sink, contextlib.redirect_stderr(sink):
        loaded = unit._xdis.load.load_module_from_file_object(MemoryFile(buffer), label, registry)
    version, timestamp, magic_int, codes, is_pypy, _, _ = loaded
    if not isinstance(codes, list):
        codes = [codes]
    yield from (
        Code(version, timestamp, magic_int, item, is_pypy, registry) for item in codes)


def decompile_buffer(buffer: Union[Code, ByteString], file_name: Optional[str] = None) -> ByteString:
    """
    Turn compiled Python code into source text, or into a disassembly listing as a fallback.

    The input is either a `Code` tuple or a raw buffer; a raw buffer is first parsed with
    `extract_code_from_buffer`, using `file_name` as the module's file name. Every code
    object is handed to the available decompilers, decompyle3 first and uncompyle6 second,
    and the first engine that does not raise provides the source. Leading comment lines of
    the decompiler output are removed before it is returned.

    When no source could be produced and the input was a raw buffer, the longest printable
    string is carved from it; if that string covers all but fewer than 0x20 bytes of the
    buffer, it is returned as is. Otherwise, the result is a commented disassembly which is
    preceded by all error messages that were collected along the way.
    """
    main: xtpyi = xtpyi
    errors = ''
    python = ''
    codes = [buffer]
    if not isinstance(buffer, Code):
        codes = list(extract_code_from_buffer(buffer, file_name))
    for code in codes:
        engines = {}
        for e in ['decompyle3', 'uncompyle6']:
            try:
                dc = getattr(main, F'_{e}')
                if isinstance(dc, property):
                    dc = dc.fget()
            except ImportError:
                # Report the engine by its name: the module itself could not be obtained here.
                errors += F'# The decompiler {e} is not installed.\n'
            else:
                # Each engine is registered under its own name so that all of them are tried.
                engines[e] = dc
        if not engines:
            errors += '# (all missing, install one of the above to enable decompilation)'
        for name, engine in engines.items():
            with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL):
                try:
                    engine.main.decompile(
                        co=code.container,
                        bytecode_version=code.version,
                        out=output,
                        timestamp=code.timestamp,
                        code_objects=code.code_objects,
                        is_pypy=code.is_pypi,
                        magic_int=code.magic,
                    )
                except Exception as E:
                    errors += '\n'.join(F'# {line}' for line in (
                        F'Error while decompiling with {name}:', *str(E).splitlines(True)))
                    errors += '\n'
                else:
                    python = output.getvalue()
                    break
    if python:
        # Remove leading comment lines; the bounds check covers output that is all comments.
        lines = python.splitlines(True)
        skip = 0
        while skip < len(lines) and lines[skip].strip().startswith('#'):
            skip += 1
        return ''.join(lines[skip:]).encode(main.codec)
    if not isinstance(buffer, Code):
        embedded = bytes(buffer | carve('printable', single=True))
        # Compare against the size of the input buffer, not against a parsed Code tuple.
        if len(buffer) - len(embedded) < 0x20:
            return embedded
    disassembly = MemoryFile()
    with io.TextIOWrapper(disassembly, main.codec, newline='\n') as output:
        output.write(errors)
        output.write('# Generating Disassembly:\n\n')
        for code in codes:
            instructions = list(main._xdis.std.Bytecode(code.container))
            # The default keeps max() from raising when a code object has no instructions.
            width_offset = max((len(str(i.offset)) for i in instructions), default=1)
            for i in instructions:
                opname = normalize_word_separators(i.opname, '.').lower()
                offset = F'{i.offset:0{width_offset}d}'
                output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n')
        output.write('\n')
    return disassembly.getbuffer()


class PiType(bytes, enum.Enum):
    """
    Type codes for archive items. The single-byte values are the type codes read from the
    table of contents of the archive; the two-byte values are assigned by this module to
    classify items during extraction.
    """
    BINARY          = B'b'  # noqa / binary
    DEPENDENCY      = B'd'  # noqa / dependency (NOTE(review): previously labelled "runtime option" like 'o'; confirm)
    PYZ             = B'z'  # noqa / zlib (pyz) - frozen Python code
    PACKAGE         = B'M'  # noqa / Python package (__init__.py)
    MODULE          = B'm'  # noqa / Python module
    SOURCE          = B's'  # noqa / Python script (v3)
    DATA            = B'x'  # noqa / data
    RUNTIME_OPTION  = B'o'  # noqa / runtime option
    SPLASH          = B'l'  # noqa / splash resources
    UNKNOWN         = B'uk' # noqa / fallback for type codes that are not listed above
    DECOMPILED      = B'dc' # noqa / output of the decompiler
    USERCODE        = B'uc' # noqa / decompiled item whose archive entry has type SOURCE
    ENCRYPTED       = B'ec' # noqa


class PzType(enum.IntEnum):
    """
    Entry type codes that occur in the table of contents of a PYZ archive.
    """
    MODULE = 0  # a module; eligible for decompilation
    PKG = 1  # a package; eligible for decompilation and stored as '<name>/__init__'
    DATA = 2  # any other data; also used when a type code is not recognized


@dataclasses.dataclass
class PiMeta:
    """
    Describes one extracted item by its type, its path, and its contents. The contents can
    be given as a callable that produces the data, which defers the work until `unpack`
    is called for the first time.
    """
    type: PiType
    name: str
    data: Union[Callable[[], ByteString], ByteString]

    def unpack(self) -> ByteString:
        """
        Return the contents of this item. If the contents are still a callable, it is invoked
        once and its result replaces the callable.
        """
        payload = self.data
        if callable(payload):
            payload = self.data = payload()
        return payload


def make_decompiled_item(name: str, data: ByteString, *magics) -> PiMeta:
    """
    Create a lazily evaluated item of type DECOMPILED with the path `<name>.py`. When it is
    unpacked, `data` is decompiled directly if its first four bytes match one of the given
    magic headers. Otherwise, each header is prepended in turn until decompilation does not
    raise. If every attempt raises, the last error message is returned as comment lines.
    """

    def extract(data=data, magics=magics):
        for known in magics:
            if data[:4] == known[:4]:
                return decompile_buffer(data, name)
        failure = None
        for prefix in magics:
            try:
                return decompile_buffer(prefix + data, name)
            except Exception as caught:
                failure = caught
        report = [F'# {line}' for line in str(failure).splitlines(True)]
        return '\n'.join(report).encode(xtpyi.codec)

    return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)


class PYZ(Struct):
    """
    Parser for a PYZ archive. The header is read on construction; the table of contents is
    only unmarshalled when `unpack` is called, which fills `entries` with the archive items.
    """

    MagicSignature = B'PYZ\0'

    def __init__(self, reader: StructReader, version: str):
        """
        Read the PYZ header at the current position of `reader`. The given `version` is a
        fallback: when xdis knows the magic number in the header, its version is used instead.
        Raises a ValueError if the data does not begin with the PYZ signature.
        """
        reader.bigendian = True
        self.base = reader.tell()
        signature = reader.read(4)
        if signature != self.MagicSignature:
            raise ValueError('invalid magic')
        magic = bytes(reader.read(4))
        with contextlib.suppress(KeyError, AttributeError):
            xdis = xtpyi._xdis
            if isinstance(xdis, property):
                xdis = xdis.fget()
            version = xdis.magics.versions[magic]
        vtuple = version2tuple(version)
        # The magic is padded with zeros to 8 bytes, to 12 bytes from version 3.3 on, and to
        # 16 bytes from version 3.7 on; the result is prepended to bytecode before decompiling.
        padding_size = 4
        if vtuple >= (3, 3):
            padding_size += 4
        if vtuple >= (3, 7):
            padding_size += 4
        self.version = version
        self.magic = magic + padding_size * b'\0'
        self.toc_offset = reader.i32()
        self.reader = reader
        self.entries: List[PiMeta] = []

    def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
        """
        Unmarshal the table of contents and populate `entries` with lazily unpacked items.

        If `decompile` is set, module and package entries additionally produce a decompiled
        `.py` item. If `key` is given, every entry is treated as AES-CFB encrypted with the
        first 16 bytes as the initialization vector. Without a key, entries that do not look
        like zlib data are counted as failures; when more than 70% of all entries fail, the
        entry list is cleared and False is returned. False is also returned when the table of
        contents cannot be converted to a dictionary; the error is then stored in `error`.
        """
        with StreamDetour(self.reader, self.base + self.toc_offset):
            toc_data = self.reader.read()
        try:
            toc = marshal.loads(toc_data)
        except Exception as error:
            if MAGIC_NUMBER != self.magic[:4]:
                # The archive was built for a different Python version: retry with xdis.
                xdis = xtpyi._xdis
                if isinstance(xdis, property):
                    xdis = xdis.fget()
                _ord = xdis.marsh.Ord
                xdis.marsh.Ord = ord  # monkey-patch workaround for bug in xdis
                try:
                    # Parse the same bytes that marshal failed on.
                    toc = xdis.marsh.load(
                        MemoryFile(toc_data), self.version)
                except Exception:
                    pass
                else:
                    error = None
                finally:
                    xdis.marsh.Ord = _ord
            if error is not None:
                raise error

        if isinstance(toc, list):
            try:
                toc = dict(toc)
            except Exception as error:
                self.entries = []
                self.error = error
                return False

        failures = 0
        attempts = len(toc)

        for name, (pzt, offset, length) in toc.items():
            try:
                name = name.decode('utf-8')
            except AttributeError:
                # The name is already a string.
                pass
            try:
                pzt = PzType(pzt)
            except Exception:
                pzt = PzType.DATA

            name = name.replace('.', '/')
            if pzt is PzType.PKG:
                name = F'{name}/__init__'

            with StreamDetour(self.reader, self.base + offset):
                data = self.reader.read(length)

            if key:
                def decompressed(data=data):
                    cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                    return zlib.decompress(cipher.decrypt(data[0x10:]))
            elif decompress_peek(data):
                def decompressed(data=data):
                    return zlib.decompress(data)
            else:
                failures += 1
                continue

            if decompile and pzt in (PzType.MODULE, PzType.PKG):
                # The decompression routine is bound as a default argument: this closure runs
                # after the loop has ended and must not depend on the loop variable.
                def decompiled(data=data, name=name, magic=self.magic, decompressed=decompressed):
                    data = decompressed(data)
                    if data[:4] != magic[:4]:
                        data = magic + data
                    return decompile_buffer(data, name)
                self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
                name = F'{name}.pyc'
                kind = PiType.SOURCE
            else:
                kind = PiType.DATA

            self.entries.append(PiMeta(kind, name, decompressed))

        if key:
            if failures >= 6:
                xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
            return True
        elif failures > 0.7 * attempts:
            self.entries.clear()
            return False
        else:
            return True


class PiTOCEntry(Struct):
    """
    One entry in the table of contents of a PyInstaller archive. The entry consists of a
    fixed header (entry size, data offset, compressed and uncompressed data size, compression
    flag, type code) which is followed by the null-terminated name of the item.
    """

    def __init__(self, reader: StructReader):
        """
        Parse one entry at the current position of `reader`. A RuntimeError is raised when
        the name has an implausible length or contains non-printable characters. Entries
        with an empty or undecodable name are assigned a random UUID as their name.
        """
        reader.bigendian = True
        entry_start_offset = reader.tell()
        self.size_of_entry = reader.i32()
        self.offset = reader.i32()
        self.size_of_compressed_data = reader.i32()
        # The misspelled attribute name is kept because other code may depend on it.
        self.size_od_uncompressed_data = reader.i32()
        self.is_compressed = bool(reader.read_byte())
        entry_type = bytes(reader.read(1))
        # The name occupies what remains of the entry after the fixed-size header.
        name_length = self.size_of_entry - reader.tell() + entry_start_offset
        if name_length < 0 or name_length > 0x1000:
            # A negative length results from an entry size smaller than the header itself.
            raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.')
        name, *_ = bytes(reader.read(name_length)).partition(B'\0')
        try:
            name = name.decode('utf8', 'backslashreplace')
        except Exception:
            # An empty name is replaced by a UUID below; None would crash the check that follows.
            name = ''
        if not all(part.isprintable() for part in re.split(R'\s+', name)):
            raise RuntimeError('Refusing to process TOC entry with non-printable name.')
        name = name or str(uuid.uuid4())
        if entry_type == B'Z':
            entry_type = B'z'
        try:
            self.type = PiType(entry_type)
        except ValueError:
            xtpyi.logger.warning(F'unknown type {entry_type!r} in field {name}')
            self.type = PiType.UNKNOWN
        self.name = name

    def __hash__(self):
        return hash(self.name)


class PyInstallerArchiveEpilogue(Struct):
    """
    Parses the trailing record of a PyInstaller archive, which starts with a magic signature
    and references the table of contents. After construction, `toc` maps item names to their
    table entries and `files` maps output paths to the (lazily unpacked) archive items.
    """

    MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')

    def _read_libname(self, reader: StructReader) -> Optional[str]:
        """
        Attempt to read a 64-byte, null-padded library name at the current position. Returns
        the name if one was found; otherwise the reader is rewound and None is returned.
        """
        position = reader.tell()
        try:
            libname, t, rest = reader.read_bytes(64).partition(B'\0')
        except EOF:
            reader.seekset(position)
            return None
        try:
            libname = libname.decode('utf8')
        except Exception:
            reader.seekset(position)
            return None
        # Require a terminator, at least 10 bytes of all-zero padding, and a printable name.
        if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
            reader.seekset(position)
            return None
        return libname

    def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No):
        """
        Parse the archive whose trailing record begins at `offset`. A ValueError is raised
        when the magic signature is not found there. The `unmarshal` mode decides whether
        embedded PYZ archives are unpacked, and whether their contents are decompiled.
        """
        reader.bigendian = True
        reader.seekset(offset)
        self.reader = reader
        signature = reader.read_bytes(8)
        if signature != self.MagicSignature:
            raise ValueError(
                F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
                F'should be {self.MagicSignature.hex().upper()}')
        self.size = reader.i32()
        toc_offset = reader.i32()
        toc_length = reader.i32()
        # The version is stored as a single integer: values below 100 are decoded as
        # major * 10 + minor (e.g. 37), larger values as major * 100 + minor (e.g. 310).
        # Joining the decimal digits with dots would turn 310 into 3.1.0.
        pyvers = reader.u32()
        major, minor = divmod(pyvers, 100 if pyvers >= 100 else 10)
        self.py_version = F'{major}.{minor}'
        self.py_libname = self._read_libname(reader)
        # The archive begins `size` bytes before the end of this record.
        self.offset = reader.tell() - self.size

        self.toc: Dict[str, PiTOCEntry] = {}
        toc_end = self.offset + toc_offset + toc_length
        reader.seekset(self.offset + toc_offset)
        while reader.tell() < toc_end:
            try:
                entry = PiTOCEntry(reader)
            except EOF:
                xtpyi.logger.warning('end of file while reading TOC')
                break
            except Exception as error:
                xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
                break
            if entry.name in self.toc:
                raise KeyError(F'duplicate name {entry.name}')
            self.toc[entry.name] = entry

        self.files: Dict[str, PiMeta] = {}
        no_pyz_found = True
        pyz_entries: Dict[str, PYZ] = {}

        # First pass: parse the header of every embedded PYZ archive.
        for entry in list(self.toc.values()):
            if entry.type is not PiType.PYZ:
                continue
            no_pyz_found = False
            name, xt = os.path.splitext(entry.name)
            name_pyz = F'{name}.pyz'
            if name == entry.name:
                # The entry has no file extension: register it with the extension pyz.
                del self.toc[name]
                self.toc[name_pyz] = entry
                entry.name = name_pyz
            reader.seekset(self.offset + entry.offset)
            if entry.is_compressed:
                data = self.extract(entry.name).unpack()
            else:
                data = reader
            pyz_entries[name] = PYZ(data, self.py_version)

        magics = {pyz.magic for pyz in pyz_entries.values()}

        if not magics:
            if not no_pyz_found:
                xtpyi.logger.warning(
                    'no magic signature could be recovered from embedded pyzip archives; this is '
                    'unsual and means that there is no way to guess the missing magic for source '
                    'file entries and it will likely not be possible to decompile them.')
            return
        elif len(magics) > 1:
            xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')

        magics = list(magics)
        keys: Set[bytes] = set()

        # Second pass: register all items; scripts and modules also get a decompiled twin.
        for entry in self.toc.values():
            extracted = self.extract(entry.name)
            if entry.type not in (PiType.SOURCE, PiType.MODULE):
                self.files[entry.name] = extracted
                continue
            data = extracted.unpack()
            name, _ = os.path.splitext(extracted.name)
            del self.files[extracted.name]
            extracted.name = F'{name}.pyc'
            self.files[extracted.name] = extracted

            # The stored magic includes padding, so only its first four bytes are compared.
            if len(magics) == 1 and data[:4] != magics[0][:4]:
                extracted.data = magics[0] + data
            decompiled = make_decompiled_item(name, data, *magics)
            if entry.type is PiType.SOURCE:
                decompiled.type = PiType.USERCODE
            self.files[F'{name}.py'] = decompiled
            if name.endswith('crypto_key'):
                # Any 16-byte string literal in this module is a candidate encryption key.
                for key in decompiled.unpack() | carve('string', decode=True):
                    if len(key) != 0x10:
                        continue
                    xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
                    keys.add(key)

        if unmarshal is Unmarshal.No:
            return

        if not keys:
            key = None
        else:
            key = next(iter(keys))

        for name, pyz in pyz_entries.items():
            pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
            for unpacked in pyz.entries:
                unpacked.name = path = F'{name}/{unpacked.name}'
                if path in self.files:
                    raise ValueError(F'duplicate file name: {path}')
                self.files[path] = unpacked

    def extract(self, name: str) -> PiMeta:
        """
        Return the item with the given name. If it is not yet in `files`, it is read from the
        archive based on its table entry and registered; compressed data is only inflated
        when the item is unpacked. Raises a KeyError for names missing from the table.
        """
        try:
            return self.files[name]
        except KeyError:
            pass
        entry = self.toc[name]
        with StreamDetour(self.reader, self.offset + entry.offset):
            data = self.reader.read(entry.size_of_compressed_data)
        if entry.is_compressed:
            def extracted(d=data): return zlib.decompress(d)
        else:
            extracted = data
        result = PiMeta(entry.type, name, extracted)
        self.files[name] = result
        return result


class xtpyi(ArchiveUnit):
    """
    Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
    """
    # All arguments other than user_code and unmarshal are forwarded unchanged to ArchiveUnit.
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        user_code: Arg.Switch('-u', group='FILTER', help=(
            'Extract only source code files from the root of the archive. These usually implement '
            'the actual domain logic.')) = False,
        unmarshal: Arg('-y', action='count', group='FILTER', help=(
            '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
            'potentially exploit this to execute code. It is advised to only use this option inside '
            'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
        )) = 0
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            unmarshal=unmarshal,
            user_code=user_code,
        )

    # Imports xdis and returns the module. When the version of the running interpreter is not
    # listed in xdis.magics.canonic_python_version, its magic number and version strings are
    # added to the lookup tables of xdis, and a placeholder object is registered as the opcode
    # table for that version.
    @ArchiveUnit.Requires('xdis', 'arc', 'python', 'extended')
    def _xdis():
        import xdis.load
        import xdis.magics
        import xdis.marsh
        import xdis.op_imports
        import xdis
        A, B, C, *_ = sys.version_info
        version = F'{A}.{B}.{C}'
        canonic = F'{A}.{B}'
        if version not in xdis.magics.canonic_python_version:
            # Placeholder: every attribute access yields another placeholder, calls return None.
            class opcode_dummy:
                version = float(canonic)
                def __init__(self, name): self.name = name
                def __getattr__(self, key): return opcode_dummy(F'{self.name}.{key}')
                def __call__(self, *a, **k): return None
                def __str__(self): return self.name
                def __repr__(self): return self.name
            import importlib
            magic = importlib.util.MAGIC_NUMBER
            xdis.magics.add_magic_from_int(xdis.magics.magic2int(magic), version)
            xdis.magics.by_magic.setdefault(magic, set()).add(version)
            xdis.magics.by_version[version] = magic
            xdis.magics.magics[canonic] = magic
            xdis.magics.canonic_python_version[canonic] = canonic
            xdis.magics.add_canonic_versions(version, canonic)
            xdis.op_imports.op_imports.setdefault(canonic, opcode_dummy('dummy'))
        del A, B, C, version
        import xdis.std
        return xdis

    # Imports uncompyle6 together with its main submodule and returns the package.
    @ArchiveUnit.Requires('uncompyle6', 'arc', 'python', 'extended')
    def _uncompyle6():
        import uncompyle6
        import uncompyle6.main
        return uncompyle6

    # Imports decompyle3 together with its main submodule and returns the package.
    @ArchiveUnit.Requires('decompyle3', 'arc', 'python')
    def _decompyle3():
        import decompyle3
        import decompyle3.main
        return decompyle3

    # Locates every occurrence of the archive signature in the input, parses the archive at
    # the last occurrence, and yields its files. With the user_code option, only items of type
    # USERCODE whose name does not start with 'pyiboot' are yielded.
    def unpack(self, data):
        view = memoryview(data)
        positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)]
        # The unmarshal option counts how often the switch was given; values above 2 act like 2.
        mode = Unmarshal(min(2, int(self.args.unmarshal)))
        self.log_debug(F'unmarshal mode: {mode.name}')
        if not positions:
            raise LookupError('unable to find PyInstaller signature')
        if len(positions) > 2:
            # first position is expected to be the sentinel value in the unpacker stub
            width = max(len(F'{p:X}') for p in positions)
            for position in positions:
                self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
            self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')
        archive = PyInstallerArchiveEpilogue(view, positions[-1], mode)
        for name, file in archive.files.items():
            if self.args.user_code:
                if file.type != PiType.USERCODE:
                    continue
                if name.startswith('pyiboot'):
                    continue
            yield self._pack(name, None, file.data, type=file.type.name)

    # The unit handles any input that contains the archive signature.
    @classmethod
    def handles(cls, data: ByteString) -> Optional[bool]:
        return PyInstallerArchiveEpilogue.MagicSignature in data

Functions

def version2tuple(version)
Expand source code Browse git
def version2tuple(version: str):
    """
    Convert a version string to a tuple of integers. Only the leading `major.minor` or
    `major.minor.patch` part is parsed; whatever follows it on the same line is ignored.
    """
    parsed = re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version)
    numeric = parsed.group(1)
    return tuple(int(component, 10) for component in numeric.split('.'))
def decompress_peek(buffer, size=512)
Expand source code Browse git
def decompress_peek(buffer, size=512) -> Optional[bytes]:
    """
    Attempt to zlib-decompress the first `size` bytes of `buffer`. Returns whatever output
    the decompressor produced for that prefix, or None if the prefix is not valid zlib data.
    """
    prefix = buffer[:size]
    stream = zlib.decompressobj()
    try:
        preview = stream.decompress(prefix)
    except zlib.error:
        return None
    return preview
def extract_code_from_buffer(buffer, file_name=None)
Expand source code Browse git
def extract_code_from_buffer(buffer: ByteString, file_name: Optional[str] = None) -> Generator[Code, None, None]:
    """
    Load a compiled Python module from `buffer` using xdis and generate one `Code` tuple
    for each code object that the loader returns. Output written to the standard error
    stream while loading is discarded. If no file name is given, `<unknown>` is used.
    """
    unit: xtpyi = xtpyi
    registry = {}
    label = file_name or '<unknown>'
    # Anything written to stderr by the loader goes to the null device.
    with open(os.devnull, 'w') as sink, contextlib.redirect_stderr(sink):
        loaded = unit._xdis.load.load_module_from_file_object(MemoryFile(buffer), label, registry)
    version, timestamp, magic_int, codes, is_pypy, _, _ = loaded
    if not isinstance(codes, list):
        codes = [codes]
    yield from (
        Code(version, timestamp, magic_int, item, is_pypy, registry) for item in codes)
def decompile_buffer(buffer, file_name=None)
Expand source code Browse git
def decompile_buffer(buffer: Union[Code, ByteString], file_name: Optional[str] = None) -> ByteString:
    """
    Turn compiled Python code into source text, or into a disassembly listing as a fallback.

    The input is either a `Code` tuple or a raw buffer; a raw buffer is first parsed with
    `extract_code_from_buffer`, using `file_name` as the module's file name. Every code
    object is handed to the available decompilers, decompyle3 first and uncompyle6 second,
    and the first engine that does not raise provides the source. Leading comment lines of
    the decompiler output are removed before it is returned.

    When no source could be produced and the input was a raw buffer, the longest printable
    string is carved from it; if that string covers all but fewer than 0x20 bytes of the
    buffer, it is returned as is. Otherwise, the result is a commented disassembly which is
    preceded by all error messages that were collected along the way.
    """
    main: xtpyi = xtpyi
    errors = ''
    python = ''
    codes = [buffer]
    if not isinstance(buffer, Code):
        codes = list(extract_code_from_buffer(buffer, file_name))
    for code in codes:
        engines = {}
        for e in ['decompyle3', 'uncompyle6']:
            try:
                dc = getattr(main, F'_{e}')
                if isinstance(dc, property):
                    dc = dc.fget()
            except ImportError:
                # Report the engine by its name: the module itself could not be obtained here.
                errors += F'# The decompiler {e} is not installed.\n'
            else:
                # Each engine is registered under its own name so that all of them are tried.
                engines[e] = dc
        if not engines:
            errors += '# (all missing, install one of the above to enable decompilation)'
        for name, engine in engines.items():
            with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL):
                try:
                    engine.main.decompile(
                        co=code.container,
                        bytecode_version=code.version,
                        out=output,
                        timestamp=code.timestamp,
                        code_objects=code.code_objects,
                        is_pypy=code.is_pypi,
                        magic_int=code.magic,
                    )
                except Exception as E:
                    errors += '\n'.join(F'# {line}' for line in (
                        F'Error while decompiling with {name}:', *str(E).splitlines(True)))
                    errors += '\n'
                else:
                    python = output.getvalue()
                    break
    if python:
        # Remove leading comment lines; the bounds check covers output that is all comments.
        lines = python.splitlines(True)
        skip = 0
        while skip < len(lines) and lines[skip].strip().startswith('#'):
            skip += 1
        return ''.join(lines[skip:]).encode(main.codec)
    if not isinstance(buffer, Code):
        embedded = bytes(buffer | carve('printable', single=True))
        # Compare against the size of the input buffer, not against a parsed Code tuple.
        if len(buffer) - len(embedded) < 0x20:
            return embedded
    disassembly = MemoryFile()
    with io.TextIOWrapper(disassembly, main.codec, newline='\n') as output:
        output.write(errors)
        output.write('# Generating Disassembly:\n\n')
        for code in codes:
            instructions = list(main._xdis.std.Bytecode(code.container))
            # The default keeps max() from raising when a code object has no instructions.
            width_offset = max((len(str(i.offset)) for i in instructions), default=1)
            for i in instructions:
                opname = normalize_word_separators(i.opname, '.').lower()
                offset = F'{i.offset:0{width_offset}d}'
                output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n')
        output.write('\n')
    return disassembly.getbuffer()
def make_decompiled_item(name, data, *magics)
Expand source code Browse git
def make_decompiled_item(name: str, data: ByteString, *magics) -> PiMeta:
    """
    Create a lazily evaluated item of type DECOMPILED with the path `<name>.py`. When it is
    unpacked, `data` is decompiled directly if its first four bytes match one of the given
    magic headers. Otherwise, each header is prepended in turn until decompilation does not
    raise. If every attempt raises, the last error message is returned as comment lines.
    """

    def extract(data=data, magics=magics):
        for known in magics:
            if data[:4] == known[:4]:
                return decompile_buffer(data, name)
        failure = None
        for prefix in magics:
            try:
                return decompile_buffer(prefix + data, name)
            except Exception as caught:
                failure = caught
        report = [F'# {line}' for line in str(failure).splitlines(True)]
        return '\n'.join(report).encode(xtpyi.codec)

    return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)

Classes

class Unmarshal (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class Unmarshal(enum.IntEnum):
    """
    Selects how embedded PYZ archives are treated during extraction.
    """
    No = 0  # embedded PYZ archives are not unpacked
    Yes = 1  # embedded PYZ archives are unmarshalled and their entries extracted
    YesAndDecompile = 2  # as above, and extracted Python bytecode is also decompiled

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var No
var Yes
var YesAndDecompile
class Code (version, timestamp, magic, container, is_pypi, code_objects)

Code(version, timestamp, magic, container, is_pypi, code_objects)

Expand source code Browse git
class Code(NamedTuple):
    """
    A single code object together with the module metadata that was loaded alongside it.
    Instances are produced by extract_code_from_buffer and consumed by decompile_buffer.
    """
    version: float  # bytecode version; passed to the decompiler as bytecode_version
    timestamp: int  # timestamp value returned by the module loader
    magic: int  # magic number as an integer; passed to the decompiler as magic_int
    container: CodeType  # the code object itself
    is_pypi: bool  # passed to the decompiler as is_pypy; note the different spelling of the field
    code_objects: dict  # dictionary handed to the module loader and forwarded to the decompiler

Ancestors

  • builtins.tuple

Instance variables

var version

Alias for field number 0

var timestamp

Alias for field number 1

var magic

Alias for field number 2

var container

Alias for field number 3

var is_pypi

Alias for field number 4

var code_objects

Alias for field number 5

class PiType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class PiType(bytes, enum.Enum):
    """
    Type codes for archive items. The single-byte values are the type codes read from the
    table of contents of the archive; the two-byte values are assigned by this module to
    classify items during extraction.
    """
    BINARY          = B'b'  # noqa / binary
    DEPENDENCY      = B'd'  # noqa / dependency (NOTE(review): previously labelled "runtime option" like 'o'; confirm)
    PYZ             = B'z'  # noqa / zlib (pyz) - frozen Python code
    PACKAGE         = B'M'  # noqa / Python package (__init__.py)
    MODULE          = B'm'  # noqa / Python module
    SOURCE          = B's'  # noqa / Python script (v3)
    DATA            = B'x'  # noqa / data
    RUNTIME_OPTION  = B'o'  # noqa / runtime option
    SPLASH          = B'l'  # noqa / splash resources
    UNKNOWN         = B'uk' # noqa / fallback for type codes that are not listed above
    DECOMPILED      = B'dc' # noqa / output of the decompiler
    USERCODE        = B'uc' # noqa / decompiled item whose archive entry has type SOURCE
    ENCRYPTED       = B'ec' # noqa

Ancestors

  • builtins.bytes
  • enum.Enum

Class variables

var BINARY
var DEPENDENCY
var PYZ
var PACKAGE
var MODULE
var SOURCE
var DATA
var RUNTIME_OPTION
var SPLASH
var UNKNOWN
var DECOMPILED
var USERCODE
var ENCRYPTED
class PzType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class PzType(enum.IntEnum):
    """
    Entry type codes that occur in the table of contents of a PYZ archive.
    """
    MODULE = 0  # a module; eligible for decompilation
    PKG = 1  # a package; eligible for decompilation and stored as '<name>/__init__'
    DATA = 2  # any other data; also used when a type code is not recognized

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var MODULE
var PKG
var DATA
class PiMeta (type, name, data)

PiMeta(type: 'PiType', name: 'str', data: 'Union[Callable[[], ByteString], ByteString]')

Expand source code Browse git
class PiMeta:
    """
    Describes one extracted item by its type, its path, and its contents. The contents can
    be given as a callable that produces the data, which defers the work until `unpack`
    is called for the first time.
    """
    type: PiType
    name: str
    data: Union[Callable[[], ByteString], ByteString]

    def unpack(self) -> ByteString:
        """
        Return the contents of this item. If the contents are still a callable, it is invoked
        once and its result replaces the callable.
        """
        payload = self.data
        if callable(payload):
            payload = self.data = payload()
        return payload

Class variables

var type
var name
var data

Methods

def unpack(self)
Expand source code Browse git
def unpack(self) -> ByteString:
    """
    Return the contents of this item. If the contents are still a callable, it is invoked
    once and its result replaces the callable.
    """
    payload = self.data
    if callable(payload):
        payload = self.data = payload()
    return payload
class PYZ (reader, version)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Before initialization of the struct, the member bar of the newly created structure will be set to the value 29.

Expand source code Browse git
class PYZ(Struct):
    """
    Parser for a PYZ archive. The header is read on construction; the table of contents is
    only unmarshalled when `unpack` is called, which fills `entries` with the archive items.
    """

    MagicSignature = B'PYZ\0'

    def __init__(self, reader: StructReader, version: str):
        """
        Read the PYZ header at the current position of `reader`. The given `version` is a
        fallback: when xdis knows the magic number in the header, its version is used instead.
        Raises a ValueError if the data does not begin with the PYZ signature.
        """
        reader.bigendian = True
        self.base = reader.tell()
        signature = reader.read(4)
        if signature != self.MagicSignature:
            raise ValueError('invalid magic')
        magic = bytes(reader.read(4))
        with contextlib.suppress(KeyError, AttributeError):
            xdis = xtpyi._xdis
            if isinstance(xdis, property):
                xdis = xdis.fget()
            version = xdis.magics.versions[magic]
        vtuple = version2tuple(version)
        # The magic is padded with zeros to 8 bytes, to 12 bytes from version 3.3 on, and to
        # 16 bytes from version 3.7 on; the result is prepended to bytecode before decompiling.
        padding_size = 4
        if vtuple >= (3, 3):
            padding_size += 4
        if vtuple >= (3, 7):
            padding_size += 4
        self.version = version
        self.magic = magic + padding_size * b'\0'
        self.toc_offset = reader.i32()
        self.reader = reader
        self.entries: List[PiMeta] = []

    def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
        """
        Unmarshal the table of contents and populate `entries` with lazily unpacked items.

        If `decompile` is set, module and package entries additionally produce a decompiled
        `.py` item. If `key` is given, every entry is treated as AES-CFB encrypted with the
        first 16 bytes as the initialization vector. Without a key, entries that do not look
        like zlib data are counted as failures; when more than 70% of all entries fail, the
        entry list is cleared and False is returned. False is also returned when the table of
        contents cannot be converted to a dictionary; the error is then stored in `error`.
        """
        with StreamDetour(self.reader, self.base + self.toc_offset):
            toc_data = self.reader.read()
        try:
            toc = marshal.loads(toc_data)
        except Exception as error:
            if MAGIC_NUMBER != self.magic[:4]:
                # The archive was built for a different Python version: retry with xdis.
                xdis = xtpyi._xdis
                if isinstance(xdis, property):
                    xdis = xdis.fget()
                _ord = xdis.marsh.Ord
                xdis.marsh.Ord = ord  # monkey-patch workaround for bug in xdis
                try:
                    # Parse the same bytes that marshal failed on.
                    toc = xdis.marsh.load(
                        MemoryFile(toc_data), self.version)
                except Exception:
                    pass
                else:
                    error = None
                finally:
                    xdis.marsh.Ord = _ord
            if error is not None:
                raise error

        if isinstance(toc, list):
            try:
                toc = dict(toc)
            except Exception as error:
                self.entries = []
                self.error = error
                return False

        failures = 0
        attempts = len(toc)

        for name, (pzt, offset, length) in toc.items():
            try:
                name = name.decode('utf-8')
            except AttributeError:
                # The name is already a string.
                pass
            try:
                pzt = PzType(pzt)
            except Exception:
                pzt = PzType.DATA

            name = name.replace('.', '/')
            if pzt is PzType.PKG:
                name = F'{name}/__init__'

            with StreamDetour(self.reader, self.base + offset):
                data = self.reader.read(length)

            if key:
                def decompressed(data=data):
                    cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                    return zlib.decompress(cipher.decrypt(data[0x10:]))
            elif decompress_peek(data):
                def decompressed(data=data):
                    return zlib.decompress(data)
            else:
                failures += 1
                continue

            if decompile and pzt in (PzType.MODULE, PzType.PKG):
                # The decompression routine is bound as a default argument: this closure runs
                # after the loop has ended and must not depend on the loop variable.
                def decompiled(data=data, name=name, magic=self.magic, decompressed=decompressed):
                    data = decompressed(data)
                    if data[:4] != magic[:4]:
                        data = magic + data
                    return decompile_buffer(data, name)
                self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
                name = F'{name}.pyc'
                kind = PiType.SOURCE
            else:
                kind = PiType.DATA

            self.entries.append(PiMeta(kind, name, decompressed))

        if key:
            if failures >= 6:
                xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
            return True
        elif failures > 0.7 * attempts:
            self.entries.clear()
            return False
        else:
            return True

Ancestors

Class variables

var MagicSignature

Methods

def unpack(self, decompile, key=None)
Expand source code Browse git
def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
    """
    Read the table of contents of this PYZ archive and append one lazily evaluated `PiMeta`
    item to `self.entries` for every entry whose payload looks recoverable.

    The table of contents is unmarshalled with the builtin `marshal` module first. If that
    fails and the archive magic differs from the magic of the running interpreter, the xdis
    unmarshaller is tried instead; if that fails as well, the original exception is raised.

    - `decompile`: if set, every module and package entry additionally receives a decompiled
      `.py` item next to its `.pyc` item.
    - `key`: optional AES key. If given, every entry is decrypted in CFB mode before it is
      decompressed; the first 16 bytes of the entry are passed to the cipher as initial value.

    Returns `False` if the table of contents cannot be converted to a dictionary, or if no
    key was given and more than 70% of all entries failed the decompression probe; in the
    latter case `self.entries` is cleared. Returns `True` otherwise.
    """
    with StreamDetour(self.reader, self.base + self.toc_offset):
        toc_data = self.reader.read()
    try:
        toc = marshal.loads(toc_data)
    except Exception as error:
        if MAGIC_NUMBER != self.magic[:4]:
            xdis = xtpyi._xdis
            if isinstance(xdis, property):
                xdis = xdis.fget()
            _ord = xdis.marsh.Ord
            xdis.marsh.Ord = ord  # monkey-patch workaround for bug in xdis
            try:
                # NOTE(review): this unmarshals self.data rather than toc_data; confirm
                # against the enclosing class that this is intended.
                toc = xdis.marsh.load(
                    MemoryFile(self.data), self.version)
            except Exception:
                pass
            else:
                error = None
            finally:
                # always restore the patched attribute, even if the fallback failed
                xdis.marsh.Ord = _ord
        if error is not None:
            raise error

    if isinstance(toc, list):
        try:
            toc = dict(toc)
        except Exception as error:
            self.entries = []
            self.error = error
            # report the failure explicitly; the function is annotated to return a bool
            return False

    failures = 0
    attempts = len(toc)

    for name, (pzt, offset, length) in toc.items():
        try:
            name: str
            name = name.decode('utf-8')
        except AttributeError:
            # the name already is a string
            pass
        try:
            pzt = PzType(pzt)
        except Exception:
            # unknown entry types are treated as plain data
            pzt = PzType.DATA

        # dotted module names become archive paths
        name = name.replace('.', '/')
        if pzt is PzType.PKG:
            name = F'{name}/__init__'

        with StreamDetour(self.reader, self.base + offset):
            data = self.reader.read(length)

        # The payload is bound as a default argument so that each closure keeps the data of
        # its own loop iteration.
        if key:
            def decompressed(data=data):
                cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                return zlib.decompress(cipher.decrypt(data[0x10:]))
        elif decompress_peek(data):
            def decompressed(data=data):
                return zlib.decompress(data)
        else:
            # failures are only counted on this unencrypted path
            failures += 1
            continue

        if decompile and pzt in (PzType.MODULE, PzType.PKG):
            def decompiled(data=data, name=name, magic=self.magic):
                data = decompressed(data)
                # prepend the archive magic if the code object does not start with it
                if data[:4] != magic[:4]:
                    data = magic + data
                return decompile_buffer(data, name)
            self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
            name = F'{name}.pyc'
            kind = PiType.SOURCE
        else:
            kind = PiType.DATA

        self.entries.append(PiMeta(kind, name, decompressed))

    if key:
        if failures >= 6:
            xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
        return True
    elif failures > 0.7 * attempts:
        self.entries.clear()
        return False
    else:
        return True
class PiTOCEntry (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Before initialization of the struct, the member bar of the newly created structure will be set to the value 29.

Expand source code Browse git
class PiTOCEntry(Struct):
    """
    A single entry in the table of contents of a PyInstaller archive. The entry is read in big
    endian byte order and consists of four 32-bit integers (entry size, data offset, compressed
    size, uncompressed size), a compression flag byte, a type byte, and a null-terminated name
    that fills the remainder of the entry.
    """

    def __init__(self, reader: StructReader):
        """
        Parse one entry at the current position of `reader`. Raises a `RuntimeError` if the
        computed name length is negative or exceeds 0x1000 bytes, or if the name contains
        non-printable characters other than whitespace. Entries without a usable name are
        assigned a random UUID as their name.
        """
        reader.bigendian = True
        entry_start_offset = reader.tell()
        self.size_of_entry = reader.i32()
        self.offset = reader.i32()
        self.size_of_compressed_data = reader.i32()
        # the attribute name is misspelled ("od"); it is kept as-is for compatibility
        self.size_od_uncompressed_data = reader.i32()
        self.is_compressed = bool(reader.read_byte())
        entry_type = bytes(reader.read(1))
        # the name occupies whatever is left of the entry after the fixed-size header
        name_length = self.size_of_entry - reader.tell() + entry_start_offset
        if not 0 <= name_length <= 0x1000:
            # a negative length means the entry size is smaller than the fixed-size header
            raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.')
        name, *_ = bytes(reader.read(name_length)).partition(B'\0')
        try:
            name = name.decode('utf8', 'backslashreplace')
        except Exception:
            name = None
        # a name that could not be decoded skips validation and receives a UUID below
        if name is not None and not all(part.isprintable() for part in re.split('\\s+', name)):
            raise RuntimeError('Refusing to process TOC entry with non-printable name.')
        name = name or str(uuid.uuid4())
        if entry_type == B'Z':
            entry_type = B'z'
        try:
            self.type = PiType(entry_type)
        except ValueError:
            # there is no unit instance here, so log through the class-level logger
            xtpyi.logger.warning(F'unknown type {entry_type!r} in field {name}')
            self.type = PiType.UNKNOWN
        self.name = name

    def __hash__(self):
        # entries are hashed by their name
        return hash(self.name)

Ancestors

class PyInstallerArchiveEpilogue (reader, offset, unmarshal=Unmarshal.No)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Before initialization of the struct, the member bar of the newly created structure will be set to the value 29.

Expand source code Browse git
class PyInstallerArchiveEpilogue(Struct):
    """
    Parser for the epilogue of a PyInstaller archive. The epilogue starts with the magic
    signature and is followed by the archive size, the offset and length of the table of
    contents, the Python version, and optionally the name of the Python library. After
    parsing, `toc` maps entry names to `PiTOCEntry` objects and `files` maps output paths to
    `PiMeta` items.
    """

    MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')

    def _read_libname(self, reader: StructReader) -> Optional[str]:
        """
        Attempt to read a 64 byte, null-padded library name at the current reader position.
        Returns the name on success. If the field is missing or does not look like a library
        name, the reader is moved back to where it was and `None` is returned.
        """
        position = reader.tell()
        try:
            libname, t, rest = reader.read_bytes(64).partition(B'\0')
        except EOF:
            reader.seekset(position)
            return None
        try:
            libname = libname.decode('utf8')
        except Exception:
            reader.seekset(position)
            return None
        # Require a null terminator followed by at least 10 bytes of pure zero padding, and a
        # name consisting only of whitespace and printable ASCII characters.
        if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
            reader.seekset(position)
            return None
        return libname

    def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No):
        """
        Parse the epilogue located at `offset` in `reader` and populate `toc` and `files`.

        - `reader`: the reader for the buffer that contains the archive.
        - `offset`: position of the magic signature.
        - `unmarshal`: unless this is `Unmarshal.No`, embedded PYZ archives are unpacked and
          their contents are added to `files`; `Unmarshal.YesAndDecompile` additionally
          requests decompilation of their modules.

        Raises a `ValueError` if the signature at `offset` is invalid or if an unpacked PYZ
        item collides with an existing file name, and a `KeyError` if the table of contents
        contains a duplicate name.
        """
        reader.bigendian = True
        reader.seekset(offset)
        self.reader = reader
        signature = reader.read_bytes(8)
        if signature != self.MagicSignature:
            raise ValueError(
                F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
                F'should be {self.MagicSignature.hex().upper()}')
        self.size = reader.i32()
        toc_offset = reader.i32()
        toc_length = reader.i32()
        # The version field is a single integer: major * 100 + minor in the three-digit form
        # (310 is Python 3.10) and major * 10 + minor in the two-digit form (38 is Python 3.8).
        # Splitting it digit by digit would turn 310 into the bogus version 3.1.0.
        pyvers = reader.u32()
        major, minor = divmod(pyvers, 100 if pyvers >= 100 else 10)
        self.py_version = F'{major}.{minor}'
        self.py_libname = self._read_libname(reader)
        # the reader is now at the end of the archive; derive the start from the total size
        self.offset = reader.tell() - self.size

        self.toc: Dict[str, PiTOCEntry] = {}
        toc_end = self.offset + toc_offset + toc_length
        reader.seekset(self.offset + toc_offset)
        while reader.tell() < toc_end:
            try:
                entry = PiTOCEntry(reader)
            except EOF:
                xtpyi.logger.warning('end of file while reading TOC')
                break
            except Exception as error:
                xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
                break
            if entry.name in self.toc:
                raise KeyError(F'duplicate name {entry.name}')
            self.toc[entry.name] = entry

        self.files: Dict[str, PiMeta] = {}
        no_pyz_found = True
        pyz_entries: Dict[str, PYZ] = {}

        # iterate over a copy because PYZ entries without extension are renamed in the TOC
        for entry in list(self.toc.values()):
            if entry.type is not PiType.PYZ:
                continue
            no_pyz_found = False
            name, _ = os.path.splitext(entry.name)
            name_pyz = F'{name}.pyz'
            if name == entry.name:
                # the entry has no file extension; give it one
                del self.toc[name]
                self.toc[name_pyz] = entry
                entry.name = name_pyz
            reader.seekset(self.offset + entry.offset)
            if entry.is_compressed:
                data = self.extract(entry.name).unpack()
            else:
                data = reader
            pyz_entries[name] = PYZ(data, self.py_version)

        magics = {pyz.magic for pyz in pyz_entries.values()}

        if not magics:
            if not no_pyz_found:
                xtpyi.logger.warning(
                    'no magic signature could be recovered from embedded pyzip archives; this is '
                    'unsual and means that there is no way to guess the missing magic for source '
                    'file entries and it will likely not be possible to decompile them.')
            return
        elif len(magics) > 1:
            xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')

        magics = list(magics)
        keys: Set[bytes] = set()

        for entry in self.toc.values():
            extracted = self.extract(entry.name)
            if entry.type not in (PiType.SOURCE, PiType.MODULE):
                self.files[entry.name] = extracted
                continue
            data = extracted.unpack()
            name, _ = os.path.splitext(extracted.name)
            # re-register the compiled code under a name with the .pyc extension
            del self.files[extracted.name]
            extracted.name = F'{name}.pyc'
            self.files[extracted.name] = extracted

            if len(magics) == 1 and data[:4] != magics[0]:
                extracted.data = magics[0] + data
            decompiled = make_decompiled_item(name, data, *magics)
            if entry.type is PiType.SOURCE:
                decompiled.type = PiType.USERCODE
            self.files[F'{name}.py'] = decompiled
            if name.endswith('crypto_key'):
                # every 16 byte string literal in the decompiled module is a candidate key
                for key in decompiled.unpack() | carve('string', decode=True):
                    if len(key) != 0x10:
                        continue
                    xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
                    keys.add(key)

        if unmarshal is Unmarshal.No:
            return

        if not keys:
            key = None
        else:
            # if several candidates were found, an arbitrary one is used
            key = next(iter(keys))

        for name, pyz in pyz_entries.items():
            pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
            for unpacked in pyz.entries:
                unpacked.name = path = F'{name}/{unpacked.name}'
                if path in self.files:
                    raise ValueError(F'duplicate file name: {path}')
                self.files[path] = unpacked

    def extract(self, name: str) -> PiMeta:
        """
        Return the `PiMeta` item for the TOC entry called `name`. The item is created on first
        access and cached in `files`. For compressed entries, the item wraps a callable that
        performs the zlib decompression when invoked; otherwise it wraps the raw data. Raises
        a `KeyError` if `name` is neither cached nor present in the table of contents.
        """
        try:
            return self.files[name]
        except KeyError:
            pass
        entry = self.toc[name]
        with StreamDetour(self.reader, self.offset + entry.offset):
            data = self.reader.read(entry.size_of_compressed_data)
        if entry.is_compressed:
            def extracted(d=data): return zlib.decompress(d)
        else:
            extracted = data
        result = PiMeta(entry.type, name, extracted)
        self.files[name] = result
        return result

Ancestors

Class variables

var MagicSignature

Methods

def extract(self, name)
Expand source code Browse git
def extract(self, name: str) -> PiMeta:
    """
    Look up the item called `name`, creating and caching it in `self.files` if it has not
    been requested before. Compressed entries are wrapped in a callable that decompresses
    the stored bytes with zlib on demand; uncompressed entries carry the stored bytes as-is.
    A `KeyError` is raised when `name` is unknown to both the cache and the table of contents.
    """
    if name in self.files:
        return self.files[name]

    toc_entry = self.toc[name]
    start = self.offset + toc_entry.offset
    with StreamDetour(self.reader, start):
        stored = self.reader.read(toc_entry.size_of_compressed_data)

    def extracted(d=stored):
        return zlib.decompress(d)

    # only compressed entries need the deferred decompression step
    payload = extracted if toc_entry.is_compressed else stored
    item = PiMeta(toc_entry.type, name, payload)
    self.files[name] = item
    return item
class xtpyi (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', user_code=False, unmarshal=0)

Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.

Expand source code Browse git
class xtpyi(ArchiveUnit):
    """
    Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
    """
    # All arguments are forwarded to the base class initializer by keyword; the two arguments
    # declared here are user_code (a switch) and unmarshal (a counter).
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        user_code: Arg.Switch('-u', group='FILTER', help=(
            'Extract only source code files from the root of the archive. These usually implement '
            'the actual domain logic.')) = False,
        unmarshal: Arg('-y', action='count', group='FILTER', help=(
            '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
            'potentially exploit this to execute code. It is advised to only use this option inside '
            'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
        )) = 0
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            unmarshal=unmarshal,
            user_code=user_code,
        )

    # Imports xdis and returns the module. If xdis does not know the version of the running
    # interpreter, that version and its magic number are registered with the xdis lookup
    # tables first, using a placeholder object in place of real opcode information.
    @ArchiveUnit.Requires('xdis', 'arc', 'python', 'extended')
    def _xdis():
        import xdis.load
        import xdis.magics
        import xdis.marsh
        import xdis.op_imports
        import xdis
        A, B, C, *_ = sys.version_info
        version = F'{A}.{B}.{C}'
        canonic = F'{A}.{B}'
        if version not in xdis.magics.canonic_python_version:
            # Placeholder for the opcode module of the unknown version: attribute access
            # yields another placeholder and calling it returns None.
            class opcode_dummy:
                version = float(canonic)
                def __init__(self, name): self.name = name
                def __getattr__(self, key): return opcode_dummy(F'{self.name}.{key}')
                def __call__(self, *a, **k): return None
                def __str__(self): return self.name
                def __repr__(self): return self.name
            import importlib
            magic = importlib.util.MAGIC_NUMBER
            xdis.magics.add_magic_from_int(xdis.magics.magic2int(magic), version)
            xdis.magics.by_magic.setdefault(magic, set()).add(version)
            xdis.magics.by_version[version] = magic
            xdis.magics.magics[canonic] = magic
            xdis.magics.canonic_python_version[canonic] = canonic
            xdis.magics.add_canonic_versions(version, canonic)
            xdis.op_imports.op_imports.setdefault(canonic, opcode_dummy('dummy'))
        del A, B, C, version
        # xdis.std is imported only after the version tables have been patched above
        import xdis.std
        return xdis

    # Imports uncompyle6 together with its main submodule and returns the package.
    @ArchiveUnit.Requires('uncompyle6', 'arc', 'python', 'extended')
    def _uncompyle6():
        import uncompyle6
        import uncompyle6.main
        return uncompyle6

    # Imports decompyle3 together with its main submodule and returns the package.
    @ArchiveUnit.Requires('decompyle3', 'arc', 'python')
    def _decompyle3():
        import decompyle3
        import decompyle3.main
        return decompyle3

    def unpack(self, data):
        """
        Search the input for the PyInstaller magic signature, parse the archive epilogue at
        the last occurrence, and yield one packed item per file in the archive. If the
        user_code argument is set, only items of type USERCODE whose name does not start
        with "pyiboot" are produced. Raises a LookupError if the signature is not found.
        """
        view = memoryview(data)
        positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)]
        # the unmarshal argument is a counter; values above 2 are clamped to 2
        mode = Unmarshal(min(2, int(self.args.unmarshal)))
        self.log_debug(F'unmarshal mode: {mode.name}')
        if not positions:
            raise LookupError('unable to find PyInstaller signature')
        if len(positions) > 2:
            # first position is expected to be the sentinel value in the unpacker stub
            width = max(len(F'{p:X}') for p in positions)
            for position in positions:
                self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
            self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')
        archive = PyInstallerArchiveEpilogue(view, positions[-1], mode)
        for name, file in archive.files.items():
            if self.args.user_code:
                if file.type != PiType.USERCODE:
                    continue
                if name.startswith('pyiboot'):
                    continue
            yield self._pack(name, None, file.data, type=file.type.name)

    @classmethod
    def handles(cls, data: ByteString) -> Optional[bool]:
        """
        Return whether the PyInstaller magic signature occurs anywhere in the given data.
        """
        return PyInstallerArchiveEpilogue.MagicSignature in data

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    """
    Find every occurrence of the PyInstaller magic signature in the input and parse the
    archive epilogue at the last one. Each file of the archive is then emitted as a packed
    item. With the user_code option, items are restricted to those of type USERCODE whose
    name does not begin with "pyiboot". A LookupError is raised if no signature exists.
    """
    view = memoryview(data)
    pattern = re.escape(PyInstallerArchiveEpilogue.MagicSignature)
    positions = []
    for match in re.finditer(pattern, view):
        positions.append(match.start())

    # the option counts occurrences of the switch; anything beyond two means the same as two
    requested = int(self.args.unmarshal)
    mode = Unmarshal(2 if requested > 2 else requested)
    self.log_debug(F'unmarshal mode: {mode.name}')

    if len(positions) == 0:
        raise LookupError('unable to find PyInstaller signature')

    if len(positions) > 2:
        # first position is expected to be the sentinel value in the unpacker stub
        width = max(len(format(p, 'X')) for p in positions)
        for position in positions:
            self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
        self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')

    last = positions[-1]
    archive = PyInstallerArchiveEpilogue(view, last, mode)

    for name, file in archive.files.items():
        if self.args.user_code and (file.type != PiType.USERCODE or name.startswith('pyiboot')):
            continue
        yield self._pack(name, None, file.data, type=file.type.name)

Inherited members