Module refinery.units.formats.archive.xtpyi
Expand source code Browse git
from __future__ import annotations
from typing import TYPE_CHECKING, NamedTuple
import marshal
import enum
import io
import re
import uuid
import zlib
import os
import os.path
import contextlib
import codecs
import dataclasses
from refinery.units.formats.archive import Arg, ArchiveUnit
from refinery.units.pattern.carve import carve
from refinery.units.formats.pym import Marshal
from refinery.lib.structures import MemoryFile, StreamDetour, Struct, StructReader
from refinery.lib.tools import NoLogging, normalize_word_separators
from refinery.lib.shared import xdis, decompyle3, uncompyle6
from Cryptodome.Cipher import AES
if TYPE_CHECKING:
from types import CodeType
from typing import cast, Callable, Dict, List, Tuple, Optional, Set, Union, Generator, Iterable
from xdis import Instruction
from refinery.lib.types import ByteStr
class Unmarshal(enum.IntEnum):
    """
    Selects how embedded PYZ archives are treated while parsing a PyInstaller archive.
    """
    # Embedded PYZ archives are not unmarshalled.
    No = 0
    # Embedded PYZ archives are unmarshalled and their entries are extracted.
    Yes = 1
    # Like Yes, and the unmarshalled module and package entries are also decompiled.
    YesAndDecompile = 2
def version2tuple(version: str):
    """
    Convert a version string such as `3.8` or `3.10.0rc1` into a tuple of integers. Only the
    leading two or three dot-separated numbers are used; any suffix is ignored. A `ValueError`
    is raised when the string does not start with such a numeric prefix.
    """
    match = re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version)
    if match is None:
        raise ValueError(version)
    numeric_prefix = match.group(1)
    return tuple(int(part, 10) for part in numeric_prefix.split('.'))
def decompress_peek(buffer, size=512) -> Optional[bytes]:
    """
    Try to inflate the first `size` bytes of `buffer` as a zlib stream. Returns whatever output
    could be produced from that prefix, or `None` when zlib rejects the data.
    """
    head = buffer[:size]
    try:
        inflater = zlib.decompressobj()
        return inflater.decompress(head)
    except zlib.error:
        return None
class Code(NamedTuple):
    """
    A code object together with the module metadata that was reported for it by the xdis module
    loader; instances are produced by `extract_code_from_buffer`.
    """
    # Python version reported by the loader.
    # NOTE(review): disassemble_code also accepts float and str versions - confirm what xdis returns.
    version: Tuple[int, ...]
    # Timestamp value reported by the loader for the module.
    timestamp: int
    # Integer magic value reported by the loader for the module.
    magic: int
    # The loaded code object itself.
    container: CodeType
    # Receives the loader's is_pypy flag; the field name is kept as-is for compatibility.
    is_pypi: bool
    # Dictionary that was passed to the loader as its code object registry.
    code_objects: dict
def extract_code_from_buffer(buffer: ByteStr, file_name: Optional[str] = None) -> Generator[Code, None, None]:
    """
    Load a compiled Python module from `buffer` using xdis and yield one `Code` tuple for each
    code object that the loader returns. The optional `file_name` is passed to the loader and
    defaults to `<unknown>`.
    """
    if not file_name:
        file_name = '<unknown>'
    registry = {}
    loader = xdis.load.load_module_from_file_object
    # output on stderr is suppressed while the loader runs
    with NoLogging(NoLogging.Mode.STD_ERR):
        version, timestamp, magic_int, loaded, is_pypy, _, _ = loader(
            MemoryFile(buffer), file_name, registry)
    # the loader result is either a single code object or a list of them
    containers = loaded if isinstance(loaded, list) else [loaded]
    for container in containers:
        yield Code(version, timestamp, magic_int, container, is_pypy, registry)
def disassemble_code(code: CodeType, version=None) -> Iterable[Instruction]:
    """
    Disassemble `code` with xdis. When a `version` is given (as a float, string, or tuple), the
    matching opcode table is looked up; if none is registered for that version, or no version is
    given, xdis is called without an explicit opcode table.
    """
    opcodes = None
    if version is not None:
        if isinstance(version, float):
            lookup = str(version)
        elif isinstance(version, str):
            lookup = version
        else:
            lookup = xdis.version_info.version_tuple_to_str(version)
        try:
            opcodes = xdis.op_imports.op_imports[lookup]
        except KeyError:
            pass
    return xdis.std.Bytecode(code, opc=opcodes)
def decompile_buffer(buffer: Union[Code, ByteStr], file_name: Optional[str] = None) -> ByteStr:
    """
    Decompile Python bytecode to source code. The input is either a `Code` tuple or a buffer that
    contains a compiled module; in the latter case, `file_name` is passed on to the loader.

    The available decompilers are tried in order. If one of them produces output, that output is
    returned as UTF-8 encoded source with its leading comment lines removed. Otherwise, if the
    input was a raw buffer that consists almost entirely of a single printable string, that string
    is returned. As a last resort, the collected error messages and a disassembly listing are
    returned, formatted as comments.
    """
    errors = ''
    python = ''

    if isinstance(buffer, Code):
        codes = [buffer]
    else:
        codes = list(extract_code_from_buffer(buffer, file_name))

    def _engines():
        # Yields (name, decompile function) for each decompiler that can be imported and
        # records a comment line in the error report for each one that cannot.
        nonlocal errors
        try:
            dc = decompyle3.main.decompile
        except ImportError:
            errors += '# The decompiler decompyle3 is not installed.\n'
        else:
            yield 'decompyle3', dc
        try:
            dc = uncompyle6.main.decompile
        except ImportError:
            errors += '# The decompiler uncompyle6 is not installed.\n'
        else:
            yield 'uncompyle6', dc

    engines = dict(_engines())

    if not engines:
        # terminated with a line break so that the disassembly header starts on its own line
        errors += '# (all missing, install one of the above to enable decompilation)\n'

    for code in codes:
        for name, decompile in engines.items():
            with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL):
                try:
                    decompile(
                        co=code.container,
                        bytecode_version=code.version,
                        out=output,
                        timestamp=code.timestamp,
                        code_objects=code.code_objects,
                        is_pypy=code.is_pypi,
                        magic_int=code.magic,
                    )
                except Exception as E:
                    report = (F'Error while decompiling with {name}:', *str(E).splitlines())
                    errors += '\n'.join(F'# {line}' for line in report)
                    errors += '\n'
                else:
                    python = output.getvalue()
                    break

    if python:
        # removes leading comments; the bounds check covers output that is all comments
        lines = python.splitlines(True)
        start = 0
        while start < len(lines) and lines[start].strip().startswith('#'):
            start += 1
        return ''.join(lines[start:]).encode('utf8')

    if not isinstance(buffer, Code):
        # if all but a few bytes of the buffer are one printable string, return that string
        embedded = bytes(buffer | carve('printable', single=True))
        if len(buffer) - len(embedded) < 0x20:
            return embedded

    disassembly = MemoryFile()

    with io.TextIOWrapper(disassembly, 'utf8', newline='\n') as output:
        output.write(errors)
        output.write('# Generating Disassembly:\n\n')
        for code in codes:
            instructions = list(disassemble_code(code.container, code.version))
            # the default covers code objects without any instructions
            width_offset = max((len(str(i.offset)) for i in instructions), default=1)
            for i in instructions:
                opname = normalize_word_separators(i.opname, '.').lower()
                offset = F'{i.offset:0{width_offset}d}'
                output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n')
            output.write('\n')

    return disassembly.getvalue()
