Module refinery.lib.seven.quantum

This is a port of the Quantum decompression implementation in 7Zip to Python.

Expand source code Browse git
"""
This is a port of the Quantum decompression implementation in 7Zip to Python.
"""
from __future__ import annotations

_UPDATE_STEP = 8
_FREQ_SUM_MAX = 3800
_REORDER_COUNT_START = 4
_REORDER_COUNT = 50

_NUM_LIT_SELECTOR_BITS = 2
_NUM_LIT_SELECTORS = 1 << _NUM_LIT_SELECTOR_BITS
_NUM_LIT_SYMBOLS = 1 << (8 - _NUM_LIT_SELECTOR_BITS)
_NUM_MATCH_SELECTORS = 3
_NUM_SELECTORS = _NUM_LIT_SELECTORS + _NUM_MATCH_SELECTORS

_NUM_LEN_SYMBOLS = 27
_MATCH_MIN_LEN = 3
_NUM_SIMPLE_LEN_SLOTS = 6

_NUM_LEN_BOUND = _NUM_SIMPLE_LEN_SLOTS + _MATCH_MIN_LEN + _NUM_MATCH_SELECTORS - 1
_M32 = 0xFFFFFFFF


class _BitStream:
    __slots__ = '_data', '_pos', '_end', '_bits', '_nbits'

    def __init__(self, data: memoryview):
        self._data = data
        self._pos = 0
        self._end = len(data)
        self._bits = 0
        self._nbits = 0

    def read(self, count: int) -> int:
        while self._nbits < count:
            byte = self._data[self._pos] if self._pos < self._end else 0
            self._bits = (self._bits << 8) | byte
            self._pos += 1
            self._nbits += 8
        self._nbits -= count
        result = (self._bits >> self._nbits) & ((1 << count) - 1)
        self._bits &= (1 << self._nbits) - 1
        return result

    @property
    def overread(self) -> bool:
        return self._pos > self._end


