Module refinery.units.compression.lzw

Expand source code Browse git
from __future__ import annotations

import itertools

from array import array
from enum import IntEnum

from refinery.lib.structures import MemoryFile, StructReaderBits
from refinery.units import RefineryPartialResult, Unit


class LZW(IntEnum):
    INIT_BITS = 9
    BITS = 0x10
    CLEAR = 0x100
    FIRST = 0x101
    WSIZE = 0x8000
    EXTRA = 0x40


class lzw(Unit):
    '''
    LZW decompression based on ancient Unix sources.
    '''

    _MAGIC = B'\x1F\x9D'

    def process(self, data: bytearray):
        out = MemoryFile()
        inf = StructReaderBits(memoryview(data))

        if inf.peek(2) != self._MAGIC:
            self.log_info('No LZW signature found, assuming raw stream.')
            maxbits = LZW.BITS
            block_mode = True
        else:
            inf.seekrel(2)
            maxbits = inf.read_integer(5)
            if inf.read_integer(2) != 0:
                self.log_info('reserved bits were set in LZW header')
            block_mode = bool(inf.read_bit())

        if maxbits > LZW.BITS:
            raise ValueError(F'Compressed with {maxbits} bits; cannot handle file.')

        maxmaxcode = 1 << maxbits

        ibuf = inf.read()

        tab_suffix = bytearray(LZW.WSIZE * 2)
        tab_prefix = array('H', itertools.repeat(0, 1 << LZW.BITS))

        n_bits = LZW.INIT_BITS
        maxcode = (1 << n_bits) - 1
        bitmask = (1 << n_bits) - 1
        oldcode = ~0
        finchar = +0
        posbits = +0

        free_entry = LZW.FIRST if block_mode else 0x100
        tab_suffix[:0x100] = range(0x100)
        resetbuf = True

        while resetbuf:
            resetbuf = False

            ibuf = ibuf[posbits >> 3:]
            insize = len(ibuf)
            posbits = 0
            inbits = (insize << 3) - (n_bits - 1)

            while inbits > posbits:
                if free_entry > maxcode:
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits += 1
                    if (n_bits == maxbits):
                        maxcode = maxmaxcode
                    else:
                        maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                p = ibuf[posbits >> 3:]
                code = int.from_bytes(p[:3], 'little') >> (posbits & 7) & bitmask
                posbits += n_bits

                if oldcode == -1:
                    if code >= 256:
                        raise ValueError('corrupt input.')
                    oldcode = code
                    finchar = oldcode
                    out.write_byte(finchar)
                    continue

                if code == LZW.CLEAR and block_mode:
                    tab_prefix[:0x100] = array('H', itertools.repeat(0, 0x100))
                    free_entry = LZW.FIRST - 1
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits = LZW.INIT_BITS
                    maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                incode = code
                stack = bytearray()

                if code >= free_entry:
                    if code > free_entry:
                        raise RefineryPartialResult('corrupt input.', out.getvalue())
                    stack.append(finchar)
                    code = oldcode
                while code >= 256:
                    stack.append(tab_suffix[code])
                    code = tab_prefix[code]

                finchar = tab_suffix[code]
                stack.append(finchar)
                stack.reverse()
                out.write(stack)
                code = free_entry

                if code < maxmaxcode:
                    tab_prefix[code] = oldcode & 0xFFFF
                    tab_suffix[code] = finchar & 0x00FF
                    free_entry = code + 1

                oldcode = incode

        return out.getvalue()

    @classmethod
    def handles(cls, data) -> bool | None:
        if data[:len(cls._MAGIC)] == cls._MAGIC:
            return True

Classes

class LZW (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class LZW(IntEnum):
    INIT_BITS = 9
    BITS = 0x10
    CLEAR = 0x100
    FIRST = 0x101
    WSIZE = 0x8000
    EXTRA = 0x40

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var INIT_BITS

The type of the None singleton.

var BITS

The type of the None singleton.

var CLEAR

The type of the None singleton.

var FIRST

The type of the None singleton.

var WSIZE

The type of the None singleton.

var EXTRA

The type of the None singleton.

class lzw

LZW decompression based on ancient Unix sources.

Expand source code Browse git
class lzw(Unit):
    '''
    LZW decompression based on ancient Unix sources.
    '''

    _MAGIC = B'\x1F\x9D'

    def process(self, data: bytearray):
        out = MemoryFile()
        inf = StructReaderBits(memoryview(data))

        if inf.peek(2) != self._MAGIC:
            self.log_info('No LZW signature found, assuming raw stream.')
            maxbits = LZW.BITS
            block_mode = True
        else:
            inf.seekrel(2)
            maxbits = inf.read_integer(5)
            if inf.read_integer(2) != 0:
                self.log_info('reserved bits were set in LZW header')
            block_mode = bool(inf.read_bit())

        if maxbits > LZW.BITS:
            raise ValueError(F'Compressed with {maxbits} bits; cannot handle file.')

        maxmaxcode = 1 << maxbits

        ibuf = inf.read()

        tab_suffix = bytearray(LZW.WSIZE * 2)
        tab_prefix = array('H', itertools.repeat(0, 1 << LZW.BITS))

        n_bits = LZW.INIT_BITS
        maxcode = (1 << n_bits) - 1
        bitmask = (1 << n_bits) - 1
        oldcode = ~0
        finchar = +0
        posbits = +0

        free_entry = LZW.FIRST if block_mode else 0x100
        tab_suffix[:0x100] = range(0x100)
        resetbuf = True

        while resetbuf:
            resetbuf = False

            ibuf = ibuf[posbits >> 3:]
            insize = len(ibuf)
            posbits = 0
            inbits = (insize << 3) - (n_bits - 1)

            while inbits > posbits:
                if free_entry > maxcode:
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits += 1
                    if (n_bits == maxbits):
                        maxcode = maxmaxcode
                    else:
                        maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                p = ibuf[posbits >> 3:]
                code = int.from_bytes(p[:3], 'little') >> (posbits & 7) & bitmask
                posbits += n_bits

                if oldcode == -1:
                    if code >= 256:
                        raise ValueError('corrupt input.')
                    oldcode = code
                    finchar = oldcode
                    out.write_byte(finchar)
                    continue

                if code == LZW.CLEAR and block_mode:
                    tab_prefix[:0x100] = array('H', itertools.repeat(0, 0x100))
                    free_entry = LZW.FIRST - 1
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits = LZW.INIT_BITS
                    maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                incode = code
                stack = bytearray()

                if code >= free_entry:
                    if code > free_entry:
                        raise RefineryPartialResult('corrupt input.', out.getvalue())
                    stack.append(finchar)
                    code = oldcode
                while code >= 256:
                    stack.append(tab_suffix[code])
                    code = tab_prefix[code]

                finchar = tab_suffix[code]
                stack.append(finchar)
                stack.reverse()
                out.write(stack)
                code = free_entry

                if code < maxmaxcode:
                    tab_prefix[code] = oldcode & 0xFFFF
                    tab_suffix[code] = finchar & 0x00FF
                    free_entry = code + 1

                oldcode = incode

        return out.getvalue()

    @classmethod
    def handles(cls, data) -> bool | None:
        if data[:len(cls._MAGIC)] == cls._MAGIC:
            return True

Ancestors

Subclasses

Inherited members