class PiType(bytes, enum.Enum):
    """
    Type codes of the items handled by the archive parser. The single-byte values are the type
    codes read from table of contents entries; the two-byte values never occur in a parsed entry
    and are assigned by this module to classify items.
    """
    BINARY          = B'b'  # noqa / binary
    DEPENDENCY      = B'd'  # noqa / dependency
    PYZ             = B'z'  # noqa / zlib (pyz) - frozen Python code
    PACKAGE         = B'M'  # noqa / Python package (__init__.py)
    MODULE          = B'm'  # noqa / Python module
    SOURCE          = B's'  # noqa / Python script (v3)
    DATA            = B'x'  # noqa / data
    RUNTIME_OPTION  = B'o'  # noqa / runtime option
    SPLASH          = B'l'  # noqa / splash resources
    UNKNOWN         = B'uk' # noqa / assigned when a type code is not recognized
    DECOMPILED      = B'dc' # noqa / decompiled source produced by this module
    USERCODE        = B'uc' # noqa / decompiled source of an entry of type SOURCE
    ENCRYPTED       = B'ec' # noqa
class PzType(enum.IntEnum):
    """
    Type tags of the entries in the table of contents of a PYZ archive. Tags that are not listed
    here are treated as DATA when the archive is unpacked.
    """
    MODULE = 0
    # package entries are extracted under the name <package>/__init__
    PKG = 1
    DATA = 2
@dataclasses.dataclass
class PiMeta:
    """
    Describes one extracted item: its type, its name, and its contents. The contents may be given
    as a callable without arguments which computes them on demand.
    """
    type: PiType
    name: str
    data: Union[Callable[[], ByteStr], ByteStr]

    def unpack(self) -> ByteStr:
        """
        Return the contents of the item. If they are still given as a callable, it is invoked and
        replaced by its result, so that the computation happens only once.
        """
        contents = self.data
        if callable(contents):
            contents = self.data = contents()
        return contents
def make_decompiled_item(name: str, data: ByteStr, *magics) -> PiMeta:
    """
    Create a `PiMeta` item of type `DECOMPILED` named `{name}.py` whose contents are computed on
    demand by decompiling `data`. If `data` already starts with the first four bytes of one of the
    given `magics`, it is decompiled as-is. Otherwise, each magic is prepended in turn until
    decompilation does not raise. When every attempt fails, the contents are the last error
    message, formatted as comment lines.
    """
    def extract(data=data, magics=magics):
        if any(data[:4] == m[:4] for m in magics):
            return decompile_buffer(data, name)
        error = None
        for magic in magics:
            try:
                return decompile_buffer(magic + data, name)
            except Exception as exception:
                error = exception
        if error is None:
            # no magic was available, so no decompilation attempt was made
            message = 'no magic signature available to decompile this item'
        else:
            message = str(error) or repr(error)
        return '\n'.join(F'# {line}' for line in message.splitlines()).encode('utf8')
    return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)
