Module refinery.lib.seven.deflate

A decoder for Deflate and Deflate64 streams, with optional handling for the NSIS and zlib variants. This module only decompresses data; it is incapable of producing Deflate streams.

Expand source code Browse git
"""
A decoder for Deflate and Deflate64 streams, with optional handling for the NSIS and zlib variants.
This module only decompresses data; it is incapable of producing Deflate streams.
"""
from __future__ import annotations

import enum

from typing import Callable

from refinery.lib.array import make_array
from refinery.lib.seven.huffman import BitDecoderBase, HuffmanDecoder, HuffmanDecoder7b
from refinery.lib.structures import StructReader

# Passed as the first argument to the main and distance HuffmanDecoder instances
# (presumably the maximum Huffman code length in bits; confirm in the huffman module).
kNumHuffmanBits = 15
# Presumably the history window sizes of Deflate and Deflate64 (inferred from the names only).
kHistorySize32 = (1 << 15)
kHistorySize64 = (1 << 16)
# Number of distance symbols used for fixed-Huffman blocks in Deflate (32) and Deflate64 (64)
# mode; the former is also the upper bound for dynamic blocks when Deflate64 is off.
kDistTableSize32 = 30
kDistTableSize64 = 32
# Number of length symbols per variant; the maximum match lengths are derived from these.
kNumLenSymbols32 = 256
kNumLenSymbols64 = 255
kNumLenSymbolsMax = kNumLenSymbols32

# Number of match-length symbols that follow kSymbolMatch in the main alphabet.
kNumLenSlots = 29

# Size of the distance level array and of the length part of the main level array.
kFixedDistTableSize = 32
kFixedLenTableSize = 31

# Main-alphabet symbols below 0x100 are literal bytes, 0x100 ends the current block, and
# the symbols from kSymbolMatch onwards select a match length slot.
kSymbolEndOfBlock = 0x100
kSymbolMatch = kSymbolEndOfBlock + 1

# Main symbols at or above kMainTableSize are rejected by the decoder; kFixedMainTableSize
# is the size of the main level array.
kMainTableSize = kSymbolMatch + kNumLenSlots
kFixedMainTableSize = kSymbolMatch + kFixedLenTableSize

# Number of symbols in the alphabet that encodes the code lengths of a dynamic block.
kLevelTableSize = 19

# Level symbols below kTableDirectLevels are code lengths themselves. The symbol equal to
# kTableLevelRepNumber repeats the previous length, and the two symbols after it emit
# runs of zero lengths.
kTableDirectLevels = 16
kTableLevelRepNumber = kTableDirectLevels
kTableLevel0Number = kTableLevelRepNumber + 1
kTableLevel0Number2 = kTableLevel0Number + 1

kLevelMask = 0xF

# Base match length (before kMatchMinLen is added) for each length slot. The Deflate and
# Deflate64 tables share the first 28 entries and differ in the last used slot.
kLenStartXX = (
    B'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0a\x0c\x0e\x10\x14\x18\x1c'
    B'\x20\x28\x30\x38\x40\x50\x60\x70\x80\xa0\xc0\xe0'
)
kLenStart32 = kLenStartXX + B'\xff\x00\x00'
kLenStart64 = kLenStartXX + B'\x00\x00\x00'

# Number of extra bits that are read and added to the base length of each length slot.
kLenDirectBitsXX = (
    B'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x01\x01\x01\x02\x02\x02\x02'
    B'\x03\x03\x03\x03\x04\x04\x04\x04\x05\x05\x05\x05'
)
kLenDirectBits32 = kLenDirectBitsXX + B'\x00\x00\x00'
kLenDirectBits64 = kLenDirectBitsXX + B'\x10\x00\x00'

# Base distance for each distance symbol; the decoder adds extra bits and then one.
# NOTE(review): make_array is a project helper; 4 is presumably the item size in bytes.
kDistStart = make_array(4, init=[
    0x0000, 0x0001, 0x0002, 0x0003, 0x0004, 0x0006, 0x0008, 0x000C,
    0x0010, 0x0018, 0x0020, 0x0030, 0x0040, 0x0060, 0x0080, 0x00C0,
    0x0100, 0x0180, 0x0200, 0x0300, 0x0400, 0x0600, 0x0800, 0x0C00,
    0x1000, 0x1800, 0x2000, 0x3000, 0x4000, 0x6000, 0x8000, 0xC000,
])
# Number of extra bits that are read and added to the base distance of each symbol.
kDistDirectBits = (
    B'\x00\x00\x00\x00\x01\x01\x02\x02\x03\x03\x04\x04\x05\x05\x06\x06'
    B'\x07\x07\x08\x08\x09\x09\x0a\x0a\x0b\x0b\x0c\x0c\x0d\x0d\x0e\x0e'
)

# The escapes below are octal, i.e. the byte values are 2, 3 and 7.
kLevelDirectBits = B'\02\03\07'
# Position in the level table that receives the i-th code length read from a dynamic
# block header.
kCodeLengthAlphabetOrder = B'\x10\x11\x12\x00\x08\x07\x09\x06\x0a\x05\x0b\x04\x0c\x03\x0d\x02\x0e\x01\x0f'

# Shortest match that can be encoded; it is added to every decoded length.
kMatchMinLen = 3
# Longest match per variant. These lines previously ended in "- 1 // 256 + 2" and
# "- 1 // 255 + 2": trailing C++ comments that Python parses as floor division, which
# yielded 261 and 260 instead of the intended values below.
kMatchMaxLen32 = kNumLenSymbols32 + kMatchMinLen - 1  # 256 + 2 = 258
kMatchMaxLen64 = kNumLenSymbols64 + kMatchMinLen - 1  # 255 + 2 = 257
kMatchMaxLen = kMatchMaxLen32

# Width in bits of the final-block flag that starts every block header.
kFinalBlockFieldSize = 1

# Width in bits of the block type field that follows the final-block flag.
kBlockTypeFieldSize = 2

# Widths in bits of the three counters in the header of a dynamic Huffman block.
kNumLenCodesFieldSize = 5
kNumDistCodesFieldSize = 5
kNumLevelCodesFieldSize = 4

# Offsets that are added to the three counters after they have been read.
kNumLitLenCodesMin = 257
kNumDistCodesMin = 1
kNumLevelCodesMin = 4

# Width in bits of each code length stored for the level alphabet.
kLevelFieldSize = 3

kStoredBlockLengthFieldSize = 16


class FinalBlockField(enum.IntEnum):
    """
    Values of the one-bit flag at the start of a block header. The decoder treats a block as
    the last one of the stream when the flag equals FinalBlock.
    """
    NotFinalBlock = 0
    FinalBlock = 1


class BlockType(enum.IntEnum):
    """
    Values of the two-bit block type field of a block header. Any value greater than
    DynamicHuffman is rejected by the decoder.
    """
    Stored = 0
    FixedHuffman = 1
    DynamicHuffman = 2


class DecoderLevels:
    """
    Holds the code length ("level") of every symbol of the main table and of the distance
    table for a single block.
    """
    main_levels: bytearray
    dist_levels: bytearray

    def __init__(self) -> None:
        self.main_levels = bytearray(kFixedMainTableSize)
        self.dist_levels = bytearray(kFixedDistTableSize)

    def sub_clear(self):
        """
        Reset every distance level and all main levels from index kNumLitLenCodesMin on to 0.
        """
        tail = kFixedMainTableSize - kNumLitLenCodesMin
        self.main_levels[kNumLitLenCodesMin:kFixedMainTableSize] = bytes(tail)
        self.dist_levels[:kFixedDistTableSize] = bytes(kFixedDistTableSize)

    def set_fixed_levels(self):
        """
        Fill both tables with the code lengths that are used for fixed Huffman blocks.
        """
        main = self.main_levels
        for start, stop, level in (
            (0, 144, 8),
            (144, 256, 9),
            (256, 280, 7),
            (280, 288, 8),
        ):
            main[start:stop] = bytes((level,)) * (stop - start)
        self.dist_levels[:kFixedDistTableSize] = b'\x05' * kFixedDistTableSize


# Sentinel values stored in Deflate._leftover_replay_size: the stream has been decoded
# completely, or the decoder state has to be reset before the next block is decoded.
kLenIdFinished = -1
kLenIdNeedInit = -2


