Module refinery.units.formats.archive.xtpyi
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import ByteString, Callable, Dict, List, Optional, Set, Union, NamedTuple, Generator
from types import CodeType
import marshal
import enum
import io
import re
import uuid
import zlib
import os
import os.path
import contextlib
import dataclasses
import sys
from importlib.util import MAGIC_NUMBER
from refinery.units.formats.archive import Arg, ArchiveUnit
from refinery.units.pattern.carve import carve
from refinery.lib.structures import EOF, MemoryFile, StreamDetour, Struct, StructReader
from refinery.lib.tools import NoLogging, normalize_word_separators
from Cryptodome.Cipher import AES
class Unmarshal(enum.IntEnum):
No = 0
Yes = 1
YesAndDecompile = 2
def version2tuple(version: str):
return tuple(int(k, 10) for k in re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version).group(1).split('.'))
def decompress_peek(buffer, size=512) -> Optional[bytes]:
try:
return zlib.decompressobj().decompress(buffer[:size])
except zlib.error:
return None
class Code(NamedTuple):
version: float
timestamp: int
magic: int
container: CodeType
is_pypi: bool
code_objects: dict
def extract_code_from_buffer(buffer: ByteString, file_name: Optional[str] = None) -> Generator[Code, None, None]:
main: xtpyi = xtpyi
code_objects = {}
sys_stderr = sys.stderr
sys.stderr = open(os.devnull, 'w')
file_name = file_name or '<unknown>'
try:
version, timestamp, magic_int, codes, is_pypy, _, _ = \
main._xdis.load.load_module_from_file_object(MemoryFile(buffer), file_name, code_objects)
finally:
sys.stderr.close()
sys.stderr = sys_stderr
if not isinstance(codes, list):
codes = [codes]
for code in codes:
yield Code(version, timestamp, magic_int, code, is_pypy, code_objects)
def decompile_buffer(buffer: Union[Code, ByteString], file_name: Optional[str] = None) -> ByteString:
main: xtpyi = xtpyi
errors = ''
python = ''
codes = [buffer]
if not isinstance(buffer, Code):
codes = list(extract_code_from_buffer(buffer, file_name))
for code in codes:
engines = {}
for e in ['decompyle3', 'uncompyle6']:
try:
dc = getattr(main, F'_{e}')
if isinstance(dc, property):
dc = dc.fget()
except ImportError:
errors += F'# The decompiler {dc} is not installed.\n'
else:
engines['decompyle3'] = dc
if not engines:
errors += '# (all missing, install one of the above to enable decompilation)'
for name, engine in engines.items():
with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL):
try:
engine.main.decompile(
co=code.container,
bytecode_version=code.version,
out=output,
timestamp=code.timestamp,
code_objects=code.code_objects,
is_pypy=code.is_pypi,
magic_int=code.magic,
)
except Exception as E:
errors += '\n'.join(F'# {line}' for line in (
F'Error while decompiling with {name}:', *str(E).splitlines(True)))
errors += '\n'
else:
python = output.getvalue()
break
if python:
# removes leading comments
python = python.splitlines(True)
python.reverse()
while python[-1].strip().startswith('#'):
python.pop()
python.reverse()
python = ''.join(python)
return python.encode(main.codec)
if not isinstance(buffer, Code):
embedded = bytes(buffer | carve('printable', single=True))
if len(code) - len(embedded) < 0x20:
return embedded
disassembly = MemoryFile()
with io.TextIOWrapper(disassembly, main.codec, newline='\n') as output:
output.write(errors)
output.write('# Generating Disassembly:\n\n')
for code in codes:
instructions = list(main._xdis.std.Bytecode(code.container))
width_offset = max(len(str(i.offset)) for i in instructions)
for i in instructions:
opname = normalize_word_separators(i.opname, '.').lower()
offset = F'{i.offset:0{width_offset}d}'
output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n')
output.write('\n')
return disassembly.getbuffer()
class PiType(bytes, enum.Enum):
BINARY = B'b' # noqa / binary
DEPENDENCY = B'd' # noqa / runtime option
PYZ = B'z' # noqa / zlib (pyz) - frozen Python code
PACKAGE = B'M' # noqa / Python package (__init__.py)
MODULE = B'm' # noqa / Python module
SOURCE = B's' # noqa / Python script (v3)
DATA = B'x' # noqa / data
RUNTIME_OPTION = B'o' # noqa / runtime option
SPLASH = B'l' # noqa / splash resources
UNKNOWN = B'uk' # noqa
DECOMPILED = B'dc' # noqa
USERCODE = B'uc' # noqa
ENCRYPTED = B'ec' # noqa
class PzType(enum.IntEnum):
MODULE = 0
PKG = 1
DATA = 2
@dataclasses.dataclass
class PiMeta:
type: PiType
name: str
data: Union[Callable[[], ByteString], ByteString]
def unpack(self) -> ByteString:
if callable(self.data):
self.data = self.data()
return self.data
def make_decompiled_item(name: str, data: ByteString, *magics) -> PiMeta:
def extract(data=data, magics=magics):
error = None
if any(data[:4] == m[:4] for m in magics):
return decompile_buffer(data, name)
for magic in magics:
try:
return decompile_buffer(magic + data, name)
except Exception as exception:
error = exception
return '\n'.join(F'# {line}'
for line in str(error).splitlines(True)).encode(xtpyi.codec)
return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)
class PYZ(Struct):
MagicSignature = B'PYZ\0'
def __init__(self, reader: StructReader, version: str):
reader.bigendian = True
self.base = reader.tell()
signature = reader.read(4)
if signature != self.MagicSignature:
raise ValueError('invalid magic')
magic = bytes(reader.read(4))
with contextlib.suppress(KeyError, AttributeError):
xdis = xtpyi._xdis
if isinstance(xdis, property):
xdis = xdis.fget()
version = xdis.magics.versions[magic]
vtuple = version2tuple(version)
padding_size = 4
if vtuple >= (3, 3):
padding_size += 4
if vtuple >= (3, 7):
padding_size += 4
self.version = version
self.magic = magic + padding_size * b'\0'
self.toc_offset = reader.i32()
self.reader = reader
self.entries: List[PiMeta] = []
def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool:
with StreamDetour(self.reader, self.base + self.toc_offset):
toc_data = self.reader.read()
try:
toc = marshal.loads(toc_data)
except Exception as error:
if MAGIC_NUMBER != self.magic[:4]:
xdis = xtpyi._xdis
if isinstance(xdis, property):
xdis = xdis.fget()
_ord = xdis.marsh.Ord
xdis.marsh.Ord = ord # monkey-patch workaround for bug in xdis
try:
toc = xdis.marsh.load(
MemoryFile(self.data), self.version)
except Exception:
pass
else:
error = None
finally:
xdis.marsh.Ord = _ord
if error is not None:
raise error
if isinstance(toc, list):
try:
toc = dict(toc)
except Exception as error:
self.entries = []
self.error = error
return
failures = 0
attempts = len(toc)
for name, (pzt, offset, length) in toc.items():
try:
name: str
name = name.decode('utf-8')
except AttributeError:
pass
try:
pzt = PzType(pzt)
except Exception:
pzt = PzType.DATA
name = name.replace('.', '/')
if pzt is PzType.PKG:
name = F'{name}/__init__'
with StreamDetour(self.reader, self.base + offset):
data = self.reader.read(length)
if key:
def decompressed(data=data):
cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10]))
return zlib.decompress(cipher.decrypt(data[0x10:]))
elif decompress_peek(data):
def decompressed(data=data):
return zlib.decompress(data)
else:
failures += 1
continue
if decompile and pzt in (PzType.MODULE, PzType.PKG):
def decompiled(data=data, name=name, magic=self.magic):
data = decompressed(data)
if data[:4] != magic[:4]:
data = magic + data
return decompile_buffer(data, name)
self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled))
name = F'{name}.pyc'
type = PiType.SOURCE
else:
type = PiType.DATA
self.entries.append(PiMeta(type, name, decompressed))
if key:
if failures >= 6:
xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items')
return True
elif failures > 0.7 * attempts:
self.entries.clear()
return False
else:
return True
class PiTOCEntry(Struct):
def __init__(self, reader: StructReader):
reader.bigendian = True
entry_start_offset = reader.tell()
self.size_of_entry = reader.i32()
self.offset = reader.i32()
self.size_of_compressed_data = reader.i32()
self.size_od_uncompressed_data = reader.i32()
self.is_compressed = bool(reader.read_byte())
entry_type = bytes(reader.read(1))
name_length = self.size_of_entry - reader.tell() + entry_start_offset
if name_length > 0x1000:
raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.')
name, *_ = bytes(reader.read(name_length)).partition(B'\0')
try:
name = name.decode('utf8', 'backslashreplace')
except Exception:
name = None
if not all(part.isprintable() for part in re.split('\\s*', name)):
raise RuntimeError('Refusing to process TOC entry with non-printable name.')
name = name or str(uuid.uuid4())
if entry_type == B'Z':
entry_type = B'z'
try:
self.type = PiType(entry_type)
except ValueError:
xtpyi.log_warn(F'unknown type {entry_type!r} in field {name}')
self.type = PiType.UNKNOWN
self.name = name
def __hash__(self):
return hash(self.name)
class PyInstallerArchiveEpilogue(Struct):
MagicSignature = bytes.fromhex('4D45490C0B0A0B0E')
def _read_libname(self, reader: StructReader) -> Optional[str]:
position = reader.tell()
try:
libname, t, rest = reader.read_bytes(64).partition(B'\0')
except EOF:
reader.seekset(position)
return None
try:
libname = libname.decode('utf8')
except Exception:
reader.seekset(position)
return None
if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname):
reader.seekset(position)
return None
return libname
def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No):
reader.bigendian = True
reader.seekset(offset)
self.reader = reader
signature = reader.read_bytes(8)
if signature != self.MagicSignature:
raise ValueError(
F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; '
F'should be {self.MagicSignature.hex().upper()}')
self.size = reader.i32()
toc_offset = reader.i32()
toc_length = reader.i32()
self.py_version = '.'.join(str(reader.u32()))
self.py_libname = self._read_libname(reader)
self.offset = reader.tell() - self.size
self.toc: Dict[str, PiTOCEntry] = {}
toc_end = self.offset + toc_offset + toc_length
reader.seekset(self.offset + toc_offset)
while reader.tell() < toc_end:
try:
entry = PiTOCEntry(reader)
except EOF:
xtpyi.logger.warning('end of file while reading TOC')
break
except Exception as error:
xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}')
break
if entry.name in self.toc:
raise KeyError(F'duplicate name {entry.name}')
self.toc[entry.name] = entry
self.files: Dict[str, PiMeta] = {}
no_pyz_found = True
pyz_entries: Dict[str, PYZ] = {}
for entry in list(self.toc.values()):
if entry.type is not PiType.PYZ:
continue
no_pyz_found = False
name, xt = os.path.splitext(entry.name)
name_pyz = F'{name}.pyz'
if name == entry.name:
del self.toc[name]
self.toc[name_pyz] = entry
entry.name = name_pyz
reader.seekset(self.offset + entry.offset)
if entry.is_compressed:
data = self.extract(entry.name).unpack()
else:
data = reader
pyz_entries[name] = PYZ(data, self.py_version)
magics = {pyz.magic for pyz in pyz_entries.values()}
if not magics:
if not no_pyz_found:
xtpyi.logger.warning(
'no magic signature could be recovered from embedded pyzip archives; this is '
'unsual and means that there is no way to guess the missing magic for source '
'file entries and it will likely not be possible to decompile them.')
return
elif len(magics) > 1:
xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.')
magics = list(magics)
keys: Set[bytes] = set()
for entry in self.toc.values():
extracted = self.extract(entry.name)
if entry.type not in (PiType.SOURCE, PiType.MODULE):
self.files[entry.name] = extracted
continue
data = extracted.unpack()
name, _ = os.path.splitext(extracted.name)
del self.files[extracted.name]
extracted.name = F'{name}.pyc'
self.files[extracted.name] = extracted
if len(magics) == 1 and data[:4] != magics[0]:
extracted.data = magics[0] + data
decompiled = make_decompiled_item(name, data, *magics)
if entry.type is PiType.SOURCE:
decompiled.type = PiType.USERCODE
self.files[F'{name}.py'] = decompiled
if name.endswith('crypto_key'):
for key in decompiled.unpack() | carve('string', decode=True):
if len(key) != 0x10:
continue
xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}')
keys.add(key)
if unmarshal is Unmarshal.No:
return
if not keys:
key = None
else:
key = next(iter(keys))
for name, pyz in pyz_entries.items():
pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key)
for unpacked in pyz.entries:
unpacked.name = path = F'{name}/{unpacked.name}'
if path in self.files:
raise ValueError(F'duplicate file name: {path}')
self.files[path] = unpacked
def extract(self, name: str) -> PiMeta:
try:
return self.files[name]
except KeyError:
pass
entry = self.toc[name]
with StreamDetour(self.reader, self.offset + entry.offset):
data = self.reader.read(entry.size_of_compressed_data)
if entry.is_compressed:
def extracted(d=data): return zlib.decompress(d)
else:
extracted = data
result = PiMeta(entry.type, name, extracted)
self.files[name] = result
return result
class xtpyi(ArchiveUnit):
"""
Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
"""
def __init__(
self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
path=b'path', date=b'date',
user_code: Arg.Switch('-u', group='FILTER', help=(
'Extract only source code files from the root of the archive. These usually implement '
'the actual domain logic.')) = False,
unmarshal: Arg('-y', action='count', group='FILTER', help=(
'(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can '
'potentially exploit this to execute code. It is advised to only use this option inside '
'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.'
)) = 0
):
super().__init__(
*paths,
list=list,
join_path=join_path,
drop_path=drop_path,
fuzzy=fuzzy,
exact=exact,
regex=regex,
path=path,
date=date,
unmarshal=unmarshal,
user_code=user_code,
)
@ArchiveUnit.Requires('xdis', 'arc', 'python', 'extended')
def _xdis():
import xdis.load
import xdis.magics
import xdis.marsh
import xdis.op_imports
import xdis
A, B, C, *_ = sys.version_info
version = F'{A}.{B}.{C}'
canonic = F'{A}.{B}'
if version not in xdis.magics.canonic_python_version:
class opcode_dummy:
version = float(canonic)
def __init__(self, name): self.name = name
def __getattr__(self, key): return opcode_dummy(F'{self.name}.{key}')
def __call__(self, *a, **k): return None
def __str__(self): return self.name
def __repr__(self): return self.name
import importlib
magic = importlib.util.MAGIC_NUMBER
xdis.magics.add_magic_from_int(xdis.magics.magic2int(magic), version)
xdis.magics.by_magic.setdefault(magic, set()).add(version)
xdis.magics.by_version[version] = magic
xdis.magics.magics[canonic] = magic
xdis.magics.canonic_python_version[canonic] = canonic
xdis.magics.add_canonic_versions(version, canonic)
xdis.op_imports.op_imports.setdefault(canonic, opcode_dummy('dummy'))
del A, B, C, version
import xdis.std
return xdis
@ArchiveUnit.Requires('uncompyle6', 'arc', 'python', 'extended')
def _uncompyle6():
import uncompyle6
import uncompyle6.main
return uncompyle6
@ArchiveUnit.Requires('decompyle3', 'arc', 'python')
def _decompyle3():
import decompyle3
import decompyle3.main
return decompyle3
def unpack(self, data):
view = memoryview(data)
positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)]
mode = Unmarshal(min(2, int(self.args.unmarshal)))
self.log_debug(F'unmarshal mode: {mode.name}')
if not positions:
raise LookupError('unable to find PyInstaller signature')
if len(positions) > 2:
# first position is expected to be the sentinel value in the unpacker stub
width = max(len(F'{p:X}') for p in positions)
for position in positions:
self.log_info(F'magic signature found at offset 0x{position:0{width}X}')
self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.')
archive = PyInstallerArchiveEpilogue(view, positions[-1], mode)
for name, file in archive.files.items():
if self.args.user_code:
if file.type != PiType.USERCODE:
continue
if name.startswith('pyiboot'):
continue
yield self._pack(name, None, file.data, type=file.type.name)
@classmethod
def handles(cls, data: ByteString) -> Optional[bool]:
return PyInstallerArchiveEpilogue.MagicSignature in data
Functions
def version2tuple(version)
-
Expand source code Browse git
def version2tuple(version: str): return tuple(int(k, 10) for k in re.fullmatch(R'^(\d+\.\d+(?:\.\d+)?)(.*)$', version).group(1).split('.'))
def decompress_peek(buffer, size=512)
-
Expand source code Browse git
def decompress_peek(buffer, size=512) -> Optional[bytes]: try: return zlib.decompressobj().decompress(buffer[:size]) except zlib.error: return None
def extract_code_from_buffer(buffer, file_name=None)
-
Expand source code Browse git
def extract_code_from_buffer(buffer: ByteString, file_name: Optional[str] = None) -> Generator[Code, None, None]: main: xtpyi = xtpyi code_objects = {} sys_stderr = sys.stderr sys.stderr = open(os.devnull, 'w') file_name = file_name or '<unknown>' try: version, timestamp, magic_int, codes, is_pypy, _, _ = \ main._xdis.load.load_module_from_file_object(MemoryFile(buffer), file_name, code_objects) finally: sys.stderr.close() sys.stderr = sys_stderr if not isinstance(codes, list): codes = [codes] for code in codes: yield Code(version, timestamp, magic_int, code, is_pypy, code_objects)
def decompile_buffer(buffer, file_name=None)
-
Expand source code Browse git
def decompile_buffer(buffer: Union[Code, ByteString], file_name: Optional[str] = None) -> ByteString: main: xtpyi = xtpyi errors = '' python = '' codes = [buffer] if not isinstance(buffer, Code): codes = list(extract_code_from_buffer(buffer, file_name)) for code in codes: engines = {} for e in ['decompyle3', 'uncompyle6']: try: dc = getattr(main, F'_{e}') if isinstance(dc, property): dc = dc.fget() except ImportError: errors += F'# The decompiler {dc} is not installed.\n' else: engines['decompyle3'] = dc if not engines: errors += '# (all missing, install one of the above to enable decompilation)' for name, engine in engines.items(): with io.StringIO(newline='') as output, NoLogging(NoLogging.Mode.ALL): try: engine.main.decompile( co=code.container, bytecode_version=code.version, out=output, timestamp=code.timestamp, code_objects=code.code_objects, is_pypy=code.is_pypi, magic_int=code.magic, ) except Exception as E: errors += '\n'.join(F'# {line}' for line in ( F'Error while decompiling with {name}:', *str(E).splitlines(True))) errors += '\n' else: python = output.getvalue() break if python: # removes leading comments python = python.splitlines(True) python.reverse() while python[-1].strip().startswith('#'): python.pop() python.reverse() python = ''.join(python) return python.encode(main.codec) if not isinstance(buffer, Code): embedded = bytes(buffer | carve('printable', single=True)) if len(code) - len(embedded) < 0x20: return embedded disassembly = MemoryFile() with io.TextIOWrapper(disassembly, main.codec, newline='\n') as output: output.write(errors) output.write('# Generating Disassembly:\n\n') for code in codes: instructions = list(main._xdis.std.Bytecode(code.container)) width_offset = max(len(str(i.offset)) for i in instructions) for i in instructions: opname = normalize_word_separators(i.opname, '.').lower() offset = F'{i.offset:0{width_offset}d}' output.write(F'# {offset:>5} {opname:<25} {i.argrepr}\n') output.write('\n') return disassembly.getbuffer()
def make_decompiled_item(name, data, *magics)
-
Expand source code Browse git
def make_decompiled_item(name: str, data: ByteString, *magics) -> PiMeta: def extract(data=data, magics=magics): error = None if any(data[:4] == m[:4] for m in magics): return decompile_buffer(data, name) for magic in magics: try: return decompile_buffer(magic + data, name) except Exception as exception: error = exception return '\n'.join(F'# {line}' for line in str(error).splitlines(True)).encode(xtpyi.codec) return PiMeta(PiType.DECOMPILED, F'{name}.py', extract)
Classes
class Unmarshal (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class Unmarshal(enum.IntEnum): No = 0 Yes = 1 YesAndDecompile = 2
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var No
var Yes
var YesAndDecompile
class Code (version, timestamp, magic, container, is_pypi, code_objects)
-
Code(version, timestamp, magic, container, is_pypi, code_objects)
Expand source code Browse git
class Code(NamedTuple): version: float timestamp: int magic: int container: CodeType is_pypi: bool code_objects: dict
Ancestors
- builtins.tuple
Instance variables
var version
-
Alias for field number 0
var timestamp
-
Alias for field number 1
var magic
-
Alias for field number 2
var container
-
Alias for field number 3
var is_pypi
-
Alias for field number 4
var code_objects
-
Alias for field number 5
class PiType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class PiType(bytes, enum.Enum): BINARY = B'b' # noqa / binary DEPENDENCY = B'd' # noqa / runtime option PYZ = B'z' # noqa / zlib (pyz) - frozen Python code PACKAGE = B'M' # noqa / Python package (__init__.py) MODULE = B'm' # noqa / Python module SOURCE = B's' # noqa / Python script (v3) DATA = B'x' # noqa / data RUNTIME_OPTION = B'o' # noqa / runtime option SPLASH = B'l' # noqa / splash resources UNKNOWN = B'uk' # noqa DECOMPILED = B'dc' # noqa USERCODE = B'uc' # noqa ENCRYPTED = B'ec' # noqa
Ancestors
- builtins.bytes
- enum.Enum
Class variables
var BINARY
var DEPENDENCY
var PYZ
var PACKAGE
var MODULE
var SOURCE
var DATA
var RUNTIME_OPTION
var SPLASH
var UNKNOWN
var DECOMPILED
var USERCODE
var ENCRYPTED
class PzType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class PzType(enum.IntEnum): MODULE = 0 PKG = 1 DATA = 2
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var MODULE
var PKG
var DATA
class PiMeta (type, name, data)
-
PiMeta(type: 'PiType', name: 'str', data: 'Union[Callable[[], ByteString], ByteString]')
Expand source code Browse git
class PiMeta: type: PiType name: str data: Union[Callable[[], ByteString], ByteString] def unpack(self) -> ByteString: if callable(self.data): self.data = self.data() return self.data
Class variables
var type
var name
var data
Methods
def unpack(self)
-
Expand source code Browse git
def unpack(self) -> ByteString: if callable(self.data): self.data = self.data() return self.data
class PYZ (reader, version)
-
A class to parse structured data. A
Struct
class can be instantiated as follows:foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument
reader
. If the objectdata
is already aStructReader
, then it will be passed asreader
. Otherwise, the argument will be wrapped in aStructReader
. Before initialization of the struct, the memberbar
of the newly created structure will be set to the value29
.Expand source code Browse git
class PYZ(Struct): MagicSignature = B'PYZ\0' def __init__(self, reader: StructReader, version: str): reader.bigendian = True self.base = reader.tell() signature = reader.read(4) if signature != self.MagicSignature: raise ValueError('invalid magic') magic = bytes(reader.read(4)) with contextlib.suppress(KeyError, AttributeError): xdis = xtpyi._xdis if isinstance(xdis, property): xdis = xdis.fget() version = xdis.magics.versions[magic] vtuple = version2tuple(version) padding_size = 4 if vtuple >= (3, 3): padding_size += 4 if vtuple >= (3, 7): padding_size += 4 self.version = version self.magic = magic + padding_size * b'\0' self.toc_offset = reader.i32() self.reader = reader self.entries: List[PiMeta] = [] def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool: with StreamDetour(self.reader, self.base + self.toc_offset): toc_data = self.reader.read() try: toc = marshal.loads(toc_data) except Exception as error: if MAGIC_NUMBER != self.magic[:4]: xdis = xtpyi._xdis if isinstance(xdis, property): xdis = xdis.fget() _ord = xdis.marsh.Ord xdis.marsh.Ord = ord # monkey-patch workaround for bug in xdis try: toc = xdis.marsh.load( MemoryFile(self.data), self.version) except Exception: pass else: error = None finally: xdis.marsh.Ord = _ord if error is not None: raise error if isinstance(toc, list): try: toc = dict(toc) except Exception as error: self.entries = [] self.error = error return failures = 0 attempts = len(toc) for name, (pzt, offset, length) in toc.items(): try: name: str name = name.decode('utf-8') except AttributeError: pass try: pzt = PzType(pzt) except Exception: pzt = PzType.DATA name = name.replace('.', '/') if pzt is PzType.PKG: name = F'{name}/__init__' with StreamDetour(self.reader, self.base + offset): data = self.reader.read(length) if key: def decompressed(data=data): cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10])) return zlib.decompress(cipher.decrypt(data[0x10:])) elif decompress_peek(data): def decompressed(data=data): return zlib.decompress(data) else: failures += 1 continue if decompile and pzt in (PzType.MODULE, PzType.PKG): def decompiled(data=data, name=name, magic=self.magic): data = decompressed(data) if data[:4] != magic[:4]: data = magic + data return decompile_buffer(data, name) self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled)) name = F'{name}.pyc' type = PiType.SOURCE else: type = PiType.DATA self.entries.append(PiMeta(type, name, decompressed)) if key: if failures >= 6: xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items') return True elif failures > 0.7 * attempts: self.entries.clear() return False else: return True
Ancestors
Class variables
var MagicSignature
Methods
def unpack(self, decompile, key=None)
-
Expand source code Browse git
def unpack(self, decompile: bool, key: Optional[bytes] = None) -> bool: with StreamDetour(self.reader, self.base + self.toc_offset): toc_data = self.reader.read() try: toc = marshal.loads(toc_data) except Exception as error: if MAGIC_NUMBER != self.magic[:4]: xdis = xtpyi._xdis if isinstance(xdis, property): xdis = xdis.fget() _ord = xdis.marsh.Ord xdis.marsh.Ord = ord # monkey-patch workaround for bug in xdis try: toc = xdis.marsh.load( MemoryFile(self.data), self.version) except Exception: pass else: error = None finally: xdis.marsh.Ord = _ord if error is not None: raise error if isinstance(toc, list): try: toc = dict(toc) except Exception as error: self.entries = [] self.error = error return failures = 0 attempts = len(toc) for name, (pzt, offset, length) in toc.items(): try: name: str name = name.decode('utf-8') except AttributeError: pass try: pzt = PzType(pzt) except Exception: pzt = PzType.DATA name = name.replace('.', '/') if pzt is PzType.PKG: name = F'{name}/__init__' with StreamDetour(self.reader, self.base + offset): data = self.reader.read(length) if key: def decompressed(data=data): cipher = AES.new(key, AES.MODE_CFB, bytes(data[:0x10])) return zlib.decompress(cipher.decrypt(data[0x10:])) elif decompress_peek(data): def decompressed(data=data): return zlib.decompress(data) else: failures += 1 continue if decompile and pzt in (PzType.MODULE, PzType.PKG): def decompiled(data=data, name=name, magic=self.magic): data = decompressed(data) if data[:4] != magic[:4]: data = magic + data return decompile_buffer(data, name) self.entries.append(PiMeta(PiType.DECOMPILED, F'{name}.py', decompiled)) name = F'{name}.pyc' type = PiType.SOURCE else: type = PiType.DATA self.entries.append(PiMeta(type, name, decompressed)) if key: if failures >= 6: xtpyi.logger.warning(F'pyz decompression failed for {failures - 5} additional items') return True elif failures > 0.7 * attempts: self.entries.clear() return False else: return True
class PiTOCEntry (reader)
-
A class to parse structured data. A
Struct
class can be instantiated as follows:foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument
reader
. If the objectdata
is already aStructReader
, then it will be passed asreader
. Otherwise, the argument will be wrapped in aStructReader
. Before initialization of the struct, the memberbar
of the newly created structure will be set to the value29
.Expand source code Browse git
class PiTOCEntry(Struct): def __init__(self, reader: StructReader): reader.bigendian = True entry_start_offset = reader.tell() self.size_of_entry = reader.i32() self.offset = reader.i32() self.size_of_compressed_data = reader.i32() self.size_od_uncompressed_data = reader.i32() self.is_compressed = bool(reader.read_byte()) entry_type = bytes(reader.read(1)) name_length = self.size_of_entry - reader.tell() + entry_start_offset if name_length > 0x1000: raise RuntimeError(F'Refusing to process TOC entry with name of size {name_length}.') name, *_ = bytes(reader.read(name_length)).partition(B'\0') try: name = name.decode('utf8', 'backslashreplace') except Exception: name = None if not all(part.isprintable() for part in re.split('\\s*', name)): raise RuntimeError('Refusing to process TOC entry with non-printable name.') name = name or str(uuid.uuid4()) if entry_type == B'Z': entry_type = B'z' try: self.type = PiType(entry_type) except ValueError: xtpyi.log_warn(F'unknown type {entry_type!r} in field {name}') self.type = PiType.UNKNOWN self.name = name def __hash__(self): return hash(self.name)
Ancestors
class PyInstallerArchiveEpilogue (reader, offset, unmarshal=Unmarshal.No)
-
A class to parse structured data. A
Struct
class can be instantiated as follows:foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument
reader
. If the objectdata
is already aStructReader
, then it will be passed asreader
. Otherwise, the argument will be wrapped in aStructReader
. Before initialization of the struct, the memberbar
of the newly created structure will be set to the value29
.Expand source code Browse git
class PyInstallerArchiveEpilogue(Struct): MagicSignature = bytes.fromhex('4D45490C0B0A0B0E') def _read_libname(self, reader: StructReader) -> Optional[str]: position = reader.tell() try: libname, t, rest = reader.read_bytes(64).partition(B'\0') except EOF: reader.seekset(position) return None try: libname = libname.decode('utf8') except Exception: reader.seekset(position) return None if not t or any(rest) or len(rest) < 10 or not re.fullmatch(R'[\s!-~]+', libname): reader.seekset(position) return None return libname def __init__(self, reader: StructReader, offset: int, unmarshal: Unmarshal = Unmarshal.No): reader.bigendian = True reader.seekset(offset) self.reader = reader signature = reader.read_bytes(8) if signature != self.MagicSignature: raise ValueError( F'offset 0x{offset:X} has invalid signature {signature.hex().upper()}; ' F'should be {self.MagicSignature.hex().upper()}') self.size = reader.i32() toc_offset = reader.i32() toc_length = reader.i32() self.py_version = '.'.join(str(reader.u32())) self.py_libname = self._read_libname(reader) self.offset = reader.tell() - self.size self.toc: Dict[str, PiTOCEntry] = {} toc_end = self.offset + toc_offset + toc_length reader.seekset(self.offset + toc_offset) while reader.tell() < toc_end: try: entry = PiTOCEntry(reader) except EOF: xtpyi.logger.warning('end of file while reading TOC') break except Exception as error: xtpyi.logger.warning(F'unexpected error while reading TOC: {error!s}') break if entry.name in self.toc: raise KeyError(F'duplicate name {entry.name}') self.toc[entry.name] = entry self.files: Dict[str, PiMeta] = {} no_pyz_found = True pyz_entries: Dict[str, PYZ] = {} for entry in list(self.toc.values()): if entry.type is not PiType.PYZ: continue no_pyz_found = False name, xt = os.path.splitext(entry.name) name_pyz = F'{name}.pyz' if name == entry.name: del self.toc[name] self.toc[name_pyz] = entry entry.name = name_pyz reader.seekset(self.offset + entry.offset) if entry.is_compressed: data = self.extract(entry.name).unpack() else: data = reader pyz_entries[name] = PYZ(data, self.py_version) magics = {pyz.magic for pyz in pyz_entries.values()} if not magics: if not no_pyz_found: xtpyi.logger.warning( 'no magic signature could be recovered from embedded pyzip archives; this is ' 'unsual and means that there is no way to guess the missing magic for source ' 'file entries and it will likely not be possible to decompile them.') return elif len(magics) > 1: xtpyi.logger.warning('more than one magic signature was recovered; this is unusual.') magics = list(magics) keys: Set[bytes] = set() for entry in self.toc.values(): extracted = self.extract(entry.name) if entry.type not in (PiType.SOURCE, PiType.MODULE): self.files[entry.name] = extracted continue data = extracted.unpack() name, _ = os.path.splitext(extracted.name) del self.files[extracted.name] extracted.name = F'{name}.pyc' self.files[extracted.name] = extracted if len(magics) == 1 and data[:4] != magics[0]: extracted.data = magics[0] + data decompiled = make_decompiled_item(name, data, *magics) if entry.type is PiType.SOURCE: decompiled.type = PiType.USERCODE self.files[F'{name}.py'] = decompiled if name.endswith('crypto_key'): for key in decompiled.unpack() | carve('string', decode=True): if len(key) != 0x10: continue xtpyi.logger.info(F'found key: {key.decode(xtpyi.codec)}') keys.add(key) if unmarshal is Unmarshal.No: return if not keys: key = None else: key = next(iter(keys)) for name, pyz in pyz_entries.items(): pyz.unpack(unmarshal is Unmarshal.YesAndDecompile, key) for unpacked in pyz.entries: unpacked.name = path = F'{name}/{unpacked.name}' if path in self.files: raise ValueError(F'duplicate file name: {path}') self.files[path] = unpacked def extract(self, name: str) -> PiMeta: try: return self.files[name] except KeyError: pass entry = self.toc[name] with StreamDetour(self.reader, self.offset + entry.offset): data = self.reader.read(entry.size_of_compressed_data) if entry.is_compressed: def extracted(d=data): return zlib.decompress(d) else: extracted = data result = PiMeta(entry.type, name, extracted) self.files[name] = result return result
Ancestors
Class variables
var MagicSignature
Methods
def extract(self, name)
-
Expand source code Browse git
def extract(self, name: str) -> PiMeta: try: return self.files[name] except KeyError: pass entry = self.toc[name] with StreamDetour(self.reader, self.offset + entry.offset): data = self.reader.read(entry.size_of_compressed_data) if entry.is_compressed: def extracted(d=data): return zlib.decompress(d) else: extracted = data result = PiMeta(entry.type, name, extracted) self.files[name] = result return result
class xtpyi (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', user_code=False, unmarshal=0)
-
Extracts and decompiles files from a Python Installer (aka PyInstaller) archive.
Expand source code Browse git
class xtpyi(ArchiveUnit): """ Extracts and decompiles files from a Python Installer (aka PyInstaller) archive. """ def __init__( self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', user_code: Arg.Switch('-u', group='FILTER', help=( 'Extract only source code files from the root of the archive. These usually implement ' 'the actual domain logic.')) = False, unmarshal: Arg('-y', action='count', group='FILTER', help=( '(DANGEROUS) Unmarshal embedded PYZ archives. Warning: Maliciously crafted packages can ' 'potentially exploit this to execute code. It is advised to only use this option inside ' 'an isolated environment. Specify twice to decompile unmarshalled Python bytecode.' )) = 0 ): super().__init__( *paths, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy, exact=exact, regex=regex, path=path, date=date, unmarshal=unmarshal, user_code=user_code, ) @ArchiveUnit.Requires('xdis', 'arc', 'python', 'extended') def _xdis(): import xdis.load import xdis.magics import xdis.marsh import xdis.op_imports import xdis A, B, C, *_ = sys.version_info version = F'{A}.{B}.{C}' canonic = F'{A}.{B}' if version not in xdis.magics.canonic_python_version: class opcode_dummy: version = float(canonic) def __init__(self, name): self.name = name def __getattr__(self, key): return opcode_dummy(F'{self.name}.{key}') def __call__(self, *a, **k): return None def __str__(self): return self.name def __repr__(self): return self.name import importlib magic = importlib.util.MAGIC_NUMBER xdis.magics.add_magic_from_int(xdis.magics.magic2int(magic), version) xdis.magics.by_magic.setdefault(magic, set()).add(version) xdis.magics.by_version[version] = magic xdis.magics.magics[canonic] = magic xdis.magics.canonic_python_version[canonic] = canonic xdis.magics.add_canonic_versions(version, canonic) xdis.op_imports.op_imports.setdefault(canonic, opcode_dummy('dummy')) del A, B, C, version import xdis.std return xdis @ArchiveUnit.Requires('uncompyle6', 'arc', 'python', 'extended') def _uncompyle6(): import uncompyle6 import uncompyle6.main return uncompyle6 @ArchiveUnit.Requires('decompyle3', 'arc', 'python') def _decompyle3(): import decompyle3 import decompyle3.main return decompyle3 def unpack(self, data): view = memoryview(data) positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)] mode = Unmarshal(min(2, int(self.args.unmarshal))) self.log_debug(F'unmarshal mode: {mode.name}') if not positions: raise LookupError('unable to find PyInstaller signature') if len(positions) > 2: # first position is expected to be the sentinel value in the unpacker stub width = max(len(F'{p:X}') for p in positions) for position in positions: self.log_info(F'magic signature found at offset 0x{position:0{width}X}') self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.') archive = PyInstallerArchiveEpilogue(view, positions[-1], mode) for name, file in archive.files.items(): if self.args.user_code: if file.type != PiType.USERCODE: continue if name.startswith('pyiboot'): continue yield self._pack(name, None, file.data, type=file.type.name) @classmethod def handles(cls, data: ByteString) -> Optional[bool]: return PyInstallerArchiveEpilogue.MagicSignature in data
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data): view = memoryview(data) positions = [m.start() for m in re.finditer(re.escape(PyInstallerArchiveEpilogue.MagicSignature), view)] mode = Unmarshal(min(2, int(self.args.unmarshal))) self.log_debug(F'unmarshal mode: {mode.name}') if not positions: raise LookupError('unable to find PyInstaller signature') if len(positions) > 2: # first position is expected to be the sentinel value in the unpacker stub width = max(len(F'{p:X}') for p in positions) for position in positions: self.log_info(F'magic signature found at offset 0x{position:0{width}X}') self.log_warn(F'found {len(positions) - 1} potential PyInstaller epilogue markers; using last one.') archive = PyInstallerArchiveEpilogue(view, positions[-1], mode) for name, file in archive.files.items(): if self.args.user_code: if file.type != PiType.USERCODE: continue if name.startswith('pyiboot'): continue yield self._pack(name, None, file.data, type=file.type.name)
Inherited members