class PYZ(Struct):
    """
    Parser for a PYZ archive embedded in a PyInstaller archive. The constructor reads the header;
    the entries are only extracted when `unpack` is called.
    """

    MagicSignature = B'PYZ\0'

    def __init__(self, reader: StructReader, version: str):
        """
        Read the PYZ header at the current position of `reader`. The given `version` is the Python
        version string to assume when the magic in the header is not known to xdis. Raises a
        `ValueError` when the data does not start with the PYZ signature.
        """
        reader.bigendian = True
        self.base = reader.tell()
        signature = reader.read(4)
        if signature != self.MagicSignature:
            raise ValueError('invalid magic')
        magic = bytes(reader.read(4))
        # prefer the version that xdis associates with the magic over the one passed in
        with contextlib.suppress(KeyError, AttributeError):
            version = xdis.magics.versions[magic]
        vtuple = version2tuple(version)
        # the magic is padded with zero bytes; more padding is used from 3.3 and again from 3.7
        padding_size = 4
        if vtuple >= (3, 3):
            padding_size += 4
        if vtuple >= (3, 7):
            padding_size += 4
        self.version = version
        self.magic = magic + padding_size * b'\0'
        self.toc_offset = reader.i32()
        self.reader = reader
        self.entries: List[PiMeta] = []
        # set by unpack when the table of contents cannot be converted to a dictionary
        self.error: Optional[Exception] = None

    def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
        """
        Unmarshal the table of contents and populate `entries`. Each entry becomes a lazily
        computed `PiMeta` item; if `decompile` is set, modules and packages additionally get an
        item with their decompiled source. When a `key` is given, entries are decrypted with
        AES in CFB mode before they are decompressed. The return value indicates success.

        Note that the table of contents is unmarshalled, which is not safe for untrusted input.
        """
        # start from a clean state so that repeated calls do not accumulate entries
        self.entries = []
        self.error = None

        with StreamDetour(self.reader, self.base + self.toc_offset):
            toc_data = self.reader.read()
        try:
            toc = marshal.loads(toc_data)
        except Exception:
            toc = Marshal(memoryview(toc_data)).object()

        if isinstance(toc, list):
            try:
                toc = dict(toc)
            except Exception as error:
                self.entries = []
                self.error = error
                return False

        if TYPE_CHECKING:
            toc = cast(dict[str | bytes, tuple[int, int, int]], toc)

        failures = 0
        attempts = len(toc)

        for name, (_pzt, offset, length) in toc.items():
            if not isinstance(name, str):
                name = codecs.decode(name, 'utf-8')
            try:
                pzt = PzType(_pzt)
            except Exception:
                pzt = PzType.DATA

            name = name.replace('.', '/')
            if pzt is PzType.PKG:
                name = F'{name}/__init__'

            with StreamDetour(self.reader, self.base + offset):
                data = self.reader.read(length)

            if key:
                def decompressed(data=data):
                    # the first 16 bytes of the entry are used as the initialization vector
                    cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
                    return zlib.decompress(cipher.decrypt(data[0x10:]))
            elif decompress_peek(data):
                def decompressed(data=data):
                    return zlib.decompress(data)
            else:
                failures += 1
                continue

            if decompile and pzt in (PzType.MODULE, PzType.PKG):
                # decompressed is bound as a default so that each item keeps its own function
                # rather than looking up whichever one was defined last in this loop
                def decompiled(data=data, name=name, magic=self.magic, decompressed=decompressed):
                    data = decompressed(data)
                    if data[:4] != magic[:4]:
                        data = magic + data
                    return decompile_buffer(data, name)
                self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
                name = F'{name}.pyc'
                kind = PiType.SOURCE
            else:
                kind = PiType.DATA

            self.entries.append(PiMeta(kind, name, decompressed))

        if key:
            if failures >= 6:
                xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
            return True
        elif failures > 0.7 * attempts:
            # most entries did not look like zlib data, so the remaining ones are not trusted
            self.entries.clear()
            return False
        else:
            return True
class PiTOCEntry(Struct):
    """
    A single entry in the table of contents of a PyInstaller archive.
    """

    def __init__(self, reader: StructReader):
        """
        Parse one entry at the current position of `reader`. A `RuntimeError` is raised when the
        entry declares an implausible name length or when its name contains non-printable
        characters.
        """
        reader.bigendian = True
        entry_start_offset = reader.tell()
        self.size_of_entry = reader.i32()
        self.offset = reader.i32()
        self.size_of_compressed_data = reader.i32()
        # the misspelled attribute name is kept as-is for compatibility
        self.size_od_uncompressed_data = reader.i32()
        self.is_compressed = bool(reader.read_byte())
        entry_type = bytes(reader.read(1))
        # the name occupies the remainder of the entry
        name_length = self.size_of_entry - reader.tell() + entry_start_offset
        # a negative length results from an entry size smaller than the fixed fields
        if not 0 <= name_length <= 0x1000:
            raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.')
        name, *_ = bytes(reader.read(name_length)).partition(B'\0')
        try:
            name = name.decode('utf8', 'backslashreplace')
        except Exception:
            name = str(uuid.uuid4())
        else:
            if not all(part.isprintable() for part in re.split(R'\s+', name)):
                raise RuntimeError('Refusing to process TOC entry with non-printable name.')
        if entry_type == B'Z':
            entry_type = B'z'
        try:
            self.type = PiType(entry_type)
        except ValueError:
            xtpyi.logger.warning(F'unknown type {entry_type!r} in field {name}')
            self.type = PiType.UNKNOWN
        self.name = name

    def __hash__(self):
        return hash(self.name)
