Module refinery.units.compression.zl

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import zlib
import itertools

from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.tools import exception_to_string


class zl(Unit):
    """
    ZLib compression and decompression.
    """

    def __init__(
        self,
        level  : Arg.Number('-l', bound=(0, 0X9), help='Specify a compression level between 0 and 9.') = 9,
        window : Arg.Number('-w', bound=(8, 0XF), help='Manually specify the window size between 8 and 15.') = 15,
        zlib_header: Arg.Switch('-z', group='MODE', help='Use a ZLIB header.') = False,
        gzip_header: Arg.Switch('-g', group='MODE', help='Use a GZIP header.') = False
    ):
        if zlib_header and gzip_header:
            raise ValueError('You can only specify one header type (ZLIB or GZIP).')
        return super().__init__(level=level, window=window, zlib_header=zlib_header, gzip_header=gzip_header)

    def _decompress_data(self, data, mode: int, step: int):
        zl = zlib.decompressobj(mode)
        memory = memoryview(data)
        result = bytearray()
        while not zl.eof:
            read = min(step, len(memory))
            try:
                chunk = zl.decompress(memory[:read])
            except zlib.error as e:
                raise RefineryPartialResult(exception_to_string(e), result) from e
            else:
                result.extend(chunk)
                consumed = read - len(zl.unused_data)
                if not memory or consumed == 0:
                    break
                memory = memory[consumed:]
        return result, memory

    def process(self, data):
        if data[0] == 0x78 or data[0:2] == B'\x1F\x8B' or self.args.zlib_header or self.args.gzip_header:
            modes = [self.args.window | 0x20, -self.args.window]
        else:
            modes = [-self.args.window, self.args.window | 0x20]
        modes.extend([0x10 | self.args.window, 0])
        view = memoryview(data)
        step = 32 if self.leniency > 0 else len(data)
        for k in itertools.count(1):
            error = None
            rest = view
            for mode in modes:
                try:
                    out, rest = self._decompress_data(view, mode, step)
                except Exception as e:
                    error = error or e
                else:
                    self.log_info(F'used mode {mode} to decompress chunk {k}')
                    yield out
                    error = None
                    break
            if error:
                raise error
            if not rest:
                break
            if len(rest) == len(view):
                break
            if len(rest) > len(view):
                raise RuntimeError('Decompressor returned more tail data than input data.')
            yield out
            view = rest
        if k <= 0:
            raise ValueError('Could not detect any zlib stream.')

    def reverse(self, data):
        mode = -self.args.window
        if self.args.zlib_header:
            mode = -mode
        if self.args.gzip_header:
            mode = -mode | 0x10
        self.log_debug(F'using mode {mode:+2d} for compression')
        zl = zlib.compressobj(self.args.level, zlib.DEFLATED, mode)
        zz = zl.compress(data)
        return zz + zl.flush(zlib.Z_FINISH)

    @classmethod
    def handles(self, data: bytearray):
        for sig in (
            B'\x1F\x8B',  # gzip header
            B'\x78\x01',  # zlib low compression
            B'\x78\x9C',  # zlib medium compression
            B'\x78\xDA',  # zlib high compression
        ):
            if data[:2] == sig:
                return True

Classes

class zl (level=9, window=15, zlib_header=False, gzip_header=False)

ZLib compression and decompression.

Expand source code Browse git
class zl(Unit):
    """
    ZLib compression and decompression.
    """

    def __init__(
        self,
        level  : Arg.Number('-l', bound=(0, 0X9), help='Specify a compression level between 0 and 9.') = 9,
        window : Arg.Number('-w', bound=(8, 0XF), help='Manually specify the window size between 8 and 15.') = 15,
        zlib_header: Arg.Switch('-z', group='MODE', help='Use a ZLIB header.') = False,
        gzip_header: Arg.Switch('-g', group='MODE', help='Use a GZIP header.') = False
    ):
        if zlib_header and gzip_header:
            raise ValueError('You can only specify one header type (ZLIB or GZIP).')
        return super().__init__(level=level, window=window, zlib_header=zlib_header, gzip_header=gzip_header)

    def _decompress_data(self, data, mode: int, step: int):
        zl = zlib.decompressobj(mode)
        memory = memoryview(data)
        result = bytearray()
        while not zl.eof:
            read = min(step, len(memory))
            try:
                chunk = zl.decompress(memory[:read])
            except zlib.error as e:
                raise RefineryPartialResult(exception_to_string(e), result) from e
            else:
                result.extend(chunk)
                consumed = read - len(zl.unused_data)
                if not memory or consumed == 0:
                    break
                memory = memory[consumed:]
        return result, memory

    def process(self, data):
        if data[0] == 0x78 or data[0:2] == B'\x1F\x8B' or self.args.zlib_header or self.args.gzip_header:
            modes = [self.args.window | 0x20, -self.args.window]
        else:
            modes = [-self.args.window, self.args.window | 0x20]
        modes.extend([0x10 | self.args.window, 0])
        view = memoryview(data)
        step = 32 if self.leniency > 0 else len(data)
        for k in itertools.count(1):
            error = None
            rest = view
            for mode in modes:
                try:
                    out, rest = self._decompress_data(view, mode, step)
                except Exception as e:
                    error = error or e
                else:
                    self.log_info(F'used mode {mode} to decompress chunk {k}')
                    yield out
                    error = None
                    break
            if error:
                raise error
            if not rest:
                break
            if len(rest) == len(view):
                break
            if len(rest) > len(view):
                raise RuntimeError('Decompressor returned more tail data than input data.')
            yield out
            view = rest
        if k <= 0:
            raise ValueError('Could not detect any zlib stream.')

    def reverse(self, data):
        mode = -self.args.window
        if self.args.zlib_header:
            mode = -mode
        if self.args.gzip_header:
            mode = -mode | 0x10
        self.log_debug(F'using mode {mode:+2d} for compression')
        zl = zlib.compressobj(self.args.level, zlib.DEFLATED, mode)
        zz = zl.compress(data)
        return zz + zl.flush(zlib.Z_FINISH)

    @classmethod
    def handles(self, data: bytearray):
        for sig in (
            B'\x1F\x8B',  # gzip header
            B'\x78\x01',  # zlib low compression
            B'\x78\x9C',  # zlib medium compression
            B'\x78\xDA',  # zlib high compression
        ):
            if data[:2] == sig:
                return True

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members