Module refinery.units.compression.lzw

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import IntEnum
from typing import Optional
from array import array

import itertools

from refinery.units import Unit, RefineryPartialResult
from refinery.lib.structures import MemoryFile, StructReader


class LZW(IntEnum):
    INIT_BITS = 9
    BITS = 0x10
    CLEAR = 0x100
    FIRST = 0x101
    WSIZE = 0x8000
    EXTRA = 0x40


class lzw(Unit):
    '''
    LZW decompression based on ancient Unix sources.
    '''

    _MAGIC = B'\x1F\x9D'

    def process(self, data: bytearray):
        out = MemoryFile()
        inf = StructReader(memoryview(data))

        if inf.peek(2) != self._MAGIC:
            self.log_info('No LZW signature found, assuming raw stream.')
            maxbits = LZW.BITS
            block_mode = True
        else:
            inf.seekrel(2)
            maxbits = inf.read_integer(5)
            if inf.read_integer(2) != 0:
                self.log_info('reserved bits were set in LZW header')
            block_mode = bool(inf.read_bit())

        if maxbits > LZW.BITS:
            raise ValueError(F'Compressed with {maxbits} bits; cannot handle file.')

        maxmaxcode = 1 << maxbits

        ibuf = inf.read()

        tab_suffix = bytearray(LZW.WSIZE * 2)
        tab_prefix = array('H', itertools.repeat(0, 1 << LZW.BITS))

        n_bits = LZW.INIT_BITS
        maxcode = (1 << n_bits) - 1
        bitmask = (1 << n_bits) - 1
        oldcode = ~0
        finchar = +0
        posbits = +0

        free_entry = LZW.FIRST if block_mode else 0x100
        tab_suffix[:0x100] = range(0x100)
        resetbuf = True

        while resetbuf:
            resetbuf = False

            ibuf = ibuf[posbits >> 3:]
            insize = len(ibuf)
            posbits = 0

            if insize < LZW.EXTRA:
                inbits = (insize - insize % n_bits) << 3
            else:
                inbits = (insize << 3) - (n_bits - 1)

            while inbits > posbits:
                if free_entry > maxcode:
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits += 1
                    if (n_bits == maxbits):
                        maxcode = maxmaxcode
                    else:
                        maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                p = ibuf[posbits >> 3:]
                code = int.from_bytes(p[:3], 'little') >> (posbits & 7) & bitmask
                posbits += n_bits

                if oldcode == -1:
                    if code >= 256:
                        raise ValueError('corrupt input.')
                    oldcode = code
                    finchar = oldcode
                    out.write_byte(finchar)
                    continue

                if code == LZW.CLEAR and block_mode:
                    tab_prefix[:0x100] = array('H', itertools.repeat(0, 0x100))
                    free_entry = LZW.FIRST - 1
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits = LZW.INIT_BITS
                    maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                incode = code
                stack = bytearray()

                if code >= free_entry:
                    if code > free_entry:
                        raise RefineryPartialResult('corrupt input.', out.getbuffer())
                    stack.append(finchar)
                    code = oldcode
                while code >= 256:
                    stack.append(tab_suffix[code])
                    code = tab_prefix[code]

                finchar = tab_suffix[code]
                stack.append(finchar)
                stack.reverse()
                out.write(stack)
                code = free_entry

                if code < maxmaxcode:
                    tab_prefix[code] = oldcode & 0xFFFF
                    tab_suffix[code] = finchar & 0x00FF
                    free_entry = code + 1

                oldcode = incode

        return out.getvalue()

    @classmethod
    def handles(self, data: bytearray) -> Optional[bool]:
        sig = self._MAGIC
        if data[:len(sig)] == sig:
            return True

Classes

class LZW (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class LZW(IntEnum):
    INIT_BITS = 9
    BITS = 0x10
    CLEAR = 0x100
    FIRST = 0x101
    WSIZE = 0x8000
    EXTRA = 0x40

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var INIT_BITS
var BITS
var CLEAR
var FIRST
var WSIZE
var EXTRA
class lzw

LZW decompression based on ancient Unix sources.

Expand source code Browse git
class lzw(Unit):
    '''
    LZW decompression based on ancient Unix sources.
    '''

    _MAGIC = B'\x1F\x9D'

    def process(self, data: bytearray):
        out = MemoryFile()
        inf = StructReader(memoryview(data))

        if inf.peek(2) != self._MAGIC:
            self.log_info('No LZW signature found, assuming raw stream.')
            maxbits = LZW.BITS
            block_mode = True
        else:
            inf.seekrel(2)
            maxbits = inf.read_integer(5)
            if inf.read_integer(2) != 0:
                self.log_info('reserved bits were set in LZW header')
            block_mode = bool(inf.read_bit())

        if maxbits > LZW.BITS:
            raise ValueError(F'Compressed with {maxbits} bits; cannot handle file.')

        maxmaxcode = 1 << maxbits

        ibuf = inf.read()

        tab_suffix = bytearray(LZW.WSIZE * 2)
        tab_prefix = array('H', itertools.repeat(0, 1 << LZW.BITS))

        n_bits = LZW.INIT_BITS
        maxcode = (1 << n_bits) - 1
        bitmask = (1 << n_bits) - 1
        oldcode = ~0
        finchar = +0
        posbits = +0

        free_entry = LZW.FIRST if block_mode else 0x100
        tab_suffix[:0x100] = range(0x100)
        resetbuf = True

        while resetbuf:
            resetbuf = False

            ibuf = ibuf[posbits >> 3:]
            insize = len(ibuf)
            posbits = 0

            if insize < LZW.EXTRA:
                inbits = (insize - insize % n_bits) << 3
            else:
                inbits = (insize << 3) - (n_bits - 1)

            while inbits > posbits:
                if free_entry > maxcode:
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits += 1
                    if (n_bits == maxbits):
                        maxcode = maxmaxcode
                    else:
                        maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                p = ibuf[posbits >> 3:]
                code = int.from_bytes(p[:3], 'little') >> (posbits & 7) & bitmask
                posbits += n_bits

                if oldcode == -1:
                    if code >= 256:
                        raise ValueError('corrupt input.')
                    oldcode = code
                    finchar = oldcode
                    out.write_byte(finchar)
                    continue

                if code == LZW.CLEAR and block_mode:
                    tab_prefix[:0x100] = array('H', itertools.repeat(0, 0x100))
                    free_entry = LZW.FIRST - 1
                    n = n_bits << 3
                    p = posbits - 1
                    posbits = p + (n - (p + n) % n)
                    n_bits = LZW.INIT_BITS
                    maxcode = (1 << n_bits) - 1
                    bitmask = (1 << n_bits) - 1
                    resetbuf = True
                    break

                incode = code
                stack = bytearray()

                if code >= free_entry:
                    if code > free_entry:
                        raise RefineryPartialResult('corrupt input.', out.getbuffer())
                    stack.append(finchar)
                    code = oldcode
                while code >= 256:
                    stack.append(tab_suffix[code])
                    code = tab_prefix[code]

                finchar = tab_suffix[code]
                stack.append(finchar)
                stack.reverse()
                out.write(stack)
                code = free_entry

                if code < maxmaxcode:
                    tab_prefix[code] = oldcode & 0xFFFF
                    tab_suffix[code] = finchar & 0x00FF
                    free_entry = code + 1

                oldcode = incode

        return out.getvalue()

    @classmethod
    def handles(self, data: bytearray) -> Optional[bool]:
        sig = self._MAGIC
        if data[:len(sig)] == sig:
            return True

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members