class PyInstallerArchiveEpilogue(Struct):
    """
    Parses a PyInstaller archive starting from the epilogue that begins with `MagicSignature`.
    After construction, `toc` maps entry names to their table of contents entries and `files`
    maps output paths to the extracted items.
    """

    MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')

    def _read_libname(self, reader: StructReader) -> Optional[str]:
        """
        Attempt to read a 64-byte, zero-padded library name at the current position. If the data
        does not look like such a field, the reader is moved back and `None` is returned.
        """
        position = reader.tell()
        try:
            libname, t, rest = reader.read_bytes(64).partition(B'\0')
        except EOFError:
            reader.seekset(position)
            return None
        try:
            libname = libname.decode('utf8')
        except Exception:
            reader.seekset(position)
            return None
        # require a terminator, at least 10 bytes of all-zero padding, and a printable name
        if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
            reader.seekset(position)
            return None
        return libname

    def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No, decompile: bool = False):
        """
        Parse the archive whose epilogue starts at `offset`. If `decompile` is set, source and
        module entries are decompiled in addition to being extracted. The `unmarshal` mode decides
        whether embedded PYZ archives are unpacked and whether their contents are decompiled.
        Raises a `ValueError` when there is no valid signature at `offset` and a `KeyError` when
        the table of contents contains a duplicate name.
        """
        self.decompile = decompile
        reader.bigendian = True
        reader.seekset(offset)
        self.reader = reader
        signature = reader.read_bytes(8)
        if signature != self.MagicSignature:
            raise ValueError(
                F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
                F'should be {self.MagicSignature.hex().upper()}')
        self.size = reader.i32()
        toc_offset = reader.i32()
        toc_length = reader.i32()
        # The version is a single integer. Values below 100 are read as major * 10 + minor and
        # larger ones as major * 100 + minor, so that 38 and 308 both give 3.8 and 310 gives 3.10.
        pyvers = reader.u32()
        major, minor = divmod(pyvers, 100) if pyvers >= 100 else divmod(pyvers, 10)
        self.py_version = F'{major}.{minor}'
        self.py_libname = self._read_libname(reader)
        # start of the archive, computed from the end of the epilogue and the archive size
        self.offset = reader.tell() - self.size

        self.toc: Dict[str, PiTOCEntry] = {}
        toc_end = self.offset + toc_offset + toc_length
        reader.seekset(self.offset + toc_offset)
        while reader.tell() < toc_end:
            try:
                entry = PiTOCEntry(reader)
            except EOFError:
                xtpyi.logger.warning('end of file while reading TOC')
                break
            except Exception as error:
                xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
                break
            if entry.name in self.toc:
                raise KeyError(F'duplicate name {entry.name}')
            self.toc[entry.name] = entry

        self.files: Dict[str, PiMeta] = {}
        no_pyz_found = True
        pyz_entries: Dict[str, PYZ] = {}

        # iterate over a copy because PYZ entries without extension are renamed in the TOC
        for entry in list(self.toc.values()):
            if entry.type is not PiType.PYZ:
                continue
            no_pyz_found = False
            name, _ = os.path.splitext(entry.name)
            name_pyz = F'{name}.pyz'
            if name == entry.name:
                del self.toc[name]
                self.toc[name_pyz] = entry
                entry.name = name_pyz
            reader.seekset(self.offset + entry.offset)
            if entry.is_compressed:
                data = self.extract(entry.name).unpack()
            else:
                data = reader
            pyz_entries[name] = PYZ(data, self.py_version)

        magics = {pyz.magic for pyz in pyz_entries.values()}

        if not magics:
            if not no_pyz_found:
                xtpyi.logger.warning(
                    'no magic signature could be recovered from embedded pyzip archives; this is '
                    'unsual and means that there is no way to guess the missing magic for source '
                    'file entries and it will likely not be possible to decompile them.')
            return
        elif len(magics) > 1:
            xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')

        magics = list(magics)
        keys: Set[bytes] = set()

        for entry in self.toc.values():
            extracted = self.extract(entry.name)
            if entry.type not in (PiType.SOURCE, PiType.MODULE):
                self.files[entry.name] = extracted
                continue
            data = extracted.unpack()
            # source and module entries are stored under their name with a .pyc extension
            name, _ = os.path.splitext(extracted.name)
            del self.files[extracted.name]
            extracted.name = F'{name}.pyc'
            self.files[extracted.name] = extracted
            is_crypto_key = name.endswith('crypto_key')
            # with a single known magic, prepend it when the data does not start with it
            if len(magics) == 1 and data[:4] != magics[0][:4]:
                extracted.data = magics[0] + data
            if is_crypto_key or self.decompile:
                decompiled = make_decompiled_item(name, data, *magics)
                if entry.type is PiType.SOURCE:
                    decompiled.type = PiType.USERCODE
                self.files[decompiled.name] = decompiled
                if is_crypto_key:
                    # every 16-byte string in the decompiled key module is a key candidate
                    for key in decompiled.unpack() | carve('string', decode=True):
                        if len(key) != 0x10:
                            continue
                        xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
                        keys.add(key)

        if unmarshal is Unmarshal.No:
            return

        if not keys:
            key = None
        else:
            key = next(iter(keys))

        for name, pyz in pyz_entries.items():
            pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
            for unpacked in pyz.entries:
                unpacked.name = path = F'{name}/{unpacked.name}'
                if path in self.files:
                    raise ValueError(F'duplicate file name: {path}')
                self.files[path] = unpacked

    def extract(self, name: str) -> PiMeta:
        """
        Return the item for the table of contents entry called `name`. The item is created and
        stored in `files` on first use; compressed entries are decompressed lazily.
        """
        try:
            return self.files[name]
        except KeyError:
            pass
        entry = self.toc[name]
        with StreamDetour(self.reader, self.offset + entry.offset):
            data = self.reader.read(entry.size_of_compressed_data)
        if entry.is_compressed:
            def extracted(d=data): return zlib.decompress(d)
        else:
            extracted = data
        result = PiMeta(entry.type, name, extracted)
        self.files[name] = result
        return result
class xtpyi(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        decompile: Arg.Switch('-c', help='Attempt to decompile PYC files.') = False,
        user_code: Arg.Switch('-u', group='FILTER', help=(
            'Extract only source code files from the root of the archive. These usually implement '
            'the actual domain logic. This implies the --decompile option.')) = False,
        unmarshal: Arg('-y', action='count', group='FILTER', help=(
            '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
            'potentially exploit this to execute code. It is advised to only use this option inside '
            'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
        )) = 0
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            decompile=decompile,
            unmarshal=unmarshal,
            user_code=user_code,
        )

    def unpack(self, data):
        # Locates every occurrence of the epilogue signature, parses the archive at the last
        # one, and yields its files; a LookupError is raised if the signature does not occur.
        view = memoryview(data)
        pattern = re.escape(PyInstallerArchiveEpilogue.MagicSignature)
        positions = [match.start() for match in re.finditer(pattern, view)]
        # the count option may be given more than twice; anything above two means the same
        mode = Unmarshal(min(2, int(self.args.unmarshal)))
        self.log_debug(F'unmarshal mode: {mode.name}')
        if not positions:
            raise LookupError('unable to find PyInstaller signature')
        count = len(positions)
        if count > 2:
            # first position is expected to be the sentinel value in the unpacker stub
            width = max(len(F'{p:X}') for p in positions)
            for position in positions:
                self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
            self.log_warn(F'found {count - 1} potential PyInstaller epilogue markers; using last one.')
        decompile = self.args.decompile
        if decompile:
            uc_target = PiType.USERCODE
        else:
            uc_target = PiType.SOURCE
        archive = PyInstallerArchiveEpilogue(view, positions[-1], mode, decompile)
        for name, file in archive.files.items():
            # the user code filter keeps only the target type and drops the bootstrap scripts
            if self.args.user_code and (file.type != uc_target or name.startswith('pyiboot')):
                continue
            yield self._pack(name, None, file.data, type=file.type.name)

    @classmethod
    def handles(cls, data: ByteStr) -> Optional[bool]:
        # the unit applies to any input that contains the epilogue signature
        signature = PyInstallerArchiveEpilogue.MagicSignature
        return signature in data
