Module refinery.units.compression.zl

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import zlib

from refinery.units import Arg, Unit


class zl(Unit):
    """
    ZLib compression and decompression.

    Decompression tries several zlib window-bit modes in turn (automatic
    zlib/gzip header detection, raw deflate, and header-derived window size)
    and returns the output of the first one that does not raise an error.
    Compression emits a raw deflate stream unless a ZLIB or GZIP header is
    explicitly requested.
    """

    def __init__(
        self,
        level  : Arg.Number('-l', bound=(0, 0X9), help='Specify a compression level between 0 and 9.') = 9,
        window : Arg.Number('-w', bound=(8, 0XF), help='Manually specify the window size between 8 and 15.') = 15,
        force  : Arg.Switch('-f', help='Decompress as far as possible, even if all known methods fail.') = False,
        zlib_header: Arg.Switch('-z', group='MODE', help='Use a ZLIB header.') = False,
        gzip_header: Arg.Switch('-g', group='MODE', help='Use a GZIP header.') = False
    ):
        # The two header switches are mutually exclusive; reject the
        # combination before handing the arguments to the base class.
        if zlib_header and gzip_header:
            raise ValueError('You can only specify one header type (ZLIB or GZIP).')
        return super().__init__(level=level, window=window, force=force, zlib_header=zlib_header, gzip_header=gzip_header)

    def _force_decompress(self, data, mode):
        """
        Decompress `data` one byte at a time with the given wbits `mode` and
        return everything that was produced before the first `zlib.error`.
        """
        z = zlib.decompressobj(mode)

        def as_many_as_possible():
            # Feeding single bytes ensures that all output generated before
            # the corrupt position is collected rather than discarded.
            for k in range(len(data)):
                try:
                    yield z.decompress(data[k : k + 1])
                except zlib.error:
                    break

        return B''.join(as_many_as_possible())

    def process(self, data):
        """
        Decompress `data`, trying each candidate wbits mode in order.

        Returns the output of the first mode that succeeds. If every mode
        fails and the `force` switch is set, the partial output obtained with
        the preferred mode is returned instead; otherwise a `ValueError` is
        raised.
        """
        # The header is sniffed with slices instead of data[0] so that empty
        # input does not raise an IndexError before zlib is even consulted.
        looks_like_zlib = data[:1] == B'\x78'
        looks_like_gzip = data[:2] == B'\x1F\x8B'
        if looks_like_zlib or looks_like_gzip or self.args.zlib_header or self.args.gzip_header:
            # wbits | 0x20: automatic zlib/gzip header detection goes first.
            mode_candidates = [self.args.window | 0x20, -self.args.window, 0]
        else:
            # No header expected: prefer raw deflate (negative wbits).
            mode_candidates = [-self.args.window, self.args.window | 0x20, 0]
        for mode in mode_candidates:
            self.log_debug(F'using mode {mode} for decompression')
            try:
                z = zlib.decompressobj(mode)
                return z.decompress(data)
            except zlib.error:
                # Deliberate best-effort: fall through to the next candidate.
                pass
        if self.args.force:
            return self._force_decompress(data, mode_candidates[0])
        raise ValueError('could not detect any zlib stream.')

    def reverse(self, data):
        """
        Compress `data` with the configured level and window size.

        The wbits value passed to zlib is negative (raw deflate) by default,
        positive when a ZLIB header is requested, and positive with bit 0x10
        set when a GZIP header is requested.
        """
        mode = -self.args.window
        if self.args.zlib_header:
            mode = -mode
        if self.args.gzip_header:
            mode = -mode | 0x10
        self.log_debug(F'using mode {mode:+2d} for compression')
        zl = zlib.compressobj(self.args.level, zlib.DEFLATED, mode)
        zz = zl.compress(data)
        return zz + zl.flush(zlib.Z_FINISH)

    @classmethod
    def handles(cls, data: bytearray):
        """
        Return `True` if `data` starts with a known gzip or zlib signature.

        When no signature matches, `None` is returned rather than `False`.
        """
        prefix = data[:2]
        for sig in (
            B'\x1F\x8B',  # gzip header
            B'\x78\x01',  # zlib low compression
            B'\x78\x9C',  # zlib medium compression
            B'\x78\xDA',  # zlib high compression
        ):
            if prefix == sig:
                return True
        # NOTE(review): None (not False) is kept on purpose; callers may
        # distinguish "no verdict" from a definite negative - confirm.
        return None

Classes