# Width in bits of the window that the bit readers refill byte by byte.
kNumBigValueBits = 8 * 4
kNumValueBytes = 3
kNumValueBits = 8 * kNumValueBytes
# Mask that selects the low kNumValueBits bits.
kMask = (1 << kNumValueBits) - 1
# Lookup table that maps every byte value to the byte with its bits in reverse order.
kInvertTable = bytes(
    ((v * 0x202020202) & 0x10884422010) % 1023 for v in range(0x100))


class BitLDecoderBase:
    """
    Bit reader that consumes its input least significant bit first. Bytes are pulled from the
    underlying reader into a window of kNumBigValueBits bits; `_bit_pos` counts how many bits
    of that window have been consumed already.
    """
    __slots__ = (
        '_bit_pos',
        '_value',
        '_stream',
        '_num_extra_bytes',
        'read_direct_byte',
    )

    # Returns the next byte of the underlying reader, bypassing the bit window.
    read_direct_byte: Callable[[], int]

    def __init__(self, reader: StructReader):
        def _rdb():
            # Reading never fails: when the reader raises (presumably because the input is
            # exhausted), the failure is counted and a 0xFF padding byte is returned instead.
            try:
                return u8fast()
            except Exception:
                self._num_extra_bytes += 1
                return 0xFF
        # A fully consumed window is the same as an empty one, so the first read refills it.
        self._bit_pos = kNumBigValueBits
        self._value = 0
        self._stream = reader
        self._num_extra_bytes = 0
        u8fast = reader.u8fast
        self.read_direct_byte = _rdb

    def get_stream_size(self):
        """
        Return the length of the underlying stream if padding bits were consumed, and the
        position reported by `tell` otherwise.
        """
        if self.extra_bits_were_read():
            return len(self._stream)
        else:
            return self.tell()

    def tell(self):
        """
        Return the position of the underlying stream minus the number of whole bytes that are
        still unconsumed in the window.
        """
        return self._stream.tell() - ((kNumBigValueBits - self._bit_pos) >> 3)

    def there_are_data_in_bit_buffer(self):
        """
        Return whether the window still contains unconsumed bits.
        """
        return self._bit_pos != kNumBigValueBits

    def normalize(self):
        """
        Refill the window: for every 8 consumed bits, one new byte is placed above the bits
        that are still unconsumed.
        """
        while self._bit_pos >= 8:
            self._value = (self.read_direct_byte() << (kNumBigValueBits - self._bit_pos)) | self._value
            self._bit_pos -= 8

    def read_bits(self, numBits: int):
        """
        Consume the next `numBits` bits and return them as an integer.
        """
        self.normalize()
        res = self._value & ((1 << numBits) - 1)
        self._bit_pos += numBits
        self._value >>= numBits
        return res

    def extra_bits_were_read(self):
        """
        Return whether padding was consumed: either more than 4 padding bytes were generated,
        or fewer bits remain in the window than padding bits were put into it.
        """
        return (self._num_extra_bytes > 4 or kNumBigValueBits - self._bit_pos < (self._num_extra_bytes << 3))

    def extra_bits_were_read_fast(self):
        """
        Cheaper variant of `extra_bits_were_read` that only checks the padding byte count.
        """
        return self._num_extra_bytes > 4


class BitLDecoder(BitDecoderBase, BitLDecoderBase):
    """
    Bit reader that keeps two views of the buffered input: `_normal_value` holds the bits in
    stream order and serves `read_bits`, while `_value` holds every input byte with its bits
    reversed (via kInvertTable) and serves `get_value`.
    """

    __slots__ = '_normal_value',

    def __init__(self, reader: StructReader):
        super().__init__(reader)
        self._normal_value = 0

    def normalize(self):
        """
        Refill both views with one new byte for every 8 bits that have been consumed.
        """
        p = self._bit_pos
        if p < 8:
            return
        v = self._value
        n = self._normal_value
        while p >= 8:
            b = self.read_direct_byte()
            # Stream-order view: the new byte goes above the unconsumed bits.
            n |= b << (kNumBigValueBits - p)
            p -= 8
            # Reversed view: keep the low 24 bits, shift them up and append the reversed byte.
            v = ((v & 0xFFFFFF) << 8) | kInvertTable[b]
        self._bit_pos = p
        self._value = v
        self._normal_value = n

    def get_value(self, num_bits: int):
        """
        Return the upper `num_bits` bits of the next kNumValueBits unconsumed bits of the
        reversed view. The read position is not advanced.
        """
        self.normalize()
        return ((self._value >> (8 - self._bit_pos)) & kMask) >> (kNumValueBits - num_bits)

    def move_position(self, num_bits: int):
        """
        Mark `num_bits` bits as consumed and drop them from the stream-order view.
        """
        self._bit_pos += num_bits
        self._normal_value >>= num_bits

    def read_bits(self, numBits: int):
        """
        Consume the next `numBits` bits and return them as an integer.
        """
        self.normalize()
        res = self._normal_value & ((1 << numBits) - 1)
        self.move_position(numBits)
        return res

    def align_to_byte(self):
        """
        Skip the bits that remain until the consumed bit count is a multiple of 8.
        """
        self.move_position((32 - self._bit_pos) & 7)

    def read_aligned_byte(self):
        """
        Return the next byte: directly from the reader when the window is empty, and from the
        stream-order view otherwise.
        """
        if self._bit_pos == kNumBigValueBits:
            return self.read_direct_byte()
        b = self._normal_value & 0xFF
        self.move_position(8)
        return b

    def read_aligned_byte_from_buffer(self):
        """
        Like `read_aligned_byte`, but return None when padding bits have already been consumed.
        """
        if self._num_extra_bytes != 0:
            if self.extra_bits_were_read():
                return None
        return self.read_aligned_byte()


def replay(buffer: bytearray, offset: int, length: int):
    """
    Append `length` bytes to `buffer` by repeating the data that starts `offset` bytes before
    its current end. The copied range may overlap the bytes being appended, in which case the
    last `offset` bytes repeat periodically.
    """
    end = len(buffer)
    start = end - offset
    full_copies, remainder = divmod(length, offset)
    # The partial copy is taken first so that it reflects the buffer before it grows.
    tail = buffer[start:start + remainder]
    if full_copies > 0:
        buffer.extend(buffer[start:end] * full_copies)
    buffer.extend(tail)