Functions
def version2tuple(version)
-
Expand source code Browse git
def version2tuple(version: str): if m := re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version): return tuple(int(k, 10) for k in m.group(1).split('.')) raise ValueError(version)
def decompress_peek(buffer, size=512)
-
Expand source code Browse git
def decompress_peek(buffer, size=512) -> Optional[bytes]: try: return zlib.decompressobj().decompress(buffer[:size]) except zlib.error: return None
def extract_code_from_buffer(buffer, file_name=None)
-
Expand source code Browse git
def extract_code_from_buffer(buffer: ByteStr, file_name: Optional[str] = None) -> Generator[Code, None, None]: code_objects = {} file_name = file_name or '<unknown>' load = xdis.load.load_module_from_file_object with NoLogging(NoLogging.Mode.STD_ERR): version, timestamp, magic_int, codes, is_pypy, _, _ = load(MemoryFile(buffer), file_name, code_objects) if not isinstance(codes, list): codes = [codes] for code in codes: yield Code(version, timestamp, magic_int, code, is_pypy, code_objects)
def disassemble_code(code, version=None)
-
Expand source code Browse git
def disassemble_code(code: CodeType, version=None) -> Iterable[Instruction]: opc = None if version is not None: if isinstance(version, float): version = str(version) if not isinstance(version, str): version = xdis.version_info.version_tuple_to_str(version) with contextlib.suppress(KeyError): opc = xdis.op_imports.op_imports[version] return xdis.std.Bytecode(code, opc=opc)
def decompile_buffer(buffer, file_name=None)
-
Expand source code Browse git
def decompile_buffer(buffer: Union[Code, ByteStr], file_name: Optional[str] = None) -> ByteStr: errors = '' python = '' if not isinstance(buffer, Code): codes = list(extract_code_from_buffer(buffer, file_name)) else: codes = [buffer] def _engines(): nonlocal errors try: dc = decompyle3.main.decompile except ImportError: errors += '# The decompiler decompyle3 is not installed.\n' else: yield 'decompyle3', dc try: dc = uncompyle6.main.decompile except ImportError: errors += '# The decompiler decompyle3 is not installed.\n' else: yield 'uncompyle6', dc engines = dict(_engines()) if not engines: errors += '# (all missing, install one of the above to enable decompilation)' for code in codes: for name, decompile in engines.items(): with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL): try: decompile( co=code.container, bytecode_version=code.version, out=output, timestamp=code.timestamp, code_objects=code.code_objects, is_pypy=code.is_pypi, magic_int=code.magic, ) except Exception as E: errors += '\n'.join(F'# {line}' for line in ( F'Error while decompiling with {name}:', *str(E).splitlines(True))) errors += '\n' else: python = output.getvalue() break if python: # removes leading comments python = python.splitlines(True) python.reverse() while python[-1].strip().startswith('#'): python.pop() python.reverse() python = ''.join(python) return python.encode('utf8') if not isinstance(buffer, Code): embedded = bytes(buffer | carve('printable', single=True)) if len(buffer) - len(embedded) < 0x20: return embedded disassembly = MemoryFile() with io.TextIOWrapper(disassembly, 'utf8', newline='\n') as output: output.write(errors) output.write('# Generating Disassembly:\n\n') for code in codes: instructions = list(disassemble_code(code.container, code.version)) width_offset = max(len(str(i.offset)) for i in instructions) for i in instructions: opname = normalize_word_separators(i.opname, '.').lower() offset = F'{i.offset:0{width_offset}d}' output.write(F'# 
{offset:>5} {opname:<25} {i.argrepr}\n') output.write('\n') return disassembly.getvalue()
def make_decompiled_item(name, data, *magics)
-
Expand source code Browse git
def make_decompiled_item(name: str, data: ByteStr, *magics) -> PiMeta: def extract(data=data, magics=magics): error = None if any(data[:4] == m[:4] for m in magics): return decompile_buffer(data, name) for magic in magics: try: return decompile_buffer(magic + data, name) except Exception as exception: error = exception return '\n'.join(F'# {line}' for line in str(error).splitlines(True)).encode('utf8') return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)
Classes
class Unmarshal (*args, **kwds)
-
Enum where members are also (and must be) ints
Expand source code Browse git
class Unmarshal(enum.IntEnum): No = 0 Yes = 1 YesAndDecompile = 2
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var No
var Yes
var YesAndDecompile
class Code (version, timestamp, magic, container, is_pypi, code_objects)
-
Code(version, timestamp, magic, container, is_pypi, code_objects)
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
Ancestors
- builtins.tuple
Instance variables
var version
-
Alias for field number 0
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
var timestamp
-
Alias for field number 1
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
var magic
-
Alias for field number 2
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
var container
-
Alias for field number 3
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
var is_pypi
-
Alias for field number 4
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
var code_objects
-
Alias for field number 5
Expand source code Browse git
class Code(NamedTuple): version: Tuple[int] timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
class PiType (*args, **kwds)
-
bytes(iterable_of_ints) -> bytes bytes(string, encoding[, errors]) -> bytes bytes(bytes_or_buffer) -> immutable copy of bytes_or_buffer bytes(int) -> bytes object of size given by the parameter initialized with null bytes bytes() -> empty bytes object
Construct an immutable array of bytes from: - an iterable yielding integers in range(256) - a text string encoded using the specified encoding - any object implementing the buffer API. - an integer
Expand source code Browse git
class PiType(bytes, enum.Enum): BINARY = B'b' # noqa / binary DEPENDENCY = B'd' # noqa / runtime option PYZ = B'z' # noqa / zlib (pyz) - frozen Python code PACKAGE = B'M' # noqa / Python package (__init__.py) MODULE = B'm' # noqa / Python module SOURCE = B's' # noqa / Python script (v3) DATA = B'x' # noqa / data RUNTIME_OPTION = B'o' # noqa / runtime option SPLASH = B'l' # noqa / splash resources UNKNOWN = B'uk' # noqa DECOMPILED = B'dc' # noqa USERCODE = B'uc' # noqa ENCRYPTED = B'ec' # noqa
Ancestors
- builtins.bytes
- enum.Enum
Class variables
var BINARY
var DEPENDENCY
var PYZ
var PACKAGE
var MODULE
var SOURCE
var DATA
var RUNTIME_OPTION
var SPLASH
var UNKNOWN
var DECOMPILED
var USERCODE
var ENCRYPTED
class PzType (*args, **kwds)
-
Enum where members are also (and must be) ints
Expand source code Browse git
class PzType(enum.IntEnum): MODULE = 0 PKG = 1 DATA = 2
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
var MODULE
var PKG
var DATA
class PiMeta (type, name, data)
-
PiMeta(type: 'PiType', name: 'str', data: 'Union[Callable[[], ByteStr], ByteStr]')
Expand source code Browse git
@dataclasses.dataclass class PiMeta: type: PiType name: str data: Union[Callable[[], ByteStr], ByteStr] def unpack(self) -> ByteStr: if callable(self.data): self.data = self.data() return self.data
Instance variables
var type
var name
var data
Methods
def unpack(self)
-
Expand source code Browse git
def unpack(self) -> ByteStr: if callable(self.data): self.data = self.data() return self.data
class PYZ (reader, version)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class PYZ(Struct): MagicSignature = B'PYZ\0' def __init__(self, reader: StructReader, version: str): reader.bigendian = True self.base = reader.tell() signature = reader.read(4) if signature != self.MagicSignature: raise ValueError('invalid magic') magic = bytes(reader.read(4)) with contextlib.suppress(KeyError, AttributeError): version = xdis.magics.versions[magic] vtuple = version2tuple(version) padding_size = 4 if vtuple >= (3, 3): padding_size += 4 if vtuple >= (3, 7): padding_size += 4 self.version = version self.magic = magic + padding_size * b'\0' self.toc_offset = reader.i32() self.reader = reader self.entries: List[PiMeta] = [] def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool: with StreamDetour(self.reader, self.base + self.toc_offset): toc_data = self.reader.read() try: toc = marshal.loads(toc_data) except Exception: toc = Marshal(memoryview(toc_data)).object() if isinstance(toc, list): try: toc = dict(toc) except Exception as error: self.entries = [] self.error = error return False if TYPE_CHECKING: toc = cast(dict[str | bytes, tuple[int, int, int]], toc) failures = 0 attempts = len(toc) for name, (_pzt, offset, length) in toc.items(): if not isinstance(name, str): name = codecs.decode(name, 'utf-8') try: pzt = PzType(_pzt) except Exception: pzt = PzType.DATA name = name.replace('.', '/') if pzt is PzType.PKG: name = F'{name}/__init__' with StreamDetour(self.reader, self.base + offset): data = self.reader.read(length) if key: def decompressed(data=data): cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10])) return zlib.decompress(cipher.decrypt(data[0x10:])) elif decompress_peek(data): def decompressed(data=data): return zlib.decompress(data) else: failures += 1 continue if decompile and pzt in (PzType.MODULE, PzType.PKG): def decompiled(data=data, name=name, magic=self.magic): data = decompressed(data) if data[:4] != magic[:4]: data = magic + data return decompile_buffer(data, name) self.entries.append(PiMeta(PiType.DECOMPILED, 
F'{name}.py', decompiled)) name = F'{name}.pyc' type = PiType.SOURCE else: type = PiType.DATA self.entries.append(PiMeta(type, name, decompressed)) if key: if failures >= 6: xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items') return True elif failures > 0.7 * attempts: self.entries.clear() return False else: return True
Ancestors
Class variables
var MagicSignature
Methods
def unpack(self, decompile, key=None)
-
Expand source code Browse git
def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool: with StreamDetour(self.reader, self.base + self.toc_offset): toc_data = self.reader.read() try: toc = marshal.loads(toc_data) except Exception: toc = Marshal(memoryview(toc_data)).object() if isinstance(toc, list): try: toc = dict(toc) except Exception as error: self.entries = [] self.error = error return False if TYPE_CHECKING: toc = cast(dict[str | bytes, tuple[int, int, int]], toc) failures = 0 attempts = len(toc) for name, (_pzt, offset, length) in toc.items(): if not isinstance(name, str): name = codecs.decode(name, 'utf-8') try: pzt = PzType(_pzt) except Exception: pzt = PzType.DATA name = name.replace('.', '/') if pzt is PzType.PKG: name = F'{name}/__init__' with StreamDetour(self.reader, self.base + offset): data = self.reader.read(length) if key: def decompressed(data=data): cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10])) return zlib.decompress(cipher.decrypt(data[0x10:])) elif decompress_peek(data): def decompressed(data=data): return zlib.decompress(data) else: failures += 1 continue if decompile and pzt in (PzType.MODULE, PzType.PKG): def decompiled(data=data, name=name, magic=self.magic): data = decompressed(data) if data[:4] != magic[:4]: data = magic + data return decompile_buffer(data, name) self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled)) name = F'{name}.pyc' type = PiType.SOURCE else: type = PiType.DATA self.entries.append(PiMeta(type, name, decompressed)) if key: if failures >= 6: xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items') return True elif failures > 0.7 * attempts: self.entries.clear() return False else: return True
class PiTOCEntry (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class PiTOCEntry(Struct): def __init__(self, reader: StructReader): reader.bigendian = True entry_start_offset = reader.tell() self.size_of_entry = reader.i32() self.offset = reader.i32() self.size_of_compressed_data = reader.i32() self.size_od_uncompressed_data = reader.i32() self.is_compressed = bool(reader.read_byte()) entry_type = bytes(reader.read(1)) name_length = self.size_of_entry - reader.tell() + entry_start_offset if name_length > 0x1000: raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.') name, *_ = bytes(reader.read(name_length)).partition(B'\0') try: name = name.decode('utf8', 'backslashreplace') except Exception: name = str(uuid.uuid4()) else: if not all(part.isprintable() for part in re.split('\\s*', name)): raise RuntimeError('Refusing to process TOC entry with non-printable name.') if entry_type == B'Z': entry_type = B'z' try: self.type = PiType(entry_type) except ValueError: xtpyi.log_warn(F'unknown type {entry_type!r} in field {name}') self.type = PiType.UNKNOWN self.name = name def __hash__(self): return hash(self.name)
Ancestors
class PyInstallerArchiveEpilogue (reader, offset, unmarshal=0, decompile=False)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class PyInstallerArchiveEpilogue(Struct):
    """
    Parser for the epilogue (the trailing "cookie") of a PyInstaller archive. It reads the table
    of contents, locates embedded PYZ archives, and populates `files` with the extracted items.
    """

    MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')

    def _read_libname(self, reader: StructReader) -> Optional[str]:
        """
        Attempt to read the 64-byte, null-padded Python library name that follows the version
        field. Returns the decoded name on success. If the field is absent or does not look like
        a library name, the reader is rewound to where it started and `None` is returned.
        """
        position = reader.tell()
        try:
            libname, t, rest = reader.read_bytes(64).partition(B'\0')
        except EOFError:
            reader.seekset(position)
            return None
        try:
            libname = libname.decode('utf8')
        except Exception:
            reader.seekset(position)
            return None
        # Accept only a printable ASCII name that is null-terminated and followed by at least
        # ten bytes of pure zero padding.
        if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
            reader.seekset(position)
            return None
        return libname

    def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No, decompile: bool = False):
        """
        Parse the archive whose epilogue starts at `offset` within `reader`.

        - `reader`: the data source containing the archive.
        - `offset`: position of the magic signature that starts the epilogue.
        - `unmarshal`: controls whether embedded PYZ archives are unpacked, and whether their
          contents are additionally decompiled.
        - `decompile`: whether to produce decompiled items for source and module entries.

        Raises a `ValueError` when the signature at `offset` is wrong or when unpacking a PYZ
        archive produces a duplicate path, and a `KeyError` when the TOC has duplicate names.
        """
        self.decompile = decompile
        reader.bigendian = True
        reader.seekset(offset)
        self.reader = reader
        signature = reader.read_bytes(8)
        if signature != self.MagicSignature:
            raise ValueError(
                F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
                F'should be {self.MagicSignature.hex().upper()}')
        self.size = reader.i32()
        toc_offset = reader.i32()
        toc_length = reader.i32()
        # PyInstaller encodes the Python version as (major * 10 + minor) in older archives and
        # as (major * 100 + minor) in newer ones. Joining the decimal digits with dots is only
        # correct for the former: it would turn 310 into "3.1.0" rather than "3.10".
        pyvers = reader.u32()
        major, minor = divmod(pyvers, 100 if pyvers >= 100 else 10)
        self.py_version = F'{major}.{minor}'
        self.py_libname = self._read_libname(reader)
        # The epilogue sits at the very end of the archive, so the archive starts `size` bytes
        # before the current position.
        self.offset = reader.tell() - self.size
        self.toc: Dict[str, PiTOCEntry] = {}
        toc_end = self.offset + toc_offset + toc_length
        reader.seekset(self.offset + toc_offset)
        while reader.tell() < toc_end:
            try:
                entry = PiTOCEntry(reader)
            except EOFError:
                xtpyi.logger.warning('end of file while reading TOC')
                break
            except Exception as error:
                xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
                break
            if entry.name in self.toc:
                raise KeyError(F'duplicate name {entry.name}')
            self.toc[entry.name] = entry

        self.files: Dict[str, PiMeta] = {}
        no_pyz_found = True
        pyz_entries: Dict[str, PYZ] = {}

        # Iterate over a copy because PYZ entries may be renamed in the TOC while looping.
        for entry in list(self.toc.values()):
            if entry.type is not PiType.PYZ:
                continue
            no_pyz_found = False
            name, _ = os.path.splitext(entry.name)
            name_pyz = F'{name}.pyz'
            if name == entry.name:
                # The entry has no file extension: register it under a name ending in ".pyz".
                del self.toc[name]
                self.toc[name_pyz] = entry
                entry.name = name_pyz
            reader.seekset(self.offset + entry.offset)
            if entry.is_compressed:
                data = self.extract(entry.name).unpack()
            else:
                data = reader
            pyz_entries[name] = PYZ(data, self.py_version)

        magics = {pyz.magic for pyz in pyz_entries.values()}

        if not magics:
            if not no_pyz_found:
                xtpyi.logger.warning(
                    'no magic signature could be recovered from embedded pyzip archives; this is '
                    'unsual and means that there is no way to guess the missing magic for source '
                    'file entries and it will likely not be possible to decompile them.')
            return
        elif len(magics) > 1:
            xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')

        magics = list(magics)
        keys: Set[bytes] = set()

        for entry in self.toc.values():
            extracted = self.extract(entry.name)
            if entry.type not in (PiType.SOURCE, PiType.MODULE):
                self.files[entry.name] = extracted
                continue
            data = extracted.unpack()
            # Source and module entries are re-registered under a name ending in ".pyc".
            name, _ = os.path.splitext(extracted.name)
            del self.files[extracted.name]
            extracted.name = F'{name}.pyc'
            self.files[extracted.name] = extracted
            is_crypto_key = name.endswith('crypto_key')
            # Prepend the recovered magic when it is unambiguous and the data lacks it.
            if len(magics) == 1 and data[:4] != magics[0][:4]:
                extracted.data = magics[0] + data
            if is_crypto_key or self.decompile:
                decompiled = make_decompiled_item(name, data, *magics)
                if entry.type is PiType.SOURCE:
                    decompiled.type = PiType.USERCODE
                self.files[decompiled.name] = decompiled
                if is_crypto_key:
                    # Every carved string of exactly 16 bytes is collected as a candidate key.
                    for key in decompiled.unpack() | carve('string', decode=True):
                        if len(key) != 0x10:
                            continue
                        xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
                        keys.add(key)

        if unmarshal is Unmarshal.No:
            return

        # Use an arbitrary recovered key, or no key at all if none were found.
        key = next(iter(keys), None)

        for name, pyz in pyz_entries.items():
            pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
            for unpacked in pyz.entries:
                unpacked.name = path = F'{name}/{unpacked.name}'
                if path in self.files:
                    raise ValueError(F'duplicate file name: {path}')
                self.files[path] = unpacked

    def extract(self, name: str) -> PiMeta:
        """
        Return the item registered under `name`, reading it from the archive and caching it in
        `files` on first access. Compressed entries are wrapped in a callable so that they are
        only decompressed when that callable is invoked. Raises a `KeyError` if `name` is
        neither cached nor present in the TOC.
        """
        try:
            return self.files[name]
        except KeyError:
            pass
        entry = self.toc[name]
        with StreamDetour(self.reader, self.offset + entry.offset):
            data = self.reader.read(entry.size_of_compressed_data)
        if entry.is_compressed:
            def extracted(d=data):
                return zlib.decompress(d)
        else:
            extracted = data
        result = PiMeta(entry.type, name, extracted)
        self.files[name] = result
        return result
