Module refinery.units.compression.decompress

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import ByteString, List, NamedTuple, Optional

from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.types import INF

from .ap     import aplib  # noqa
from .blz    import blz    # noqa
from .bz2    import bz2    # noqa
from .jcalg  import jcalg  # noqa
from .lz     import lzma   # noqa
from .lz4    import lz4    # noqa
from .lzjb   import lzjb   # noqa
from .lznt1  import lznt1  # noqa
from .lzo    import lzo    # noqa
from .szdd   import szdd   # noqa
from .zl     import zl     # noqa
from .qlz    import qlz    # noqa
from .lzf    import lzf    # noqa
from .lzw    import lzw    # noqa


class decompress(Unit):
    """
    Attempts all available decompression units against the input and returns
    the output of the first successful one. If none succeeds, the data is
    returned unaltered. The process is heavily biased against LZNT1 decompression
    due to a large tendency for LZNT1 false positives.
    """
    def __init__(
        self,
        prepend: Arg.Switch('-P', '--no-prepend', off=True, help=(
            'By default, if decompression fails, the unit attempts to prefix '
            'the data with all possible values of a single byte and decompress '
            'the result. This behavior can be disabled with this flag.')
        ) = True,
        tolerance: Arg.Number('-t', help=(
            'Maximum number of bytes to strip from the beginning of the data; '
            'The default value is 12.')
        ) = 12,
        max_ratio: Arg('-m', metavar='R', help=(
            'To determine whether a decompression algorithm was successful, the '
            'ratio of compressed size to decompressed size may at most be as large '
            'as this number, a floating point value R; default value is {default}.')
        ) = 1,
        min_ratio: Arg('-n', metavar='R', help=(
            'Require that compression ratios must be at least as large as R. This '
            'is a "too good to be true" heuristic against algorithms like lznt1 '
            'that can produce false positives. The default is {default}.')
        ) = 0.0001,
    ):
        if min_ratio <= 0:
            raise ValueError('The compression factor must be nonnegative.')
        super().__init__(
            tolerance=tolerance,
            prepend=prepend,
            min_ratio=min_ratio,
            max_ratio=max_ratio
        )
        self.engines: List[Unit] = [
            engine.assemble() for engine in [
                szdd, zl, lzma, aplib, qlz, lzf, lzw, jcalg, bz2, blz, lzjb, lz4, lzo, lznt1]
        ]
        for engine in self.engines:
            engine.log_detach()

    def process(self, data):

        data = memoryview(data)

        class Decompression(NamedTuple):
            engine: Unit
            result: Optional[ByteString] = None
            cutoff: int = 0
            prefix: Optional[int] = None
            failed: bool = False

            def __str__(self):
                status = 'partial' if self.failed else 'success'
                prefix = self.prefix
                if prefix is not None:
                    prefix = F'0x{prefix:02X}'
                return F'prefix={prefix}, cutoff=0x{self.cutoff:02X}, [{status}] engine={self.engine.name}'

            def __len__(self):
                return len(self.result)

            @property
            def ratio(self):
                if not self.result:
                    return INF
                return len(data) / len(self.result)

            @property
            def unmodified(self):
                return self.cutoff == 0
                return self.prefix is None and self.cutoff == 0

            @property
            def method(self):
                return self.engine.name

        if self.args.prepend:
            buffer = bytearray(1 + len(data))
            buffer[1:] = data

        best_only_success: Optional[Decompression] = None
        best_with_failure: Optional[Decompression] = None

        def decompress(engine: Unit, cutoff: int = 0, prefix: Optional[int] = None):
            ingest = data[cutoff:]
            failed = True
            if prefix is not None:
                buffer[0] = prefix
                ingest = buffer
            if engine.handles(ingest) is False:
                return Decompression(engine, None, cutoff, prefix)
            try:
                result = engine.process(ingest)
            except RefineryPartialResult as pr:
                result = pr.partial
            except Exception:
                result = None
            else:
                failed = False
            return Decompression(engine, result, cutoff, prefix, failed)

        def update(new: Decompression, best: Optional[Decompression] = None, discard_if_too_good=False) -> Decompression:
            ratio = new.ratio
            if ratio > self.args.max_ratio:
                return best
            if ratio < self.args.min_ratio:
                return best
            prefix = new.prefix
            if prefix is not None:
                prefix = F'0x{prefix:02X}'
            r = 1 if new.unmodified and best and not best.unmodified else 0.95
            if not best or len(new) < len(best):
                q = 0
            else:
                q = ratio / best.ratio
            if q < r:
                if best and discard_if_too_good:
                    if q < 0.5:
                        return best
                    if new.failed:
                        return best
                self.log_info(lambda: F'obtained {ratio * 100:07.4f}% compression ratio [q={q:07.4f}] with: {new!s}')
                return new
            else:
                self.log_debug(F'obtained {ratio * 100:07.4f}% compression ratio [q={q:07.4f}] with: {new!s}')
                return best

        for engine in self.engines:
            self.log_debug(F'attempting engine: {engine.name}')
            careful = isinstance(engine, (lznt1, lzf, lzjb))
            for t in range(self.args.tolerance):
                if best_only_success and careful and t > 0:
                    break
                dc = decompress(engine, t)
                if not dc.failed:
                    best_only_success = update(dc, best_only_success, careful)
                else:
                    best_with_failure = update(dc, best_with_failure, careful)
            if self.args.prepend and not best_only_success:
                for p in range(0x100):
                    dc = decompress(engine, 0, p)
                    if not dc.failed:
                        best_only_success = update(dc, best_only_success, careful)
                    else:
                        best_with_failure = update(dc, best_with_failure, careful)

        if best_only_success is not None:
            return self.labelled(best_only_success.result, method=best_only_success.method)
        if best_with_failure is not None:
            self.log_info('the only decompression with result returned only a partial result.')
            return self.labelled(best_with_failure.result, method=best_with_failure.method)
        self.log_warn('no compression engine worked, returning original data.')
        return data

