Module refinery.units.compression.lz

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import IntFlag
from itertools import repeat, product

from lzma import (
    FILTER_DELTA,
    FILTER_LZMA1,
    FILTER_LZMA2,
    FORMAT_ALONE,
    FORMAT_RAW,
    FORMAT_XZ,
    LZMACompressor,
    LZMADecompressor,
    LZMAError,
    PRESET_EXTREME,
)

from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.structures import MemoryFile
from refinery.lib.decompression import parse_lzma_properties

__all__ = ['lzma', '_auto_decompress_lzma']


class F(IntFlag):
    """
    Integer bit flags. `DEFAULT` is the empty flag set, the other members occupy one bit each.
    """
    # NOTE(review): nothing in the visible part of this module references these
    # flags; the intended meaning of each bit should be confirmed with the author.
    DEFAULT  = 0b00 # noqa
    INJECT   = 0b01 # noqa
    STEPWISE = 0b10


class lzma(Unit):
    """
    LZMA compression and decompression.
    """

    # Forwarded as min_dict / max_dict to parse_lzma_properties during the header search.
    _SEARCH_MIN_DICT = 0x1_0000
    _SEARCH_MAX_DICT = 0x1000_0000
    # A brute-forced candidate is rejected when len(result) * _SEARCH_MAX_BLOW < len(data).
    _SEARCH_MAX_BLOW = 1.2
    # Largest offset at which a properties header is looked for.
    _SEARCH_SKIP1 = 0x08
    # Largest number of bytes skipped between the properties header and the raw stream.
    _SEARCH_SKIP2 = 0x10
    # When set, a failed strict search returns None so that process() retries in partial mode.
    _ATTEMPT_PARTIAL = True

    def __init__(
        self,
        raw   : Arg.Switch('-r', group='MODE', help='Use raw (no container) format.') = False,
        alone : Arg.Switch('-a', group='MODE', help='Use the lzma container format.') = False,
        xz    : Arg.Switch('-x', group='MODE', help='Use the default xz format.') = False,
        level : Arg.Number('-l', bound=(0, 9), help='The compression level preset; between 0 and 9.') = 9,
        delta : Arg.Number('-d', help='Add a delta filter when compressing.') = None,
    ):
        if (raw, alone, xz).count(True) > 1:
            raise ValueError('Only one container format can be enabled.')
        if level not in range(10):
            raise ValueError('Compression level must be a number between 0 and 9.')
        # Only the actual unit parameters are forwarded. The stored level always has the
        # PRESET_EXTREME bit set, which is why it is validated before being combined.
        super().__init__(raw=raw, alone=alone, xz=xz, delta=delta,
            level=level | PRESET_EXTREME)

    def reverse(self, data):
        """
        Compress the input. The container format is chosen from the unit arguments: the
        lzma (alone) container, a raw stream, or the xz container when neither is selected.
        An optional delta filter is placed in front of the LZMA filter.
        """
        filters = []
        if self.args.delta is not None:
            self.log_debug('adding delta filter')
            filters.append({'id': FILTER_DELTA, 'dist': self.args.delta})
        if self.args.alone:
            self.log_debug('setting alone format')
            mode = FORMAT_ALONE
            filters.append({'id': FILTER_LZMA1, 'preset': self.args.level})
        elif self.args.raw:
            self.log_debug('setting raw format')
            mode = FORMAT_RAW
            filters.append({'id': FILTER_LZMA2, 'preset': self.args.level})
        else:
            if not self.args.xz:
                self.log_info('choosing default .xz container format for compression')
            mode = FORMAT_XZ
            filters.append({'id': FILTER_LZMA2, 'preset': self.args.level})
        lz = LZMACompressor(mode, filters=filters)
        output = lz.compress(data)
        output += lz.flush()
        return output

    def _decompress(self, data: bytearray, lz: LZMADecompressor, partial: bool = False):
        """
        Feed `data` to the decompressor `lz` and return the collected output buffer.

        In partial mode the input is fed one byte at a time, so the offset reported on
        failure is the exact input position. Otherwise the input is passed in one piece.

        Raises `RefineryPartialResult`, carrying the output buffer, when the decompressor
        raises `EOFError` or `LZMAError`, or when it reports unused input afterwards.
        """
        temp = bytearray()
        sizes = repeat(1) if partial else [len(data)]
        with MemoryFile(temp) as output, MemoryFile(data) as stream:
            for size in sizes:
                if stream.eof or stream.closed:
                    break
                # Remember where this chunk starts for the error message below.
                offset = stream.tell()
                try:
                    output.write(lz.decompress(stream.read(size)))
                except (EOFError, LZMAError):
                    raise RefineryPartialResult(
                        F'decompression failed at offset {offset}', temp)
        # unused_data holds input bytes that the decompressor did not consume.
        if n := len(lz.unused_data):
            raise RefineryPartialResult(F'Data stream is truncated, {n} bytes unused.', temp)
        return temp

    def _process(self, data: bytearray, partial=False):
        """
        Decompress `data`, first with a default `LZMADecompressor` and, if that fails, by
        brute-forcing the position of LZMA1 / LZMA2 properties and the start of the raw stream.

        Returns the decompressed data on success. When nothing is found, the method returns
        `None` if `partial` is unset and `_ATTEMPT_PARTIAL` is set; otherwise it raises the
        longest non-empty partial result, or `ValueError` if there is none.
        """
        try:
            dc = LZMADecompressor()
            return self._decompress(data, dc, partial)
        except RefineryPartialResult as pe:
            best = pe
        except Exception:
            best = None
            self.log_info('default LZMA decompressor failed, brute-forcing custom header')
        view = memoryview(data)
        # Lower bounds for the decompressed size per LZMA version, derived from the input
        # size; see the linked discussion for the origin of these constants.
        min_original_size = {
            # https://sourceforge.net/p/sevenzip/discussion/45797/thread/b6bd62f8/
            1: int((len(data) - 64_000) / 1.100), # noqa
            2: int((len(data) -  1_000) / 1.001), # noqa
        }
        # p is the number of property bytes parsed for the given LZMA version.
        for (version, p), offset_prop, to_data in product(
            ((1, 5),
             (2, 1)),
            range(self._SEARCH_SKIP1 + 1),
            range(self._SEARCH_SKIP2 + 1),
        ):
            if offset_prop + to_data > p + 20:
                # expect no more than a 20 byte header on top of the properties
                # that would be enough for, e.g. compressed & uncompressed size
                # each filling a full 64bit integer and 4 additional bytes.
                continue
            try:
                lzma_filter = parse_lzma_properties(
                    view[offset_prop:offset_prop + p],
                    version,
                    min_dict=self._SEARCH_MIN_DICT,
                    max_dict=self._SEARCH_MAX_DICT,
                )
                self.log_debug(F'attempt LZMA{version} at {offset_prop:02d}, skipping {to_data:02d}, filter: {lzma_filter!r}')
                engine = LZMADecompressor(FORMAT_RAW, filters=[lzma_filter])
                result = self._decompress(view[offset_prop + p + to_data:], engine, partial)
            except RefineryPartialResult as pe:
                # Keep the partial result that recovered the most data so far.
                if best is None or len(best.partial) < len(pe.partial):
                    best = pe
                continue
            except Exception:
                # Any other failure only means that this candidate layout is wrong.
                continue
            if len(result) < min_original_size[version]:
                continue
            if len(result) * self._SEARCH_MAX_BLOW < len(data):
                continue
            self.log_info(
                F'success with LZMA{version} properties at {offset_prop} and raw stream starting at {to_data + offset_prop + p}')
            return result
        if partial or not self._ATTEMPT_PARTIAL:
            if best and len(best.partial) > 0:
                raise best
            raise ValueError('unable to find an LZMA stream')
        # Strict search failed; the caller is expected to retry in partial mode.
        return None

    def process(self, data: bytearray):
        """
        Decompress the input; if the strict attempt yields no data, retry in partial mode.
        """
        if out := self._process(data):
            return out
        return self._process(data, partial=True)

    @classmethod
    def handles(cls, data: bytearray):
        """
        Return True if the data starts with one of two known magic byte sequences
        (`5D 00 00 00` or `FD 37 7A 58 5A`), and None otherwise.
        """
        if data[:4] == B'\x5D\0\0\0':
            return True
        if data[:5] == B'\xFD7zXZ':
            return True
        return None