Ancestors
Class variables
var MagicSignature
Methods
def extract(self, name)
-
Expand source code Browse git
def extract(self, name: str) -> PiMeta:
    """
    Look up the archive item called `name`. Items that were extracted before are served from
    the `files` cache; otherwise the raw bytes are read from the archive according to the TOC
    entry, wrapped in a `PiMeta`, cached, and returned. A `KeyError` is raised when `name` is
    neither cached nor listed in the TOC.
    """
    if name in self.files:
        return self.files[name]
    toc_entry = self.toc[name]
    start = self.offset + toc_entry.offset
    # NOTE(review): StreamDetour presumably restores the reader position on exit - confirm.
    with StreamDetour(self.reader, start):
        raw = self.reader.read(toc_entry.size_of_compressed_data)
    if not toc_entry.is_compressed:
        extracted = raw
    else:
        # Compressed data is handed over as a callable: inflation happens only when invoked.
        def extracted(d=raw):
            return zlib.decompress(d)
    item = PiMeta(toc_entry.type, name, extracted)
    self.files[name] = item
    return item
class xtpyi (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', decompile=False, user_code=False, unmarshal=0)
-
Extracts and decompiles files from a Python Installer (aka PyInstaller) archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xtpyi --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xtpyi [| dump {path} ]
Expand source code Browse git
class xtpyi(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
    """
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        decompile: Arg.Switch('-c', help='Attempt to decompile PYC files.') = False,
        user_code: Arg.Switch('-u', group='FILTER', help=(
            'Extract only source code files from the root of the archive. These usually implement '
            'the actual domain logic. This implies the --decompile option.')) = False,
        unmarshal: Arg('-y', action='count', group='FILTER', help=(
            '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
            'potentially exploit this to execute code. It is advised to only use this option inside '
            'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
        )) = 0
    ):
        # Every option is forwarded unchanged to the base class under its own name.
        options = {
            'list'      : list,
            'join_path' : join_path,
            'drop_path' : drop_path,
            'fuzzy'     : fuzzy,
            'exact'     : exact,
            'regex'     : regex,
            'path'      : path,
            'date'      : date,
            'decompile' : decompile,
            'unmarshal' : unmarshal,
            'user_code' : user_code,
        }
        super().__init__(*paths, **options)

    def unpack(self, data):
        # Find every occurrence of the epilogue signature in the input.
        buffer = memoryview(data)
        marker = re.escape(PyInstallerArchiveEpilogue.MagicSignature)
        offsets = [match.start() for match in re.finditer(marker, buffer)]
        # The unmarshal option is a counter; anything above two means the same as two.
        mode = Unmarshal(min(2, int(self.args.unmarshal)))
        self.log_debug(F'unmarshal mode: {mode.name}')
        if not offsets:
            raise LookupError('unable to find PyInstaller signature')
        if len(offsets) > 2:
            # first position is expected to be the sentinel value in the unpacker stub
            width = len(F'{max(offsets):X}')
            for offset in offsets:
                self.log_info(F'magic signature found at offset 0x{offset:0{width}X}')
            self.log_warn(F'found {len(offsets) - 1} potential PyInstaller epilogue markers; using last one.')
        decompile = self.args.decompile
        # With the user code filter, the wanted item type depends on whether we decompile.
        if decompile:
            uc_target = PiType.USERCODE
        else:
            uc_target = PiType.SOURCE
        archive = PyInstallerArchiveEpilogue(buffer, offsets[-1], mode, decompile)
        for name, file in archive.files.items():
            # The filter skips items of any other type as well as names starting with "pyiboot".
            if self.args.user_code and (file.type != uc_target or name.startswith('pyiboot')):
                continue
            yield self._pack(name, None, file.data, type=file.type.name)

    @classmethod
    def handles(cls, data: ByteStr) -> Optional[bool]:
        # The unit handles any input that contains the epilogue signature.
        signature = PyInstallerArchiveEpilogue.MagicSignature
        return signature in data
Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data):
    """
    Locate the PyInstaller epilogue in `data`, parse the archive that ends there, and yield one
    packed item per archive file. Raises a `LookupError` if no epilogue signature is present.
    """
    buffer = memoryview(data)
    marker = re.escape(PyInstallerArchiveEpilogue.MagicSignature)
    offsets = [match.start() for match in re.finditer(marker, buffer)]
    # The unmarshal option is a counter; anything above two means the same as two.
    mode = Unmarshal(min(2, int(self.args.unmarshal)))
    self.log_debug(F'unmarshal mode: {mode.name}')
    if not offsets:
        raise LookupError('unable to find PyInstaller signature')
    if len(offsets) > 2:
        # first position is expected to be the sentinel value in the unpacker stub
        width = len(F'{max(offsets):X}')
        for offset in offsets:
            self.log_info(F'magic signature found at offset 0x{offset:0{width}X}')
        self.log_warn(F'found {len(offsets) - 1} potential PyInstaller epilogue markers; using last one.')
    decompile = self.args.decompile
    # With the user code filter, the wanted item type depends on whether we decompile.
    if decompile:
        uc_target = PiType.USERCODE
    else:
        uc_target = PiType.SOURCE
    archive = PyInstallerArchiveEpilogue(buffer, offsets[-1], mode, decompile)
    for name, file in archive.files.items():
        # The filter skips items of any other type as well as names starting with "pyiboot".
        if self.args.user_code and (file.type != uc_target or name.startswith('pyiboot')):
            continue
        yield self._pack(name, None, file.data, type=file.type.name)
Inherited members