class Deflate:
    def __init__(
        self,
        dst: bytearray,
        src: StructReader,
        df64: bool = False,
        nsis: bool = False,
        zlib: bool = False,
    ):
        """
        Set up a decoder that reads compressed data from `src` and appends the decoded bytes
        to `dst`. The flags `df64`, `nsis` and `zlib` switch on the Deflate64, NSIS and zlib
        behavior, respectively.
        """
        # Input, output, and the helpers that interpret the input.
        self.dst = dst
        self.src = src
        self.bits = BitLDecoder(src)
        self.main_decoder = HuffmanDecoder(kNumHuffmanBits, kFixedMainTableSize)
        self.dist_decoder = HuffmanDecoder(kNumHuffmanBits, kFixedDistTableSize)
        self.level_decoder = HuffmanDecoder7b(kLevelTableSize)
        # Mode switches.
        self.deflate64 = df64
        self.nsis_mode = nsis
        self.zlib_mode = zlib
        self.keep_history = False
        # State of the block that is currently being decoded.
        self.is_final_block = False
        self.stored_mode = False
        self.stored_block_size = 0
        self.zlib_tail = bytearray(4)
        self._num_dist_levels = 0
        self._need_to_read_table = True
        self._need_to_finish_input = False
        # A match that was cut short by the output limit of a previous call.
        self._leftover_replay_dist = 0
        self._leftover_replay_size = 0
        # Output accounting for the current decode call.
        self._out_start_pos = 0
        self._out_size = 0

    @property
    def _out_size_defined(self):
        return self._out_size > 0

    def decode_levels(self, levels: memoryview, numSymbols: int):
        """
        Read `numSymbols` code lengths into `levels` using the level decoder. Returns False
        when the data is invalid (unknown symbol, repeat without a previous length, or a run
        that exceeds `numSymbols`) and True otherwise.
        """
        reader = self.bits
        decode = self.level_decoder.decode
        pos = 0
        while pos < numSymbols:
            code = decode(reader)
            if code < kTableDirectLevels:
                # The symbol is the code length itself.
                levels[pos] = code
                pos += 1
                continue
            if code >= kLevelTableSize:
                return False
            if code == kTableLevelRepNumber:
                # Repeat the previous length; there has to be one.
                if pos == 0:
                    return False
                fill = levels[pos - 1]
                extra_bits = 2
                base = 0
            else:
                # Run of zeros; the second zero symbol has more extra bits and a larger base.
                scaled = (code - kTableLevel0Number) << 2
                fill = 0
                extra_bits = 3 + scaled
                base = scaled << 1
            end = base + pos + 3 + reader.read_bits(extra_bits)
            if end > numSymbols:
                return False
            # The run always has at least 3 entries, so this range is never empty.
            for index in range(pos, end):
                levels[index] = fill
            pos = end
        return True

    def read_tables(self):
        """
        Parse a block header and prepare the decoder for the block body. For stored blocks
        this reads the block size; for Huffman blocks it builds the main and the distance
        decoder. Returns False when the header is invalid or when the input ended early.
        """
        bits = self.bits
        self.is_final_block = (bits.read_bits(kFinalBlockFieldSize) == FinalBlockField.FinalBlock)
        if self.bits.extra_bits_were_read():
            return False
        blockType = bits.read_bits(kBlockTypeFieldSize)
        if blockType > BlockType.DynamicHuffman:
            return False
        if self.bits.extra_bits_were_read():
            return False
        if blockType == BlockType.Stored:
            self.stored_mode = True
            self.bits.align_to_byte()
            self.stored_block_size = self.read_aligned_u16()
            # In NSIS mode, the complement of the size is neither read nor checked.
            if self.nsis_mode:
                return True
            # The size is followed by its 16-bit one's complement.
            return (self.stored_block_size == ~self.read_aligned_u16() & 0xFFFF)
        else:
            self.stored_mode = False

        levels = DecoderLevels()

        if blockType == BlockType.FixedHuffman:
            levels.set_fixed_levels()
            self._num_dist_levels = kDistTableSize64 if self.deflate64 else kDistTableSize32
        else:
            # Dynamic block: three counters, then the code lengths of the level alphabet.
            numLitLenLevels = bits.read_bits(kNumLenCodesFieldSize) + kNumLitLenCodesMin
            self._num_dist_levels = bits.read_bits(kNumDistCodesFieldSize) + kNumDistCodesMin
            numLevelCodes = bits.read_bits(kNumLevelCodesFieldSize) + kNumLevelCodesMin
            if not self.deflate64:
                if self._num_dist_levels > kDistTableSize32:
                    return False
            levelLevels = bytearray(kLevelTableSize)
            for i in range(kLevelTableSize):
                # The lengths are stored in a permuted order; missing ones are zero.
                position = kCodeLengthAlphabetOrder[i]
                if i < numLevelCodes:
                    levelLevels[position] = bits.read_bits(kLevelFieldSize)
                else:
                    levelLevels[position] = 0

            if self.bits.extra_bits_were_read():
                return False

            if not self.level_decoder.build(levelLevels):
                return False

            # The main and the distance code lengths are decoded as one continuous sequence
            # and split afterwards.
            b_tmpLevels = bytearray(kFixedMainTableSize + kFixedDistTableSize)
            tmpLevels = memoryview(b_tmpLevels)
            if not self.decode_levels(tmpLevels, numLitLenLevels + self._num_dist_levels):
                return False
            if self.bits.extra_bits_were_read():
                return False
            levels.sub_clear()
            levels.main_levels[:numLitLenLevels] = tmpLevels[:numLitLenLevels]
            levels.dist_levels[:self._num_dist_levels] = tmpLevels[numLitLenLevels:][:self._num_dist_levels]
        if not self.main_decoder.build(levels.main_levels):
            return False
        return self.dist_decoder.build(levels.dist_levels)

    def decode_block(self, size: int, finish_input_stream: bool):
        """
        Decode up to `size` bytes of output into `self.dst`, continuing from the state that
        the previous call left behind. When `finish_input_stream` is set, the compressed data
        is additionally required to reach the end of its current block once `size` bytes have
        been produced.

        Returns False when the compressed data is invalid or ended prematurely; this includes
        matches whose distance reaches back before the first byte of `self.dst`. Returns True
        otherwise. After the final block was processed, `is_finished` reports True.
        """
        bits = self.bits
        dst = self.dst
        main_decoder = self.main_decoder
        dist_decoder = self.dist_decoder
        write_byte = dst.append

        if self._leftover_replay_size == kLenIdFinished:
            return True
        if self._leftover_replay_size == kLenIdNeedInit:
            if not self.keep_history:
                dst.clear()
            self.is_final_block = False
            self._leftover_replay_size = 0
            self._need_to_read_table = True
        # Finish a match that the output limit of the previous call cut short.
        if carry := min(self._leftover_replay_size, size):
            size -= carry
            replay(dst, self._leftover_replay_dist + 1, carry)
            self._leftover_replay_size -= carry
        while size > 0 or finish_input_stream:
            if bits.extra_bits_were_read():
                return False
            if self._need_to_read_table:
                if self.is_final_block:
                    self._leftover_replay_size = kLenIdFinished
                    break
                if not self.read_tables():
                    return False
                if bits.extra_bits_were_read():
                    return False
                self._need_to_read_table = False
            if self.stored_mode:
                if finish_input_stream and size == 0 and self.stored_block_size != 0:
                    return False
                # NSIS version contains some bits in bitl bits buffer.
                # So we must read some first bytes via ReadAlignedByte
                while self.stored_block_size > 0 and size > 0 and bits.there_are_data_in_bit_buffer():
                    write_byte(bits.read_aligned_byte())
                    self.stored_block_size -= 1
                    size -= 1
                while self.stored_block_size > 0 and size > 0:
                    write_byte(bits.read_direct_byte())
                    self.stored_block_size -= 1
                    size -= 1
                self._need_to_read_table = self.stored_block_size == 0
                continue
            while size > 0:
                if bits.extra_bits_were_read_fast():
                    return False
                if (sym := main_decoder.decode(bits)) < 0x100:
                    # Literal byte.
                    write_byte(sym)
                    size -= 1
                    continue
                elif sym == kSymbolEndOfBlock:
                    self._need_to_read_table = True
                    break
                elif sym >= kMainTableSize:
                    return False
                else:
                    # Match: decode the length first, then the distance.
                    sym -= kSymbolMatch
                    if self.deflate64:
                        length = kLenStart64[sym]
                        n_bits = kLenDirectBits64[sym]
                    else:
                        length = kLenStart32[sym]
                        n_bits = kLenDirectBits32[sym]
                    length += kMatchMinLen + bits.read_bits(n_bits)
                    # Only as much of the match as the output limit permits is copied now.
                    loc = min(length, size)
                    sym = dist_decoder.decode(bits)
                    if sym >= self._num_dist_levels:
                        return False
                    sym = kDistStart[sym] + bits.read_bits(kDistDirectBits[sym])
                    # A match must not reach back before the first byte of the output that
                    # exists so far. For such a corrupt stream, replay() would silently
                    # copy the wrong bytes (or none at all) while size is still reduced.
                    if sym >= len(dst):
                        return False
                    replay(dst, sym + 1, loc)
                    size -= loc
                    length -= loc
                    if length != 0:
                        # Remember the rest of the match for the next call.
                        self._leftover_replay_size = length
                        self._leftover_replay_dist = sym
                        break
            if finish_input_stream and size == 0:
                if main_decoder.decode(bits) != kSymbolEndOfBlock:
                    return False
                self._need_to_read_table = True
        return not bits.extra_bits_were_read()

    def decode_real(self):
        """
        Run `decode_block` in chunks of at most 1 MiB until the stream is finished or the
        expected output size is reached. In zlib mode, the 4 bytes that follow the final block
        are stored in `zlib_tail`. Returns False as soon as a chunk fails to decode.
        """
        while True:
            finish = False
            size = 1 << 20
            if self._out_size_defined:
                produced = len(self.dst) - self._out_start_pos
                rem = self._out_size - produced
                if rem <= size:
                    # Last chunk: optionally insist that the input ends properly.
                    size = rem
                    finish = bool(self.zlib_mode or self._need_to_finish_input)
            if size == 0 and not finish:
                break
            if not self.decode_block(size, finish):
                return False
            if self._leftover_replay_size == kLenIdFinished:
                break
        if self.zlib_mode and self._leftover_replay_size == kLenIdFinished:
            reader = self.bits
            reader.align_to_byte()
            for index in range(4):
                self.zlib_tail[index] = reader.read_aligned_byte()
        return True

    def initialize_out_stream_for_resume(self, out_size: int = 0):
        """
        Prepare for a new decode run: the output is emptied unless `keep_history` is set, the
        current output length becomes the start position, and the decoder state is flagged
        for re-initialization.
        """
        output = self.dst
        if not self.keep_history:
            output.clear()
        self._out_start_pos = len(output)
        self._out_size = out_size
        self._leftover_replay_size = kLenIdNeedInit

    def decode(self, out_size: int = 0):
        """
        Reset the output state and decode the stream. An `out_size` of 0 means that no
        output size is expected. Returns the result of `decode_real`.
        """
        self.initialize_out_stream_for_resume(out_size)
        success = self.decode_real()
        return success

    def is_finished(self):
        """
        Return whether the final block of the stream has been decoded completely.
        """
        state = self._leftover_replay_size
        return state == kLenIdFinished

    def read_aligned_u16(self):
        b = self.bits
        v = b.read_aligned_byte()
        return v | (b.read_aligned_byte() << 8)

    def had_input_eof_error(self):
        return self.bits.extra_bits_were_read()