class _auto_decompress_lzma(lzma):
    # Variant of the lzma unit that only overrides the header search constants;
    # every method is inherited from the parent class.
    _SEARCH_MIN_DICT = 1 << 16  # 0x1_0000
    _SEARCH_MAX_DICT = 1 << 24  # 0x100_0000
    _SEARCH_MAX_BLOW = 1.5
    # With a value of 0, properties are only looked for at offset 0 of the input.
    _SEARCH_SKIP1 = 0
    # At most 8 bytes are skipped between the properties and the raw stream.
    _SEARCH_SKIP2 = 8
    # A failed search raises right away instead of returning None for a partial retry.
    _ATTEMPT_PARTIAL = False

Classes

class lzma (raw=False, alone=False, xz=False, level=9, delta=None)

LZMA compression and decompression.

Expand source code Browse git
class lzma(Unit):
    """
    LZMA compression and decompression.
    """

    # Forwarded as min_dict / max_dict to parse_lzma_properties during the header search.
    _SEARCH_MIN_DICT = 0x1_0000
    _SEARCH_MAX_DICT = 0x1000_0000
    # A brute-forced candidate is rejected when len(result) * _SEARCH_MAX_BLOW < len(data).
    _SEARCH_MAX_BLOW = 1.2
    # Largest offset at which a properties header is looked for.
    _SEARCH_SKIP1 = 0x08
    # Largest number of bytes skipped between the properties header and the raw stream.
    _SEARCH_SKIP2 = 0x10
    # When set, a failed strict search returns None so that process() retries in partial mode.
    _ATTEMPT_PARTIAL = True

    def __init__(
        self,
        raw   : Arg.Switch('-r', group='MODE', help='Use raw (no container) format.') = False,
        alone : Arg.Switch('-a', group='MODE', help='Use the lzma container format.') = False,
        xz    : Arg.Switch('-x', group='MODE', help='Use the default xz format.') = False,
        level : Arg.Number('-l', bound=(0, 9), help='The compression level preset; between 0 and 9.') = 9,
        delta : Arg.Number('-d', help='Add a delta filter when compressing.') = None,
    ):
        if (raw, alone, xz).count(True) > 1:
            raise ValueError('Only one container format can be enabled.')
        if level not in range(10):
            raise ValueError('Compression level must be a number between 0 and 9.')
        # Only the actual unit parameters are forwarded. The stored level always has the
        # PRESET_EXTREME bit set, which is why it is validated before being combined.
        super().__init__(raw=raw, alone=alone, xz=xz, delta=delta,
            level=level | PRESET_EXTREME)

    def reverse(self, data):
        """
        Compress the input. The container format is chosen from the unit arguments: the
        lzma (alone) container, a raw stream, or the xz container when neither is selected.
        An optional delta filter is placed in front of the LZMA filter.
        """
        filters = []
        if self.args.delta is not None:
            self.log_debug('adding delta filter')
            filters.append({'id': FILTER_DELTA, 'dist': self.args.delta})
        if self.args.alone:
            self.log_debug('setting alone format')
            mode = FORMAT_ALONE
            filters.append({'id': FILTER_LZMA1, 'preset': self.args.level})
        elif self.args.raw:
            self.log_debug('setting raw format')
            mode = FORMAT_RAW
            filters.append({'id': FILTER_LZMA2, 'preset': self.args.level})
        else:
            if not self.args.xz:
                self.log_info('choosing default .xz container format for compression')
            mode = FORMAT_XZ
            filters.append({'id': FILTER_LZMA2, 'preset': self.args.level})
        lz = LZMACompressor(mode, filters=filters)
        output = lz.compress(data)
        output += lz.flush()
        return output

    def _decompress(self, data: bytearray, lz: LZMADecompressor, partial: bool = False):
        """
        Feed `data` to the decompressor `lz` and return the collected output buffer.

        In partial mode the input is fed one byte at a time, so the offset reported on
        failure is the exact input position. Otherwise the input is passed in one piece.

        Raises `RefineryPartialResult`, carrying the output buffer, when the decompressor
        raises `EOFError` or `LZMAError`, or when it reports unused input afterwards.
        """
        temp = bytearray()
        sizes = repeat(1) if partial else [len(data)]
        with MemoryFile(temp) as output, MemoryFile(data) as stream:
            for size in sizes:
                if stream.eof or stream.closed:
                    break
                # Remember where this chunk starts for the error message below.
                offset = stream.tell()
                try:
                    output.write(lz.decompress(stream.read(size)))
                except (EOFError, LZMAError):
                    raise RefineryPartialResult(
                        F'decompression failed at offset {offset}', temp)
        # unused_data holds input bytes that the decompressor did not consume.
        if n := len(lz.unused_data):
            raise RefineryPartialResult(F'Data stream is truncated, {n} bytes unused.', temp)
        return temp

    def _process(self, data: bytearray, partial=False):
        """
        Decompress `data`, first with a default `LZMADecompressor` and, if that fails, by
        brute-forcing the position of LZMA1 / LZMA2 properties and the start of the raw stream.

        Returns the decompressed data on success. When nothing is found, the method returns
        `None` if `partial` is unset and `_ATTEMPT_PARTIAL` is set; otherwise it raises the
        longest non-empty partial result, or `ValueError` if there is none.
        """
        try:
            dc = LZMADecompressor()
            return self._decompress(data, dc, partial)
        except RefineryPartialResult as pe:
            best = pe
        except Exception:
            best = None
            self.log_info('default LZMA decompressor failed, brute-forcing custom header')
        view = memoryview(data)
        # Lower bounds for the decompressed size per LZMA version, derived from the input
        # size; see the linked discussion for the origin of these constants.
        min_original_size = {
            # https://sourceforge.net/p/sevenzip/discussion/45797/thread/b6bd62f8/
            1: int((len(data) - 64_000) / 1.100), # noqa
            2: int((len(data) -  1_000) / 1.001), # noqa
        }
        # p is the number of property bytes parsed for the given LZMA version.
        for (version, p), offset_prop, to_data in product(
            ((1, 5),
             (2, 1)),
            range(self._SEARCH_SKIP1 + 1),
            range(self._SEARCH_SKIP2 + 1),
        ):
            if offset_prop + to_data > p + 20:
                # expect no more than a 20 byte header on top of the properties
                # that would be enough for, e.g. compressed & uncompressed size
                # each filling a full 64bit integer and 4 additional bytes.
                continue
            try:
                lzma_filter = parse_lzma_properties(
                    view[offset_prop:offset_prop + p],
                    version,
                    min_dict=self._SEARCH_MIN_DICT,
                    max_dict=self._SEARCH_MAX_DICT,
                )
                self.log_debug(F'attempt LZMA{version} at {offset_prop:02d}, skipping {to_data:02d}, filter: {lzma_filter!r}')
                engine = LZMADecompressor(FORMAT_RAW, filters=[lzma_filter])
                result = self._decompress(view[offset_prop + p + to_data:], engine, partial)
            except RefineryPartialResult as pe:
                # Keep the partial result that recovered the most data so far.
                if best is None or len(best.partial) < len(pe.partial):
                    best = pe
                continue
            except Exception:
                # Any other failure only means that this candidate layout is wrong.
                continue
            if len(result) < min_original_size[version]:
                continue
            if len(result) * self._SEARCH_MAX_BLOW < len(data):
                continue
            self.log_info(
                F'success with LZMA{version} properties at {offset_prop} and raw stream starting at {to_data + offset_prop + p}')
            return result
        if partial or not self._ATTEMPT_PARTIAL:
            if best and len(best.partial) > 0:
                raise best
            raise ValueError('unable to find an LZMA stream')
        # Strict search failed; the caller is expected to retry in partial mode.
        return None

    def process(self, data: bytearray):
        """
        Decompress the input; if the strict attempt yields no data, retry in partial mode.
        """
        if out := self._process(data):
            return out
        return self._process(data, partial=True)

    @classmethod
    def handles(cls, data: bytearray):
        """
        Return True if the data starts with one of two known magic byte sequences
        (`5D 00 00 00` or `FD 37 7A 58 5A`), and None otherwise.
        """
        if data[:4] == B'\x5D\0\0\0':
            return True
        if data[:5] == B'\xFD7zXZ':
            return True
        return None

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies

Inherited members

class _auto_decompress_lzma (raw=False, alone=False, xz=False, level=9, delta=None)
Expand source code Browse git
class _auto_decompress_lzma(lzma):
    # Variant of the lzma unit that only overrides the header search constants;
    # every method is inherited from the parent class.
    _SEARCH_MIN_DICT = 1 << 16  # 0x1_0000
    _SEARCH_MAX_DICT = 1 << 24  # 0x100_0000
    _SEARCH_MAX_BLOW = 1.5
    # With a value of 0, properties are only looked for at offset 0 of the input.
    _SEARCH_SKIP1 = 0
    # At most 8 bytes are skipped between the properties and the raw stream.
    _SEARCH_SKIP2 = 8
    # A failed search raises right away instead of returning None for a partial retry.
    _ATTEMPT_PARTIAL = False

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members