Module refinery.units.formats.pym

Expand source code Browse git
from __future__ import annotations

import marshal
import sys

from types import CodeType

from refinery.lib import json
from refinery.lib.py import SYS_PYTHON, Marshal, code_header, version2tuple
from refinery.lib.types import Param
from refinery.units import Arg, Unit


class pym(Unit):
    """
    Convert Python-Marshaled code objects to PYC format.

    If it is an older Python version, you can use the `refinery.pyc` unit to then decompile
    the code, but for more recent versions a separate Python decompiler will be required.
    """
    @classmethod
    def handles(cls, data) -> bool | None:
        from refinery.lib.id import PycMagicPattern
        if PycMagicPattern.fullmatch(data[:4]):
            return True

    def __init__(
        self,
        version: Param[str | None, Arg.String('-V', metavar='V',
            help='Optionally select the (known) Python version.')] = None,
        system: Param[bool, Arg.Switch('-s',
            help='Try to use the built-in marshal.loads before using the parser.')] = False,
        redump: Param[bool, Arg.Switch('-r',
            help='Load marshaled code objects before re-dumping them.')] = False,
    ):
        super().__init__(
            version=version,
            system=system,
            redump=redump,
        )

    def reverse(self, data):
        return marshal.dumps(data)

    def process(self, data):
        def toblob(data):
            if isinstance(data, (bytes, bytearray)):
                self.log_info('unmarshalled a byte string, returning as is')
                return data
            if isinstance(data, str):
                self.log_info(F'unmarshalled a string object, encoding as {self.codec}')
                return data.encode(self.codec)
            if isinstance(data, CodeType):
                self.log_info('unmarshalled a code object, converting to pyc')
                pyc = code_header()
                pyc.extend(marshal.dumps(data))
                return pyc
            if isinstance(data, int):
                self.log_info('unmarshalled an integer, returning big endian encoding')
                q, r = divmod(data.bit_length(), 8)
                q += int(bool(r))
                return data.to_bytes(q, 'big')
            if isinstance(data, dict):
                return json.dumps(data, pretty=False, tojson=json.bytes_as_string)
            raise NotImplementedError(
                F'No serialization implemented for object of type {data.__class__.__name__}')

        if version := self.args.version:
            version = version2tuple(version)

        if version and version != SYS_PYTHON or not self.args.system:
            out = None
        else:
            try:
                out = marshal.loads(data)
            except Exception as error:
                self.log_info(F'the marshal.loads method failed: {error!s}')
                out = None
            else:
                v = sys.version_info
                self.log_info(F'unmarshaled using the {v.major}.{v.minor}.{v.micro} built-in marshal.loads')

        if out is None:
            dumpcode = not self.args.redump
            memory = memoryview(data)
            unpacker = Marshal(memory, version=version, dumpcode=dumpcode)
            out = unpacker.object()

        if isinstance(out, (list, tuple, set, frozenset)):
            self.log_info('object is a collection, converting each item individually')
            for item in out:
                yield toblob(item)
        else:
            yield toblob(out)

Classes

class pym (version=None, system=False, redump=False)

Convert Python-Marshaled code objects to PYC format.

If it is an older Python version, you can use the pyc unit to then decompile the code, but for more recent versions a separate Python decompiler will be required.

Expand source code Browse git
class pym(Unit):
    """
    Convert Python-Marshaled code objects to PYC format.

    If it is an older Python version, you can use the `refinery.pyc` unit to then decompile
    the code, but for more recent versions a separate Python decompiler will be required.
    """
    @classmethod
    def handles(cls, data) -> bool | None:
        from refinery.lib.id import PycMagicPattern
        if PycMagicPattern.fullmatch(data[:4]):
            return True

    def __init__(
        self,
        version: Param[str | None, Arg.String('-V', metavar='V',
            help='Optionally select the (known) Python version.')] = None,
        system: Param[bool, Arg.Switch('-s',
            help='Try to use the built-in marshal.loads before using the parser.')] = False,
        redump: Param[bool, Arg.Switch('-r',
            help='Load marshaled code objects before re-dumping them.')] = False,
    ):
        super().__init__(
            version=version,
            system=system,
            redump=redump,
        )

    def reverse(self, data):
        return marshal.dumps(data)

    def process(self, data):
        def toblob(data):
            if isinstance(data, (bytes, bytearray)):
                self.log_info('unmarshalled a byte string, returning as is')
                return data
            if isinstance(data, str):
                self.log_info(F'unmarshalled a string object, encoding as {self.codec}')
                return data.encode(self.codec)
            if isinstance(data, CodeType):
                self.log_info('unmarshalled a code object, converting to pyc')
                pyc = code_header()
                pyc.extend(marshal.dumps(data))
                return pyc
            if isinstance(data, int):
                self.log_info('unmarshalled an integer, returning big endian encoding')
                q, r = divmod(data.bit_length(), 8)
                q += int(bool(r))
                return data.to_bytes(q, 'big')
            if isinstance(data, dict):
                return json.dumps(data, pretty=False, tojson=json.bytes_as_string)
            raise NotImplementedError(
                F'No serialization implemented for object of type {data.__class__.__name__}')

        if version := self.args.version:
            version = version2tuple(version)

        if version and version != SYS_PYTHON or not self.args.system:
            out = None
        else:
            try:
                out = marshal.loads(data)
            except Exception as error:
                self.log_info(F'the marshal.loads method failed: {error!s}')
                out = None
            else:
                v = sys.version_info
                self.log_info(F'unmarshaled using the {v.major}.{v.minor}.{v.micro} built-in marshal.loads')

        if out is None:
            dumpcode = not self.args.redump
            memory = memoryview(data)
            unpacker = Marshal(memory, version=version, dumpcode=dumpcode)
            out = unpacker.object()

        if isinstance(out, (list, tuple, set, frozenset)):
            self.log_info('object is a collection, converting each item individually')
            for item in out:
                yield toblob(item)
        else:
            yield toblob(out)

Ancestors

Subclasses

Inherited members