Functions

def replay(buffer, offset, length)
Expand source code Browse git
def replay(buffer: bytearray, offset: int, length: int):
    """
    Append `length` bytes to `buffer` by repeating the data that starts `offset` bytes before
    its current end. The copied range may overlap the bytes being appended, in which case the
    last `offset` bytes repeat periodically.
    """
    end = len(buffer)
    start = end - offset
    full_copies, remainder = divmod(length, offset)
    # The partial copy is taken first so that it reflects the buffer before it grows.
    tail = buffer[start:start + remainder]
    if full_copies > 0:
        buffer.extend(buffer[start:end] * full_copies)
    buffer.extend(tail)

Classes

class FinalBlockField (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class FinalBlockField(enum.IntEnum):
    """
    Values of the one-bit flag at the start of a block header. The decoder treats a block as
    the last one of the stream when the flag equals FinalBlock.
    """
    NotFinalBlock = 0
    FinalBlock = 1

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var NotFinalBlock

Flag value 0: further blocks follow the current one.

var FinalBlock

Flag value 1: the current block is the last block of the stream.

class BlockType (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class BlockType(enum.IntEnum):
    """
    Values of the two-bit block type field of a block header. Any value greater than
    DynamicHuffman is rejected by the decoder.
    """
    Stored = 0
    FixedHuffman = 1
    DynamicHuffman = 2

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var Stored

Block type 0: the block contains stored bytes that are copied to the output as they are.

var FixedHuffman

Block type 1: the block is decoded with the fixed Huffman code lengths.

var DynamicHuffman

Block type 2: the block header carries the Huffman code lengths for the block.

class DecoderLevels
Expand source code Browse git
class DecoderLevels:
    """
    Holds the code length ("level") of every symbol of the main table and of the distance
    table for a single block.
    """
    main_levels: bytearray
    dist_levels: bytearray

    def __init__(self) -> None:
        self.main_levels = bytearray(kFixedMainTableSize)
        self.dist_levels = bytearray(kFixedDistTableSize)

    def sub_clear(self):
        """
        Reset every distance level and all main levels from index kNumLitLenCodesMin on to 0.
        """
        tail = kFixedMainTableSize - kNumLitLenCodesMin
        self.main_levels[kNumLitLenCodesMin:kFixedMainTableSize] = bytes(tail)
        self.dist_levels[:kFixedDistTableSize] = bytes(kFixedDistTableSize)

    def set_fixed_levels(self):
        """
        Fill both tables with the code lengths that are used for fixed Huffman blocks.
        """
        main = self.main_levels
        for start, stop, level in (
            (0, 144, 8),
            (144, 256, 9),
            (256, 280, 7),
            (280, 288, 8),
        ):
            main[start:stop] = bytes((level,)) * (stop - start)
        self.dist_levels[:kFixedDistTableSize] = b'\x05' * kFixedDistTableSize

Class variables

var main_levels

Code lengths of the symbols in the main (literal and length) table.

var dist_levels

Code lengths of the symbols in the distance table.

Methods

def sub_clear(self)
Expand source code Browse git
def sub_clear(self):
    """
    Reset every distance level and all main levels from index kNumLitLenCodesMin on to 0.
    """
    tail = kFixedMainTableSize - kNumLitLenCodesMin
    self.main_levels[kNumLitLenCodesMin:kFixedMainTableSize] = bytes(tail)
    self.dist_levels[:kFixedDistTableSize] = bytes(kFixedDistTableSize)
def set_fixed_levels(self)
Expand source code Browse git
def set_fixed_levels(self):
    """
    Fill both tables with the code lengths that are used for fixed Huffman blocks.
    """
    main = self.main_levels
    for start, stop, level in (
        (0, 144, 8),
        (144, 256, 9),
        (256, 280, 7),
        (280, 288, 8),
    ):
        main[start:stop] = bytes((level,)) * (stop - start)
    self.dist_levels[:kFixedDistTableSize] = b'\x05' * kFixedDistTableSize
class BitLDecoderBase (reader)
Expand source code Browse git
class BitLDecoderBase:
    """
    Bit reader that consumes its input least significant bit first. Bytes are pulled from the
    underlying reader into a window of kNumBigValueBits bits; `_bit_pos` counts how many bits
    of that window have been consumed already.
    """
    __slots__ = (
        '_bit_pos',
        '_value',
        '_stream',
        '_num_extra_bytes',
        'read_direct_byte',
    )

    # Returns the next byte of the underlying reader, bypassing the bit window.
    read_direct_byte: Callable[[], int]

    def __init__(self, reader: StructReader):
        def _rdb():
            # Reading never fails: when the reader raises (presumably because the input is
            # exhausted), the failure is counted and a 0xFF padding byte is returned instead.
            try:
                return u8fast()
            except Exception:
                self._num_extra_bytes += 1
                return 0xFF
        # A fully consumed window is the same as an empty one, so the first read refills it.
        self._bit_pos = kNumBigValueBits
        self._value = 0
        self._stream = reader
        self._num_extra_bytes = 0
        u8fast = reader.u8fast
        self.read_direct_byte = _rdb

    def get_stream_size(self):
        """
        Return the length of the underlying stream if padding bits were consumed, and the
        position reported by `tell` otherwise.
        """
        if self.extra_bits_were_read():
            return len(self._stream)
        else:
            return self.tell()

    def tell(self):
        """
        Return the position of the underlying stream minus the number of whole bytes that are
        still unconsumed in the window.
        """
        return self._stream.tell() - ((kNumBigValueBits - self._bit_pos) >> 3)

    def there_are_data_in_bit_buffer(self):
        """
        Return whether the window still contains unconsumed bits.
        """
        return self._bit_pos != kNumBigValueBits

    def normalize(self):
        """
        Refill the window: for every 8 consumed bits, one new byte is placed above the bits
        that are still unconsumed.
        """
        while self._bit_pos >= 8:
            self._value = (self.read_direct_byte() << (kNumBigValueBits - self._bit_pos)) | self._value
            self._bit_pos -= 8

    def read_bits(self, numBits: int):
        """
        Consume the next `numBits` bits and return them as an integer.
        """
        self.normalize()
        res = self._value & ((1 << numBits) - 1)
        self._bit_pos += numBits
        self._value >>= numBits
        return res

    def extra_bits_were_read(self):
        """
        Return whether padding was consumed: either more than 4 padding bytes were generated,
        or fewer bits remain in the window than padding bits were put into it.
        """
        return (self._num_extra_bytes > 4 or kNumBigValueBits - self._bit_pos < (self._num_extra_bytes << 3))

    def extra_bits_were_read_fast(self):
        """
        Cheaper variant of `extra_bits_were_read` that only checks the padding byte count.
        """
        return self._num_extra_bytes > 4

Subclasses

Instance variables

var read_direct_byte
Expand source code Browse git
class BitLDecoderBase:
    """
    Bit reader that consumes its input least significant bit first. Bytes are pulled from the
    underlying reader into a window of kNumBigValueBits bits; `_bit_pos` counts how many bits
    of that window have been consumed already.
    """
    __slots__ = (
        '_bit_pos',
        '_value',
        '_stream',
        '_num_extra_bytes',
        'read_direct_byte',
    )

    # Returns the next byte of the underlying reader, bypassing the bit window.
    read_direct_byte: Callable[[], int]

    def __init__(self, reader: StructReader):
        def _rdb():
            # Reading never fails: when the reader raises (presumably because the input is
            # exhausted), the failure is counted and a 0xFF padding byte is returned instead.
            try:
                return u8fast()
            except Exception:
                self._num_extra_bytes += 1
                return 0xFF
        # A fully consumed window is the same as an empty one, so the first read refills it.
        self._bit_pos = kNumBigValueBits
        self._value = 0
        self._stream = reader
        self._num_extra_bytes = 0
        u8fast = reader.u8fast
        self.read_direct_byte = _rdb

    def get_stream_size(self):
        """
        Return the length of the underlying stream if padding bits were consumed, and the
        position reported by `tell` otherwise.
        """
        if self.extra_bits_were_read():
            return len(self._stream)
        else:
            return self.tell()

    def tell(self):
        """
        Return the position of the underlying stream minus the number of whole bytes that are
        still unconsumed in the window.
        """
        return self._stream.tell() - ((kNumBigValueBits - self._bit_pos) >> 3)

    def there_are_data_in_bit_buffer(self):
        """
        Return whether the window still contains unconsumed bits.
        """
        return self._bit_pos != kNumBigValueBits

    def normalize(self):
        """
        Refill the window: for every 8 consumed bits, one new byte is placed above the bits
        that are still unconsumed.
        """
        while self._bit_pos >= 8:
            self._value = (self.read_direct_byte() << (kNumBigValueBits - self._bit_pos)) | self._value
            self._bit_pos -= 8

    def read_bits(self, numBits: int):
        """
        Consume the next `numBits` bits and return them as an integer.
        """
        self.normalize()
        res = self._value & ((1 << numBits) - 1)
        self._bit_pos += numBits
        self._value >>= numBits
        return res

    def extra_bits_were_read(self):
        """
        Return whether padding was consumed: either more than 4 padding bytes were generated,
        or fewer bits remain in the window than padding bits were put into it.
        """
        return (self._num_extra_bytes > 4 or kNumBigValueBits - self._bit_pos < (self._num_extra_bytes << 3))

    def extra_bits_were_read_fast(self):
        """
        Cheaper variant of `extra_bits_were_read` that only checks the padding byte count.
        """
        return self._num_extra_bytes > 4

Methods

def get_stream_size(self)
Expand source code Browse git
def get_stream_size(self):
    """
    Return the length of the underlying stream if padding bits were consumed, and the
    position reported by `tell` otherwise.
    """
    if not self.extra_bits_were_read():
        return self.tell()
    return len(self._stream)
def tell(self)
Expand source code Browse git
def tell(self):
    """
    Return the position of the underlying stream minus the number of whole bytes that are
    still unconsumed in the window.
    """
    buffered_bytes = (kNumBigValueBits - self._bit_pos) >> 3
    return self._stream.tell() - buffered_bytes
def there_are_data_in_bit_buffer(self)
Expand source code Browse git
def there_are_data_in_bit_buffer(self):
    """
    Return whether the window still contains unconsumed bits.
    """
    consumed = self._bit_pos
    return consumed != kNumBigValueBits
def normalize(self)
Expand source code Browse git
def normalize(self):
    """
    Refill the window: for every 8 consumed bits, one new byte is placed above the bits
    that are still unconsumed.
    """
    while self._bit_pos >= 8:
        fresh = self.read_direct_byte()
        self._value |= fresh << (kNumBigValueBits - self._bit_pos)
        self._bit_pos -= 8
def read_bits(self, numBits)
Expand source code Browse git
def read_bits(self, numBits: int):
    """
    Consume the next `numBits` bits and return them as an integer.
    """
    self.normalize()
    window = self._value
    self._value = window >> numBits
    self._bit_pos += numBits
    return window & ((1 << numBits) - 1)
def extra_bits_were_read(self)
Expand source code Browse git
def extra_bits_were_read(self):
    """
    Return whether padding was consumed: either more than 4 padding bytes were generated,
    or fewer bits remain in the window than padding bits were put into it.
    """
    padding = self._num_extra_bytes
    if padding > 4:
        return True
    return kNumBigValueBits - self._bit_pos < (padding << 3)
def extra_bits_were_read_fast(self)
Expand source code Browse git
def extra_bits_were_read_fast(self):
    """
    Return whether more than 4 padding bytes were generated for reads beyond the input.
    """
    padding = self._num_extra_bytes
    return padding > 4
class BitLDecoder (reader)

Bit reader used by the Deflate decoder. It keeps the buffered input both in stream order and with the bits of every byte reversed.

Expand source code Browse git
class BitLDecoder(BitDecoderBase, BitLDecoderBase):
    """
    Bit reader that keeps two views of the buffered input: `_normal_value` holds the bits in
    stream order and serves `read_bits`, while `_value` holds every input byte with its bits
    reversed (via kInvertTable) and serves `get_value`.
    """

    __slots__ = '_normal_value',

    def __init__(self, reader: StructReader):
        super().__init__(reader)
        self._normal_value = 0

    def normalize(self):
        """
        Refill both views with one new byte for every 8 bits that have been consumed.
        """
        p = self._bit_pos
        if p < 8:
            return
        v = self._value
        n = self._normal_value
        while p >= 8:
            b = self.read_direct_byte()
            # Stream-order view: the new byte goes above the unconsumed bits.
            n |= b << (kNumBigValueBits - p)
            p -= 8
            # Reversed view: keep the low 24 bits, shift them up and append the reversed byte.
            v = ((v & 0xFFFFFF) << 8) | kInvertTable[b]
        self._bit_pos = p
        self._value = v
        self._normal_value = n

    def get_value(self, num_bits: int):
        """
        Return the upper `num_bits` bits of the next kNumValueBits unconsumed bits of the
        reversed view. The read position is not advanced.
        """
        self.normalize()
        return ((self._value >> (8 - self._bit_pos)) & kMask) >> (kNumValueBits - num_bits)

    def move_position(self, num_bits: int):
        """
        Mark `num_bits` bits as consumed and drop them from the stream-order view.
        """
        self._bit_pos += num_bits
        self._normal_value >>= num_bits

    def read_bits(self, numBits: int):
        """
        Consume the next `numBits` bits and return them as an integer.
        """
        self.normalize()
        res = self._normal_value & ((1 << numBits) - 1)
        self.move_position(numBits)
        return res

    def align_to_byte(self):
        """
        Skip the bits that remain until the consumed bit count is a multiple of 8.
        """
        self.move_position((32 - self._bit_pos) & 7)

    def read_aligned_byte(self):
        """
        Return the next byte: directly from the reader when the window is empty, and from the
        stream-order view otherwise.
        """
        if self._bit_pos == kNumBigValueBits:
            return self.read_direct_byte()
        b = self._normal_value & 0xFF
        self.move_position(8)
        return b

    def read_aligned_byte_from_buffer(self):
        """
        Like `read_aligned_byte`, but return None when padding bits have already been consumed.
        """
        if self._num_extra_bytes != 0:
            if self.extra_bits_were_read():
                return None
        return self.read_aligned_byte()

Ancestors

Methods

def normalize(self)
Expand source code Browse git
def normalize(self):
    """
    Refill both views of the input with one new byte for every 8 bits that have been
    consumed: the stream-order view and the view with bit-reversed bytes.
    """
    pos = self._bit_pos
    if pos >= 8:
        inverted = self._value
        plain = self._normal_value
        while pos >= 8:
            byte = self.read_direct_byte()
            # Stream-order view: the new byte goes above the unconsumed bits.
            plain |= byte << (kNumBigValueBits - pos)
            # Reversed view: keep the low 24 bits, shift them up, append the reversed byte.
            inverted = ((inverted & 0xFFFFFF) << 8) | kInvertTable[byte]
            pos -= 8
        self._bit_pos = pos
        self._value = inverted
        self._normal_value = plain
def get_value(self, num_bits)
Expand source code Browse git
def get_value(self, num_bits: int):
    """
    Return the upper `num_bits` bits of the next kNumValueBits unconsumed bits of the
    reversed view. The read position is not advanced.
    """
    self.normalize()
    window = (self._value >> (8 - self._bit_pos)) & kMask
    return window >> (kNumValueBits - num_bits)
def move_position(self, num_bits)
Expand source code Browse git
def move_position(self, num_bits: int):
    """
    Mark `num_bits` bits as consumed and drop them from the stream-order view.
    """
    self._normal_value >>= num_bits
    self._bit_pos += num_bits
def read_bits(self, numBits)
Expand source code Browse git
def read_bits(self, numBits: int):
    """
    Consume the next `numBits` bits of the stream-order view and return them as an integer.
    """
    self.normalize()
    mask = (1 << numBits) - 1
    low_bits = self._normal_value & mask
    self.move_position(numBits)
    return low_bits
def align_to_byte(self)
Expand source code Browse git
def align_to_byte(self):
    """
    Skip the bits that remain until the consumed bit count is a multiple of 8.
    """
    padding = (32 - self._bit_pos) & 7
    self.move_position(padding)
def read_aligned_byte(self)
Expand source code Browse git
def read_aligned_byte(self):
    """
    Return the next byte: from the stream-order view while the window holds unconsumed bits,
    and directly from the reader once the window is empty.
    """
    if self._bit_pos != kNumBigValueBits:
        value = self._normal_value & 0xFF
        self.move_position(8)
        return value
    return self.read_direct_byte()
def read_aligned_byte_from_buffer(self)
Expand source code Browse git
def read_aligned_byte_from_buffer(self):
    """
    Like `read_aligned_byte`, but return None when padding bits have already been consumed.
    """
    if self._num_extra_bytes != 0 and self.extra_bits_were_read():
        return None
    return self.read_aligned_byte()
class Deflate (dst, src, df64=False, nsis=False, zlib=False)
Expand source code Browse git
class Deflate:
    """
    Decoder for Deflate data that reads its input through a `BitLDecoder` wrapped around `src` and
    appends the decoded bytes to `dst`.

    The constructor flags select format variations:

    - `df64` is stored as `deflate64`. It selects the `kLenStart64`/`kLenDirectBits64` tables and
      `kDistTableSize64` for fixed blocks; without it, dynamic blocks that declare more than
      `kDistTableSize32` distance codes are rejected.
    - `nsis` is stored as `nsis_mode`. When set, `read_tables` does not read or verify the
      complemented length field of a stored block.
    - `zlib` is stored as `zlib_mode`. When set, `decode_real` requests that the input stream be
      finished once the remaining defined output fits into the current chunk, and it reads four
      trailing bytes into `zlib_tail` after the final block (presumably the zlib checksum).

    The decoding methods report failure by returning a false value rather than raising.
    """
    def __init__(
        self,
        dst: bytearray,
        src: StructReader,
        df64: bool = False,
        nsis: bool = False,
        zlib: bool = False,
    ):
        self.dst = dst
        self.src = src
        self.bits = BitLDecoder(src)
        self.main_decoder = HuffmanDecoder(kNumHuffmanBits, kFixedMainTableSize)
        self.dist_decoder = HuffmanDecoder(kNumHuffmanBits, kFixedDistTableSize)
        self.level_decoder = HuffmanDecoder7b(kLevelTableSize)
        # Number of bytes that remain in the current stored block.
        self.stored_block_size = 0
        self.is_final_block = False
        self.stored_mode = False
        # Filled by decode_real with the four bytes that follow the final block in zlib mode.
        self.zlib_tail = bytearray(4)
        self.zlib_mode = zlib
        self.nsis_mode = nsis
        self.deflate64 = df64
        # When false, dst is cleared whenever decoding is (re)initialized.
        self.keep_history = False
        # Distance symbols at or above this value are rejected in decode_block.
        self._num_dist_levels = 0
        self._need_to_finish_input = False
        self._need_to_read_table = True
        # Either the number of match bytes still to be copied, or one of the state markers
        # kLenIdFinished / kLenIdNeedInit.
        self._leftover_replay_size = 0
        # Distance symbol value of the unfinished match; replay is called with this value plus one.
        self._leftover_replay_dist = 0
        # Requested output size; 0 means that no output size is defined.
        self._out_size = 0
        # Length of dst at the time decoding was initialized.
        self._out_start_pos = 0

    @property
    def _out_size_defined(self) -> bool:
        """
        Whether a positive output size was requested; an `_out_size` of 0 counts as undefined.
        """
        return self._out_size > 0

    def decode_levels(self, levels: memoryview, numSymbols: int) -> bool:
        """
        Decode `numSymbols` code lengths into `levels` using `level_decoder`.

        Symbols below `kTableDirectLevels` are stored directly. The symbol `kTableLevelRepNumber`
        repeats the previous entry, and the remaining symbols below `kLevelTableSize` write runs
        of zeros; run lengths are completed with extra bits read from the bit stream. Returns False
        for a symbol outside the table, for a repeat without a previous entry, or for a run that
        would exceed `numSymbols`; returns True otherwise.
        """
        bits = self.bits
        i = 0
        while i < numSymbols:
            sym = self.level_decoder.decode(self.bits)
            if sym < kTableDirectLevels:
                levels[i] = sym
                i += 1
                continue
            if sym >= kLevelTableSize:
                return False
            if sym == kTableLevelRepNumber:
                # Repeat the previous length; there must be one.
                if i == 0:
                    return False
                numBits = 2
                num = 0
                symbol = levels[i - 1]
            else:
                # Zero run: the two remaining symbols differ in extra bit count and base length.
                sym -= kTableLevel0Number
                sym <<= 2
                numBits = 3 + sym
                num = sym << 1
                symbol = 0
            # num becomes the index one past the end of the run.
            num += i + 3 + bits.read_bits(numBits)
            if num > numSymbols:
                return False
            while True:
                levels[i] = symbol
                i += 1
                if i >= num:
                    break
        return True

    def read_tables(self):
        """
        Read a block header and prepare the decoder state for that block.

        Sets `is_final_block` and `stored_mode`. For a stored block, the bit stream is aligned to
        a byte boundary and `stored_block_size` is read; unless `nsis_mode` is set, the result is
        whether the following 16-bit value is the complement of that size. For Huffman blocks, the
        code lengths are either the fixed ones or decoded from the stream, and `main_decoder` and
        `dist_decoder` are rebuilt from them. A false result indicates an invalid header or that
        the input was exhausted.
        """
        bits = self.bits
        self.is_final_block = (bits.read_bits(kFinalBlockFieldSize) == FinalBlockField.FinalBlock)
        if self.bits.extra_bits_were_read():
            return False
        blockType = bits.read_bits(kBlockTypeFieldSize)
        if blockType > BlockType.DynamicHuffman:
            return False
        if self.bits.extra_bits_were_read():
            return False
        if blockType == BlockType.Stored:
            self.stored_mode = True
            self.bits.align_to_byte()
            self.stored_block_size = self.read_aligned_u16()
            if self.nsis_mode:
                return True
            # The size must be followed by its 16-bit complement.
            return (self.stored_block_size == ~self.read_aligned_u16() & 0xFFFF)
        else:
            self.stored_mode = False

        levels = DecoderLevels()

        if blockType == BlockType.FixedHuffman:
            levels.set_fixed_levels()
            self._num_dist_levels = kDistTableSize64 if self.deflate64 else kDistTableSize32
        else:
            numLitLenLevels = bits.read_bits(kNumLenCodesFieldSize) + kNumLitLenCodesMin
            self._num_dist_levels = bits.read_bits(kNumDistCodesFieldSize) + kNumDistCodesMin
            numLevelCodes = bits.read_bits(kNumLevelCodesFieldSize) + kNumLevelCodesMin
            if not self.deflate64:
                if self._num_dist_levels > kDistTableSize32:
                    return False
            # Lengths of the level codes are transmitted in the order of kCodeLengthAlphabetOrder;
            # entries that are not transmitted are zero.
            levelLevels = bytearray(kLevelTableSize)
            for i in range(kLevelTableSize):
                position = kCodeLengthAlphabetOrder[i]
                if i < numLevelCodes:
                    levelLevels[position] = bits.read_bits(kLevelFieldSize)
                else:
                    levelLevels[position] = 0

            if self.bits.extra_bits_were_read():
                return False

            if not self.level_decoder.build(levelLevels):
                return False

            # Main and distance code lengths are decoded as one sequence and split afterwards.
            b_tmpLevels = bytearray(kFixedMainTableSize + kFixedDistTableSize)
            tmpLevels = memoryview(b_tmpLevels)
            if not self.decode_levels(tmpLevels, numLitLenLevels + self._num_dist_levels):
                return False
            if self.bits.extra_bits_were_read():
                return False
            levels.sub_clear()
            levels.main_levels[:numLitLenLevels] = tmpLevels[:numLitLenLevels]
            levels.dist_levels[:self._num_dist_levels] = tmpLevels[numLitLenLevels:][:self._num_dist_levels]
        if not self.main_decoder.build(levels.main_levels):
            return False
        return self.dist_decoder.build(levels.dist_levels)

    def decode_block(self, size: int, finish_input_stream: bool) -> bool:
        """
        Decode up to `size` bytes into `dst`, resuming from the state left by a previous call.

        When `finish_input_stream` is true, decoding continues after `size` bytes were produced:
        the current Huffman block must then end with the end-of-block symbol, and a stored block
        must have no bytes left. Returns True immediately when decoding has already finished, and
        False for invalid data or when more input was requested than is available. A match that
        does not fit into `size` is remembered in `_leftover_replay_size` and
        `_leftover_replay_dist` and completed by the next call.
        """
        bits = self.bits
        dst = self.dst
        main_decoder = self.main_decoder
        dist_decoder = self.dist_decoder
        write_byte = dst.append

        if self._leftover_replay_size == kLenIdFinished:
            return True
        if self._leftover_replay_size == kLenIdNeedInit:
            if not self.keep_history:
                dst.clear()
            self.is_final_block = False
            self._leftover_replay_size = 0
            self._need_to_read_table = True
        # Complete the match that was cut short by the previous call. The replay helper is defined
        # elsewhere; it is called with the buffer, the distance, and the number of bytes.
        if carry := min(self._leftover_replay_size, size):
            size -= carry
            replay(dst, self._leftover_replay_dist + 1, carry)
            self._leftover_replay_size -= carry
        while size > 0 or finish_input_stream:
            if bits.extra_bits_were_read():
                return False
            if self._need_to_read_table:
                if self.is_final_block:
                    self._leftover_replay_size = kLenIdFinished
                    break
                if not self.read_tables():
                    return False
                if bits.extra_bits_were_read():
                    return False
                self._need_to_read_table = False
            if self.stored_mode:
                if finish_input_stream and size == 0 and self.stored_block_size != 0:
                    return False
                # NSIS version contains some bits in bitl bits buffer.
                # So we must read some first bytes via ReadAlignedByte
                while self.stored_block_size > 0 and size > 0 and bits.there_are_data_in_bit_buffer():
                    write_byte(bits.read_aligned_byte())
                    self.stored_block_size -= 1
                    size -= 1
                while self.stored_block_size > 0 and size > 0:
                    write_byte(bits.read_direct_byte())
                    self.stored_block_size -= 1
                    size -= 1
                # A new block header is only due once the stored block is exhausted.
                self._need_to_read_table = self.stored_block_size == 0
                continue
            while size > 0:
                if bits.extra_bits_were_read_fast():
                    return False
                if (sym := main_decoder.decode(bits)) < 0x100:
                    # Literal byte.
                    write_byte(sym)
                    size -= 1
                    continue
                elif sym == kSymbolEndOfBlock:
                    self._need_to_read_table = True
                    break
                elif sym >= kMainTableSize:
                    return False
                else:
                    # Match: a length symbol with extra bits, then a distance symbol with extra bits.
                    sym -= kSymbolMatch
                    if self.deflate64:
                        length = kLenStart64[sym]
                        n_bits = kLenDirectBits64[sym]
                    else:
                        length = kLenStart32[sym]
                        n_bits = kLenDirectBits32[sym]
                    length += kMatchMinLen + bits.read_bits(n_bits)
                    # Only the part of the match that fits into the remaining budget is copied now.
                    loc = min(length, size)
                    sym = dist_decoder.decode(bits)
                    if sym >= self._num_dist_levels:
                        return False
                    sym = kDistStart[sym] + bits.read_bits(kDistDirectBits[sym])
                    replay(dst, sym + 1, loc)
                    size -= loc
                    length -= loc
                    if length != 0:
                        self._leftover_replay_size = length
                        self._leftover_replay_dist = sym
                        break
            if finish_input_stream and size == 0:
                # All requested output was produced; the block has to end right here.
                if main_decoder.decode(bits) != kSymbolEndOfBlock:
                    return False
                self._need_to_read_table = True
        return not bits.extra_bits_were_read()

    def decode_real(self) -> bool:
        """
        Run `decode_block` in chunks of at most 1 MiB until the stream has finished, the defined
        output size has been produced, or decoding fails.

        Returns False as soon as `decode_block` fails and True otherwise. In zlib mode, after the
        final block, the bit stream is aligned to a byte boundary and the next four bytes are
        stored in `zlib_tail`.
        """
        while True:
            size = 1 << 20
            finish_input_stream = False
            if self._out_size_defined:
                # Output still missing, counted from where dst stood at initialization.
                rem = self._out_size - (len(self.dst) - self._out_start_pos)
                if size >= rem:
                    size = rem
                    if self.zlib_mode or self._need_to_finish_input:
                        finish_input_stream = True
            if not finish_input_stream and size == 0:
                break
            if not self.decode_block(size, finish_input_stream):
                return False
            if self._leftover_replay_size == kLenIdFinished:
                break
        if self._leftover_replay_size == kLenIdFinished and self.zlib_mode:
            self.bits.align_to_byte()
            for i in range(4):
                self.zlib_tail[i] = self.bits.read_aligned_byte()
        return True

    def initialize_out_stream_for_resume(self, out_size: int = 0) -> None:
        """
        Prepare for a decoding run that should produce `out_size` bytes (0 for no defined size).
        Clears `dst` unless `keep_history` is set, records the current length of `dst` as the
        start position, and marks the decoder state as requiring initialization.
        """
        if not self.keep_history:
            self.dst.clear()
        self._out_size = out_size
        self._out_start_pos = len(self.dst)
        self._leftover_replay_size = kLenIdNeedInit

    def decode(self, out_size: int = 0) -> bool:
        """
        Initialize the output state for `out_size` bytes and decode; returns the result of
        `decode_real`.
        """
        self.initialize_out_stream_for_resume(out_size)
        return self.decode_real()

    def is_finished(self) -> bool:
        """
        Whether the final block has been processed completely.
        """
        return self._leftover_replay_size == kLenIdFinished

    def read_aligned_u16(self):
        """
        Read two aligned bytes from the bit stream and combine them as a little-endian 16-bit
        value.
        """
        b = self.bits
        v = b.read_aligned_byte()
        return v | (b.read_aligned_byte() << 8)

    def had_input_eof_error(self):
        """
        Return what the bit stream's `extra_bits_were_read` reports.
        """
        return self.bits.extra_bits_were_read()

Methods

def decode_levels(self, levels, numSymbols)
Expand source code Browse git
def decode_levels(self, levels: memoryview, numSymbols: int):
    """
    Fill the first `numSymbols` entries of `levels` with code lengths decoded via `level_decoder`.

    Symbols below `kTableDirectLevels` are literal lengths. `kTableLevelRepNumber` repeats the
    previous length, and the other symbols below `kLevelTableSize` emit runs of zeros; the run
    length is completed with extra bits from the bit stream. The result is False when a symbol is
    out of range, when a repeat occurs at index 0, or when a run would pass `numSymbols`, and True
    otherwise.
    """
    reader = self.bits
    decode_level = self.level_decoder.decode
    cursor = 0
    while cursor < numSymbols:
        code = decode_level(reader)
        if code < kTableDirectLevels:
            levels[cursor] = code
            cursor += 1
            continue
        if code >= kLevelTableSize:
            return False
        if code == kTableLevelRepNumber:
            if cursor == 0:
                # There is no previous length that could be repeated.
                return False
            fill = levels[cursor - 1]
            extra_bits = 2
            base = 0
        else:
            scaled = (code - kTableLevel0Number) << 2
            fill = 0
            extra_bits = 3 + scaled
            base = scaled << 1
        stop = base + cursor + 3 + reader.read_bits(extra_bits)
        if stop > numSymbols:
            return False
        # Every run covers at least three entries, so stop is always beyond cursor.
        for index in range(cursor, stop):
            levels[index] = fill
        cursor = stop
    return True
def read_tables(self)
Expand source code Browse git
def read_tables(self):
    """
    Parse the header of the next block and set up the decoders for it.

    The final-block flag and the block type are read first. A stored block aligns the bit stream,
    reads `stored_block_size`, and (unless `nsis_mode` is set) checks it against the complemented
    16-bit value that follows. A Huffman block obtains its code lengths either from the fixed
    tables or from the stream and rebuilds `main_decoder` and `dist_decoder`. A false result means
    the header was invalid or the input ran out.
    """
    reader = self.bits
    final_flag = reader.read_bits(kFinalBlockFieldSize)
    self.is_final_block = (final_flag == FinalBlockField.FinalBlock)
    if reader.extra_bits_were_read():
        return False
    block_type = reader.read_bits(kBlockTypeFieldSize)
    if block_type > BlockType.DynamicHuffman:
        return False
    if reader.extra_bits_were_read():
        return False

    if block_type == BlockType.Stored:
        self.stored_mode = True
        reader.align_to_byte()
        self.stored_block_size = self.read_aligned_u16()
        if self.nsis_mode:
            return True
        complement = ~self.read_aligned_u16() & 0xFFFF
        return (self.stored_block_size == complement)
    self.stored_mode = False

    levels = DecoderLevels()

    if block_type == BlockType.FixedHuffman:
        levels.set_fixed_levels()
        if self.deflate64:
            self._num_dist_levels = kDistTableSize64
        else:
            self._num_dist_levels = kDistTableSize32
    else:
        lit_len_count = reader.read_bits(kNumLenCodesFieldSize) + kNumLitLenCodesMin
        dist_count = reader.read_bits(kNumDistCodesFieldSize) + kNumDistCodesMin
        self._num_dist_levels = dist_count
        level_code_count = reader.read_bits(kNumLevelCodesFieldSize) + kNumLevelCodesMin
        if not self.deflate64 and dist_count > kDistTableSize32:
            return False

        # Level code lengths arrive in the order given by kCodeLengthAlphabetOrder; the ones that
        # are not transmitted are set to zero.
        level_lengths = bytearray(kLevelTableSize)
        for index in range(kLevelTableSize):
            slot = kCodeLengthAlphabetOrder[index]
            level_lengths[slot] = reader.read_bits(kLevelFieldSize) if index < level_code_count else 0

        if reader.extra_bits_were_read():
            return False
        if not self.level_decoder.build(level_lengths):
            return False

        # Both length tables are decoded as a single sequence and separated afterwards.
        scratch = memoryview(bytearray(kFixedMainTableSize + kFixedDistTableSize))
        if not self.decode_levels(scratch, lit_len_count + dist_count):
            return False
        if reader.extra_bits_were_read():
            return False
        levels.sub_clear()
        levels.main_levels[:lit_len_count] = scratch[:lit_len_count]
        levels.dist_levels[:dist_count] = scratch[lit_len_count:lit_len_count + dist_count]

    if not self.main_decoder.build(levels.main_levels):
        return False
    return self.dist_decoder.build(levels.dist_levels)
def decode_block(self, size, finish_input_stream)
Expand source code Browse git
def decode_block(self, size: int, finish_input_stream: bool) -> bool:
    """
    Decode up to `size` bytes into `dst`, resuming from the state left by a previous call.

    When `finish_input_stream` is true, decoding continues after `size` bytes were produced: the
    current Huffman block must then end with the end-of-block symbol, and a stored block must have
    no bytes left. Returns True immediately when decoding has already finished, and False for
    invalid data or when more input was requested than is available. A match that does not fit
    into `size` is remembered in `_leftover_replay_size` and `_leftover_replay_dist` and completed
    by the next call.
    """
    bits = self.bits
    dst = self.dst
    main_decoder = self.main_decoder
    dist_decoder = self.dist_decoder
    write_byte = dst.append

    if self._leftover_replay_size == kLenIdFinished:
        return True
    if self._leftover_replay_size == kLenIdNeedInit:
        if not self.keep_history:
            dst.clear()
        self.is_final_block = False
        self._leftover_replay_size = 0
        self._need_to_read_table = True
    # Complete the match that was cut short by the previous call. The replay helper is defined
    # elsewhere; it is called with the buffer, the distance, and the number of bytes.
    if carry := min(self._leftover_replay_size, size):
        size -= carry
        replay(dst, self._leftover_replay_dist + 1, carry)
        self._leftover_replay_size -= carry
    while size > 0 or finish_input_stream:
        if bits.extra_bits_were_read():
            return False
        if self._need_to_read_table:
            if self.is_final_block:
                self._leftover_replay_size = kLenIdFinished
                break
            if not self.read_tables():
                return False
            if bits.extra_bits_were_read():
                return False
            self._need_to_read_table = False
        if self.stored_mode:
            if finish_input_stream and size == 0 and self.stored_block_size != 0:
                return False
            # NSIS version contains some bits in bitl bits buffer.
            # So we must read some first bytes via ReadAlignedByte
            while self.stored_block_size > 0 and size > 0 and bits.there_are_data_in_bit_buffer():
                write_byte(bits.read_aligned_byte())
                self.stored_block_size -= 1
                size -= 1
            while self.stored_block_size > 0 and size > 0:
                write_byte(bits.read_direct_byte())
                self.stored_block_size -= 1
                size -= 1
            # A new block header is only due once the stored block is exhausted.
            self._need_to_read_table = self.stored_block_size == 0
            continue
        while size > 0:
            if bits.extra_bits_were_read_fast():
                return False
            if (sym := main_decoder.decode(bits)) < 0x100:
                # Literal byte.
                write_byte(sym)
                size -= 1
                continue
            elif sym == kSymbolEndOfBlock:
                self._need_to_read_table = True
                break
            elif sym >= kMainTableSize:
                return False
            else:
                # Match: a length symbol with extra bits, then a distance symbol with extra bits.
                sym -= kSymbolMatch
                if self.deflate64:
                    length = kLenStart64[sym]
                    n_bits = kLenDirectBits64[sym]
                else:
                    length = kLenStart32[sym]
                    n_bits = kLenDirectBits32[sym]
                length += kMatchMinLen + bits.read_bits(n_bits)
                # Only the part of the match that fits into the remaining budget is copied now.
                loc = min(length, size)
                sym = dist_decoder.decode(bits)
                if sym >= self._num_dist_levels:
                    return False
                sym = kDistStart[sym] + bits.read_bits(kDistDirectBits[sym])
                replay(dst, sym + 1, loc)
                size -= loc
                length -= loc
                if length != 0:
                    self._leftover_replay_size = length
                    self._leftover_replay_dist = sym
                    break
        if finish_input_stream and size == 0:
            # All requested output was produced; the block has to end right here.
            if main_decoder.decode(bits) != kSymbolEndOfBlock:
                return False
            self._need_to_read_table = True
    return not bits.extra_bits_were_read()
def decode_real(self)
Expand source code Browse git
def decode_real(self):
    """
    Drive `decode_block` in chunks of at most 1 MiB until the stream has finished, the defined
    output size has been produced, or a chunk fails to decode.

    Returns False as soon as `decode_block` fails and True otherwise. In zlib mode, once the final
    block is done, the bit stream is aligned to a byte boundary and the following four bytes are
    copied into `zlib_tail`.
    """
    chunk_limit = 1 << 20
    while True:
        budget = chunk_limit
        must_finish = False
        if self._out_size_defined:
            produced = len(self.dst) - self._out_start_pos
            remaining = self._out_size - produced
            if remaining <= budget:
                budget = remaining
                must_finish = bool(self.zlib_mode or self._need_to_finish_input)
        if budget == 0 and not must_finish:
            break
        if not self.decode_block(budget, must_finish):
            return False
        if self._leftover_replay_size == kLenIdFinished:
            break
    if self._leftover_replay_size == kLenIdFinished and self.zlib_mode:
        reader = self.bits
        reader.align_to_byte()
        for slot in range(4):
            self.zlib_tail[slot] = reader.read_aligned_byte()
    return True
def initialize_out_stream_for_resume(self, out_size=0)
Expand source code Browse git
def initialize_out_stream_for_resume(self, out_size: int = 0):
    """
    Prepare a decoding run that should produce `out_size` bytes (0 for no defined size): `dst` is
    cleared unless `keep_history` is set, its current length becomes the start position, and the
    decoder state is marked as requiring initialization.
    """
    output = self.dst
    if not self.keep_history:
        output.clear()
    self._out_size = out_size
    self._out_start_pos = len(output)
    self._leftover_replay_size = kLenIdNeedInit
def decode(self, out_size=0)
Expand source code Browse git
def decode(self, out_size: int = 0):
    """
    Initialize the output state for `out_size` bytes, then decode and return the result of
    `decode_real`.
    """
    self.initialize_out_stream_for_resume(out_size)
    outcome = self.decode_real()
    return outcome
def is_finished(self)
Expand source code Browse git
def is_finished(self):
    """
    Whether the decoder state equals the `kLenIdFinished` marker, i.e. the final block was
    processed completely.
    """
    state = self._leftover_replay_size
    return state == kLenIdFinished
def read_aligned_u16(self)
Expand source code Browse git
def read_aligned_u16(self):
    """
    Read two aligned bytes from the bit stream and combine them into a little-endian 16-bit value.
    """
    reader = self.bits
    low = reader.read_aligned_byte()
    high = reader.read_aligned_byte()
    return low | (high << 8)
def had_input_eof_error(self)
Expand source code Browse git
def had_input_eof_error(self):
    """
    Return what the bit stream's `extra_bits_were_read` reports.
    """
    reader = self.bits
    return reader.extra_bits_were_read()