class _RangeDecoder:
    __slots__ = '_stream', '_low', '_range', '_code'

    def __init__(self, stream: _BitStream):
        self._stream = stream
        self._low: int = 0
        self._range: int = 0x10000
        self._code: int = stream.read(16)

    def get_threshold(self, total: int) -> int:
        return ((self._code + 1) * total - 1) // self._range

    def decode(self, start: int, end: int, total: int):
        hi = (0 - (self._low + end * self._range // total)) & _M32
        offset = start * self._range // total
        lo = (self._low + offset) & _M32
        code = (self._code - offset) & _M32
        num_bits = 0
        lo ^= hi
        while lo & (1 << 15):
            lo = (lo << 1) & _M32
            hi = (hi << 1) & _M32
            num_bits += 1
        lo ^= hi
        an = lo & hi
        while an & (1 << 14):
            an = (an << 1) & _M32
            lo = (lo << 1) & _M32
            hi = (hi << 1) & _M32
            num_bits += 1
        self._low = lo
        self._range = ((~hi - lo) & 0xFFFF) + 1
        if num_bits:
            code = ((code << num_bits) + self._stream.read(num_bits)) & _M32
        self._code = code

    def read_bits(self, count: int) -> int:
        return self._stream.read(count)


class _Model:
    __slots__ = '_count', '_reorder_count', '_vals', '_freqs'

    def __init__(self, num_items: int, start_val: int):
        self._count = num_items
        self._reorder_count = _REORDER_COUNT_START
        self._vals = list(range(start_val, start_val + num_items))
        self._freqs = [num_items - i for i in range(num_items)] + [0]

    def _rescale_reorder(self):
        self._reorder_count = _REORDER_COUNT
        freqs = self._freqs
        vals = self._vals
        n = self._count
        nxt = 0
        for i in range(n - 1, -1, -1):
            cum = freqs[i]
            freqs[i] = (cum - nxt + 1) >> 1
            nxt = cum
        for i in range(n - 1):
            for k in range(i + 1, n):
                if freqs[i] < freqs[k]:
                    freqs[i], freqs[k] = freqs[k], freqs[i]
                    vals[i], vals[k] = vals[k], vals[i]
        cum = 0
        for i in range(n - 1, -1, -1):
            cum += freqs[i]
            freqs[i] = cum

    def _rescale_simple(self):
        freqs = self._freqs
        n = self._count
        nxt = 1
        for i in range(n - 1, -1, -1):
            freq = freqs[i] >> 1
            if freq < nxt:
                freq = nxt
            freqs[i] = freq
            nxt = freq + 1

    def decode(self, rc: _RangeDecoder) -> int:
        freqs = self._freqs
        if freqs[0] > _FREQ_SUM_MAX:
            self._reorder_count -= 1
            if self._reorder_count == 0:
                self._rescale_reorder()
            else:
                self._rescale_simple()
        total = freqs[0]
        freqs[0] = total + _UPDATE_STEP
        threshold = rc.get_threshold(total)
        k = 1
        while freqs[k] > threshold:
            freqs[k] += _UPDATE_STEP
            k += 1
        k -= 1
        result = self._vals[k]
        start = freqs[k + 1]
        end = freqs[k] - _UPDATE_STEP
        rc.decode(start, end, total)
        return result


class QuantumDecoder:
    """
    A stateful Quantum decompressor for use with CAB archives. The decoder maintains a sliding
    window and adaptive models across consecutive blocks within a folder.
    """
    __slots__ = (
        '_num_dict_bits',
        '_window_size',
        '_window',
        '_window_pos',
        '_over_window',
        '_selector',
        '_literals',
        '_pos_slot',
        '_len_slot',
    )

    _selector: _Model
    _len_slot: _Model
    _literals: list[_Model]
    _pos_slot: list[_Model]

    def __init__(self, num_dict_bits: int):
        if num_dict_bits > 21:
            raise ValueError(F'Invalid Quantum window bits: {num_dict_bits}')
        self._num_dict_bits = num_dict_bits
        win_bits = max(num_dict_bits, 15)
        self._window_size = 1 << win_bits
        self._window = bytearray(self._window_size)
        self._window_pos = 0
        self._over_window = False

    def _init_models(self):
        ndb = self._num_dict_bits
        num_pos_items = 1 if ndb == 0 else ndb << 1
        self._selector = _Model(_NUM_SELECTORS, 0)
        self._literals = [_Model(_NUM_LIT_SYMBOLS, i * _NUM_LIT_SYMBOLS) for i in range(_NUM_LIT_SELECTORS)]
        self._pos_slot = []
        for i in range(_NUM_MATCH_SELECTORS):
            num = 24 + i * 6 + ((i + 1) & 2) * 3
            self._pos_slot.append(_Model(min(num_pos_items, num), 0))
        self._len_slot = _Model(_NUM_LEN_SYMBOLS, _MATCH_MIN_LEN + _NUM_MATCH_SELECTORS - 1)

    def decompress(
        self,
        data: bytes | bytearray | memoryview,
        output_size: int,
        keep_history: bool = False,
    ) -> memoryview:
        view = memoryview(data)
        if len(view) < 2:
            raise ValueError('Quantum compressed block too short.')
        if not keep_history:
            self._window_pos = 0
            self._over_window = False
            self._init_models()
        stream = _BitStream(view)
        rc = _RangeDecoder(stream)
        win = self._window
        win_size = self._window_size
        win_pos = self._window_pos
        if win_pos == win_size:
            win_pos = 0
            self._window_pos = 0
            self._over_window = True
        if output_size > win_size - win_pos:
            raise ValueError('Quantum output would exceed window size.')
        out_start = win_pos
        remaining = output_size
        try:
            selector_model = self._selector
            literal_models = self._literals
            pos_model = self._pos_slot
            len_model = self._len_slot
        except AttributeError as AE:
            raise RuntimeError('Quantum decompressor was not initialized.') from AE
        while remaining > 0:
            selector = selector_model.decode(rc)
            if selector < _NUM_LIT_SELECTORS:
                win[win_pos] = literal_models[selector].decode(rc)
                win_pos += 1
                remaining -= 1
            else:
                match_idx = selector - _NUM_LIT_SELECTORS
                length = match_idx + _MATCH_MIN_LEN
                if match_idx == _NUM_MATCH_SELECTORS - 1:
                    length = len_model.decode(rc)
                    bound = _NUM_LEN_BOUND
                    if length >= bound:
                        length -= _NUM_LEN_BOUND - 4
                        num_direct = length >> 2
                        length = ((4 | (length & 3)) << num_direct) - 8 + _NUM_LEN_BOUND
                        if num_direct < 6:
                            length += rc.read_bits(num_direct)
                dist = pos_model[match_idx].decode(rc)
                if dist >= 4:
                    num_direct = (dist >> 1) - 1
                    dist = ((2 | (dist & 1)) << num_direct) + rc.read_bits(num_direct)
                if remaining < length:
                    raise ValueError('Quantum match length exceeds remaining output.')
                remaining -= length
                src = win_pos - dist - 1
                if src < 0:
                    if not self._over_window:
                        raise ValueError('Quantum match offset before start of data.')
                    wrap = -src
                    src += win_size
                    if wrap < length:
                        for _ in range(wrap):
                            win[win_pos] = win[src]
                            win_pos += 1
                            src += 1
                        length -= wrap
                        src = 0
                for _ in range(length):
                    win[win_pos] = win[src]
                    win_pos += 1
                    src += 1
        self._window_pos = win_pos
        view = memoryview(win)
        return view[out_start:win_pos]

Classes

class QuantumDecoder (num_dict_bits)

A stateful Quantum decompressor for use with CAB archives. The decoder maintains a sliding window and adaptive models across consecutive blocks within a folder.

Expand source code Browse git
class QuantumDecoder:
    """
    A stateful Quantum decompressor for use with CAB archives. The decoder maintains a sliding
    window and adaptive models across consecutive blocks within a folder.
    """
    __slots__ = (
        '_num_dict_bits',
        '_window_size',
        '_window',
        '_window_pos',
        '_over_window',
        '_selector',
        '_literals',
        '_pos_slot',
        '_len_slot',
    )

    _selector: _Model
    _len_slot: _Model
    _literals: list[_Model]
    _pos_slot: list[_Model]

    def __init__(self, num_dict_bits: int):
        if num_dict_bits > 21:
            raise ValueError(F'Invalid Quantum window bits: {num_dict_bits}')
        self._num_dict_bits = num_dict_bits
        win_bits = max(num_dict_bits, 15)
        self._window_size = 1 << win_bits
        self._window = bytearray(self._window_size)
        self._window_pos = 0
        self._over_window = False

    def _init_models(self):
        ndb = self._num_dict_bits
        num_pos_items = 1 if ndb == 0 else ndb << 1
        self._selector = _Model(_NUM_SELECTORS, 0)
        self._literals = [_Model(_NUM_LIT_SYMBOLS, i * _NUM_LIT_SYMBOLS) for i in range(_NUM_LIT_SELECTORS)]
        self._pos_slot = []
        for i in range(_NUM_MATCH_SELECTORS):
            num = 24 + i * 6 + ((i + 1) & 2) * 3
            self._pos_slot.append(_Model(min(num_pos_items, num), 0))
        self._len_slot = _Model(_NUM_LEN_SYMBOLS, _MATCH_MIN_LEN + _NUM_MATCH_SELECTORS - 1)

    def decompress(
        self,
        data: bytes | bytearray | memoryview,
        output_size: int,
        keep_history: bool = False,
    ) -> memoryview:
        view = memoryview(data)
        if len(view) < 2:
            raise ValueError('Quantum compressed block too short.')
        if not keep_history:
            self._window_pos = 0
            self._over_window = False
            self._init_models()
        stream = _BitStream(view)
        rc = _RangeDecoder(stream)
        win = self._window
        win_size = self._window_size
        win_pos = self._window_pos
        if win_pos == win_size:
            win_pos = 0
            self._window_pos = 0
            self._over_window = True
        if output_size > win_size - win_pos:
            raise ValueError('Quantum output would exceed window size.')
        out_start = win_pos
        remaining = output_size
        try:
            selector_model = self._selector
            literal_models = self._literals
            pos_model = self._pos_slot
            len_model = self._len_slot
        except AttributeError as AE:
            raise RuntimeError('Quantum decompressor was not initialized.') from AE
        while remaining > 0:
            selector = selector_model.decode(rc)
            if selector < _NUM_LIT_SELECTORS:
                win[win_pos] = literal_models[selector].decode(rc)
                win_pos += 1
                remaining -= 1
            else:
                match_idx = selector - _NUM_LIT_SELECTORS
                length = match_idx + _MATCH_MIN_LEN
                if match_idx == _NUM_MATCH_SELECTORS - 1:
                    length = len_model.decode(rc)
                    bound = _NUM_LEN_BOUND
                    if length >= bound:
                        length -= _NUM_LEN_BOUND - 4
                        num_direct = length >> 2
                        length = ((4 | (length & 3)) << num_direct) - 8 + _NUM_LEN_BOUND
                        if num_direct < 6:
                            length += rc.read_bits(num_direct)
                dist = pos_model[match_idx].decode(rc)
                if dist >= 4:
                    num_direct = (dist >> 1) - 1
                    dist = ((2 | (dist & 1)) << num_direct) + rc.read_bits(num_direct)
                if remaining < length:
                    raise ValueError('Quantum match length exceeds remaining output.')
                remaining -= length
                src = win_pos - dist - 1
                if src < 0:
                    if not self._over_window:
                        raise ValueError('Quantum match offset before start of data.')
                    wrap = -src
                    src += win_size
                    if wrap < length:
                        for _ in range(wrap):
                            win[win_pos] = win[src]
                            win_pos += 1
                            src += 1
                        length -= wrap
                        src = 0
                for _ in range(length):
                    win[win_pos] = win[src]
                    win_pos += 1
                    src += 1
        self._window_pos = win_pos
        view = memoryview(win)
        return view[out_start:win_pos]

Methods

def decompress(self, data, output_size, keep_history=False)
Expand source code Browse git
def decompress(
    self,
    data: bytes | bytearray | memoryview,
    output_size: int,
    keep_history: bool = False,
) -> memoryview:
    view = memoryview(data)
    if len(view) < 2:
        raise ValueError('Quantum compressed block too short.')
    if not keep_history:
        self._window_pos = 0
        self._over_window = False
        self._init_models()
    stream = _BitStream(view)
    rc = _RangeDecoder(stream)
    win = self._window
    win_size = self._window_size
    win_pos = self._window_pos
    if win_pos == win_size:
        win_pos = 0
        self._window_pos = 0
        self._over_window = True
    if output_size > win_size - win_pos:
        raise ValueError('Quantum output would exceed window size.')
    out_start = win_pos
    remaining = output_size
    try:
        selector_model = self._selector
        literal_models = self._literals
        pos_model = self._pos_slot
        len_model = self._len_slot
    except AttributeError as AE:
        raise RuntimeError('Quantum decompressor was not initialized.') from AE
    while remaining > 0:
        selector = selector_model.decode(rc)
        if selector < _NUM_LIT_SELECTORS:
            win[win_pos] = literal_models[selector].decode(rc)
            win_pos += 1
            remaining -= 1
        else:
            match_idx = selector - _NUM_LIT_SELECTORS
            length = match_idx + _MATCH_MIN_LEN
            if match_idx == _NUM_MATCH_SELECTORS - 1:
                length = len_model.decode(rc)
                bound = _NUM_LEN_BOUND
                if length >= bound:
                    length -= _NUM_LEN_BOUND - 4
                    num_direct = length >> 2
                    length = ((4 | (length & 3)) << num_direct) - 8 + _NUM_LEN_BOUND
                    if num_direct < 6:
                        length += rc.read_bits(num_direct)
            dist = pos_model[match_idx].decode(rc)
            if dist >= 4:
                num_direct = (dist >> 1) - 1
                dist = ((2 | (dist & 1)) << num_direct) + rc.read_bits(num_direct)
            if remaining < length:
                raise ValueError('Quantum match length exceeds remaining output.')
            remaining -= length
            src = win_pos - dist - 1
            if src < 0:
                if not self._over_window:
                    raise ValueError('Quantum match offset before start of data.')
                wrap = -src
                src += win_size
                if wrap < length:
                    for _ in range(wrap):
                        win[win_pos] = win[src]
                        win_pos += 1
                        src += 1
                    length -= wrap
                    src = 0
            for _ in range(length):
                win[win_pos] = win[src]
                win_pos += 1
                src += 1
    self._window_pos = win_pos
    view = memoryview(win)
    return view[out_start:win_pos]