Classes

class decompress (prepend=True, tolerance=12, max_ratio=1, min_ratio=0.0001)

Attempts all available decompression units against the input and returns the output of the first successful one. If none succeeds, the data is returned unaltered. The process is heavily biased against LZNT1 decompression due to a large tendency for LZNT1 false positives.

Expand source code Browse git
class decompress(Unit):
    """
    Attempts all available decompression units against the input and returns
    the output of the first successful one. If none succeeds, the data is
    returned unaltered. The process is heavily biased against LZNT1 decompression
    due to a large tendency for LZNT1 false positives.
    """
    def __init__(
        self,
        prepend: Arg.Switch('-P', '--no-prepend', off=True, help=(
            'By default, if decompression fails, the unit attempts to prefix '
            'the data with all possible values of a single byte and decompress '
            'the result. This behavior can be disabled with this flag.')
        ) = True,
        tolerance: Arg.Number('-t', help=(
            'Maximum number of bytes to strip from the beginning of the data; '
            'The default value is 12.')
        ) = 12,
        max_ratio: Arg('-m', metavar='R', help=(
            'To determine whether a decompression algorithm was successful, the '
            'ratio of compressed size to decompressed size may at most be as large '
            'as this number, a floating point value R; default value is {default}.')
        ) = 1,
        min_ratio: Arg('-n', metavar='R', help=(
            'Require that compression ratios must be at least as large as R. This '
            'is a "too good to be true" heuristic against algorithms like lznt1 '
            'that can produce false positives. The default is {default}.')
        ) = 0.0001,
    ):
        if min_ratio <= 0:
            raise ValueError('The compression factor must be nonnegative.')
        super().__init__(
            tolerance=tolerance,
            prepend=prepend,
            min_ratio=min_ratio,
            max_ratio=max_ratio
        )
        self.engines: List[Unit] = [
            engine.assemble() for engine in [
                szdd, zl, lzma, aplib, qlz, lzf, lzw, jcalg, bz2, blz, lzjb, lz4, lzo, lznt1]
        ]
        for engine in self.engines:
            engine.log_detach()

    def process(self, data):

        data = memoryview(data)

        class Decompression(NamedTuple):
            engine: Unit
            result: Optional[ByteString] = None
            cutoff: int = 0
            prefix: Optional[int] = None
            failed: bool = False

            def __str__(self):
                status = 'partial' if self.failed else 'success'
                prefix = self.prefix
                if prefix is not None:
                    prefix = F'0x{prefix:02X}'
                return F'prefix={prefix}, cutoff=0x{self.cutoff:02X}, [{status}] engine={self.engine.name}'

            def __len__(self):
                return len(self.result)

            @property
            def ratio(self):
                if not self.result:
                    return INF
                return len(data) / len(self.result)

            @property
            def unmodified(self):
                return self.cutoff == 0
                return self.prefix is None and self.cutoff == 0

            @property
            def method(self):
                return self.engine.name

        if self.args.prepend:
            buffer = bytearray(1 + len(data))
            buffer[1:] = data

        best_only_success: Optional[Decompression] = None
        best_with_failure: Optional[Decompression] = None

        def decompress(engine: Unit, cutoff: int = 0, prefix: Optional[int] = None):
            ingest = data[cutoff:]
            failed = True
            if prefix is not None:
                buffer[0] = prefix
                ingest = buffer
            if engine.handles(ingest) is False:
                return Decompression(engine, None, cutoff, prefix)
            try:
                result = engine.process(ingest)
            except RefineryPartialResult as pr:
                result = pr.partial
            except Exception:
                result = None
            else:
                failed = False
            return Decompression(engine, result, cutoff, prefix, failed)

        def update(new: Decompression, best: Optional[Decompression] = None, discard_if_too_good=False) -> Decompression:
            ratio = new.ratio
            if ratio > self.args.max_ratio:
                return best
            if ratio < self.args.min_ratio:
                return best
            prefix = new.prefix
            if prefix is not None:
                prefix = F'0x{prefix:02X}'
            r = 1 if new.unmodified and best and not best.unmodified else 0.95
            if not best or len(new) < len(best):
                q = 0
            else:
                q = ratio / best.ratio
            if q < r:
                if best and discard_if_too_good:
                    if q < 0.5:
                        return best
                    if new.failed:
                        return best
                self.log_info(lambda: F'obtained {ratio * 100:07.4f}% compression ratio [q={q:07.4f}] with: {new!s}')
                return new
            else:
                self.log_debug(F'obtained {ratio * 100:07.4f}% compression ratio [q={q:07.4f}] with: {new!s}')
                return best

        for engine in self.engines:
            self.log_debug(F'attempting engine: {engine.name}')
            careful = isinstance(engine, (lznt1, lzf, lzjb))
            for t in range(self.args.tolerance):
                if best_only_success and careful and t > 0:
                    break
                dc = decompress(engine, t)
                if not dc.failed:
                    best_only_success = update(dc, best_only_success, careful)
                else:
                    best_with_failure = update(dc, best_with_failure, careful)
            if self.args.prepend and not best_only_success:
                for p in range(0x100):
                    dc = decompress(engine, 0, p)
                    if not dc.failed:
                        best_only_success = update(dc, best_only_success, careful)
                    else:
                        best_with_failure = update(dc, best_with_failure, careful)

        if best_only_success is not None:
            return self.labelled(best_only_success.result, method=best_only_success.method)
        if best_with_failure is not None:
            self.log_info('the only decompression with result returned only a partial result.')
            return self.labelled(best_with_failure.result, method=best_with_failure.method)
        self.log_warn('no compression engine worked, returning original data.')
        return data

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members