class zl (level=9, window=15, force=False, zlib_header=False, gzip_header=False)

ZLib compression and decompression.

Expand source code Browse git
class zl(Unit):
    """
    ZLib compression and decompression.

    Decompression tries several zlib window-bit modes in turn (automatic
    zlib/gzip header detection, raw deflate, and header-derived window size)
    and returns the output of the first one that does not raise an error.
    Compression emits a raw deflate stream unless a ZLIB or GZIP header is
    explicitly requested.
    """

    def __init__(
        self,
        level  : Arg.Number('-l', bound=(0, 0X9), help='Specify a compression level between 0 and 9.') = 9,
        window : Arg.Number('-w', bound=(8, 0XF), help='Manually specify the window size between 8 and 15.') = 15,
        force  : Arg.Switch('-f', help='Decompress as far as possible, even if all known methods fail.') = False,
        zlib_header: Arg.Switch('-z', group='MODE', help='Use a ZLIB header.') = False,
        gzip_header: Arg.Switch('-g', group='MODE', help='Use a GZIP header.') = False
    ):
        # The two header switches are mutually exclusive; reject the
        # combination before handing the arguments to the base class.
        if zlib_header and gzip_header:
            raise ValueError('You can only specify one header type (ZLIB or GZIP).')
        return super().__init__(level=level, window=window, force=force, zlib_header=zlib_header, gzip_header=gzip_header)

    def _force_decompress(self, data, mode):
        """
        Decompress `data` one byte at a time with the given wbits `mode` and
        return everything that was produced before the first `zlib.error`.
        """
        z = zlib.decompressobj(mode)

        def as_many_as_possible():
            # Feeding single bytes ensures that all output generated before
            # the corrupt position is collected rather than discarded.
            for k in range(len(data)):
                try:
                    yield z.decompress(data[k : k + 1])
                except zlib.error:
                    break

        return B''.join(as_many_as_possible())

    def process(self, data):
        """
        Decompress `data`, trying each candidate wbits mode in order.

        Returns the output of the first mode that succeeds. If every mode
        fails and the `force` switch is set, the partial output obtained with
        the preferred mode is returned instead; otherwise a `ValueError` is
        raised.
        """
        # The header is sniffed with slices instead of data[0] so that empty
        # input does not raise an IndexError before zlib is even consulted.
        looks_like_zlib = data[:1] == B'\x78'
        looks_like_gzip = data[:2] == B'\x1F\x8B'
        if looks_like_zlib or looks_like_gzip or self.args.zlib_header or self.args.gzip_header:
            # wbits | 0x20: automatic zlib/gzip header detection goes first.
            mode_candidates = [self.args.window | 0x20, -self.args.window, 0]
        else:
            # No header expected: prefer raw deflate (negative wbits).
            mode_candidates = [-self.args.window, self.args.window | 0x20, 0]
        for mode in mode_candidates:
            self.log_debug(F'using mode {mode} for decompression')
            try:
                z = zlib.decompressobj(mode)
                return z.decompress(data)
            except zlib.error:
                # Deliberate best-effort: fall through to the next candidate.
                pass
        if self.args.force:
            return self._force_decompress(data, mode_candidates[0])
        raise ValueError('could not detect any zlib stream.')

    def reverse(self, data):
        """
        Compress `data` with the configured level and window size.

        The wbits value passed to zlib is negative (raw deflate) by default,
        positive when a ZLIB header is requested, and positive with bit 0x10
        set when a GZIP header is requested.
        """
        mode = -self.args.window
        if self.args.zlib_header:
            mode = -mode
        if self.args.gzip_header:
            mode = -mode | 0x10
        self.log_debug(F'using mode {mode:+2d} for compression')
        zl = zlib.compressobj(self.args.level, zlib.DEFLATED, mode)
        zz = zl.compress(data)
        return zz + zl.flush(zlib.Z_FINISH)

    @classmethod
    def handles(cls, data: bytearray):
        """
        Return `True` if `data` starts with a known gzip or zlib signature.

        When no signature matches, `None` is returned rather than `False`.
        """
        prefix = data[:2]
        for sig in (
            B'\x1F\x8B',  # gzip header
            B'\x78\x01',  # zlib low compression
            B'\x78\x9C',  # zlib medium compression
            B'\x78\xDA',  # zlib high compression
        ):
            if prefix == sig:
                return True
        # NOTE(review): None (not False) is kept on purpose; callers may
        # distinguish "no verdict" from a definite negative - confirm.
        return None

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members