Module refinery.lib.fast.lzw
Expand source code Browse git
from __future__ import annotations
from refinery.lib.exceptions import RefineryPartialResult
# Width in bits of every code at the start of a stream and again after a table reset.
_INIT_BITS = 9
# Largest code width the decoder accepts; the prefix table has 1 << _BITS slots.
_BITS = 16
# In block mode, reading this code makes the decoder reset its table and code width.
_CLEAR = 256
# First table slot handed out in block mode: the one right after the reset code.
_FIRST = _CLEAR + 1
# Half the size of the suffix table, which is allocated with _WSIZE * 2 bytes.
_WSIZE = 1 << 15
def lzw_decompress(
    ibuf: bytes | bytearray | memoryview,
    maxbits: int,
    block_mode: bool,
) -> bytearray:
    """
    Decode a raw LZW code stream and return the decompressed bytes. The 3-byte file header
    (\\x1F\\x9D + flags) is not handled here: the caller has to parse it, strip it, and pass the
    values it encodes as arguments.

    - `ibuf` is the code stream. Codes are packed least significant bit first.
    - `maxbits` is the code width at which the table stops growing. Values above `_BITS` are
      rejected with a `ValueError`.
    - `block_mode` controls whether the code `_CLEAR` is interpreted as a table reset; when it is
      set, new table entries start at `_FIRST` rather than at 0x100.

    A `ValueError` is also raised when the very first code is not a literal byte. When a later
    code points past the next free table entry, a `RefineryPartialResult` is raised which carries
    everything that was decoded up to that point.
    """
    if maxbits > _BITS:
        raise ValueError(F'Compressed with {maxbits} bits; cannot handle file.')

    view = memoryview(ibuf)
    code_limit = 1 << maxbits

    # One suffix byte and one prefix code per table entry; the first 0x100 entries are the
    # literal bytes themselves.
    suffix_of = bytearray(_WSIZE * 2)
    suffix_of[:0x100] = range(0x100)
    prefix_of = [0] * (1 << _BITS)

    width = _INIT_BITS
    top_code = mask = (1 << width) - 1
    next_free = _FIRST if block_mode else 0x100

    previous = -1  # marks that no code has been decoded yet
    last_byte = 0
    cursor = 0
    result = bytearray()

    while True:
        # Drop the bytes that were consumed so far; bit positions are counted from this point on.
        view = view[cursor >> 3:]
        cursor = 0
        # Only read while a complete code of the current width still fits into the input.
        usable_bits = (len(view) << 3) - (width - 1)

        while cursor < usable_bits:
            if next_free > top_code:
                # All codes of the current width are taken: skip ahead to the next multiple of
                # (8 * width) bits, widen the codes by one bit and restart from there.
                span = width << 3
                cursor = (cursor + span - 1) // span * span
                width += 1
                mask = (1 << width) - 1
                top_code = code_limit if width == maxbits else mask
                break

            at = cursor >> 3
            code = (int.from_bytes(view[at:at + 3], 'little') >> (cursor & 7)) & mask
            cursor += width

            if previous == -1:
                # The first code of the stream has to be a literal byte.
                if code >= 256:
                    raise ValueError('corrupt input.')
                previous = last_byte = code
                result.append(code)
                continue

            if block_mode and code == _CLEAR:
                # Table reset: return to the initial code width after skipping to the next
                # multiple of (8 * width) bits. The slot _FIRST - 1 is handed out next.
                prefix_of[:0x100] = [0] * 0x100
                next_free = _FIRST - 1
                span = width << 3
                cursor = (cursor + span - 1) // span * span
                width = _INIT_BITS
                top_code = mask = (1 << width) - 1
                break

            incoming = code
            pending = bytearray()  # collects the expansion back to front

            if code > next_free:
                raise RefineryPartialResult('corrupt input.', bytes(result))
            if code == next_free:
                # The code names the entry that is created below. It expands to the previous
                # string followed by the first byte of that string.
                pending.append(last_byte)
                code = previous

            # Follow the prefix chain until a literal byte is reached.
            while code >= 256:
                pending.append(suffix_of[code])
                code = prefix_of[code]

            last_byte = suffix_of[code]
            pending.append(last_byte)
            result += pending[::-1]

            if next_free < code_limit:
                prefix_of[next_free] = previous & 0xFFFF
                suffix_of[next_free] = last_byte & 0x00FF
                next_free += 1

            previous = incoming
        else:
            # The inner loop ended without a restart request: the input is used up.
            break

    return result
Functions
def lzw_decompress(ibuf, maxbits, block_mode) -> bytearray
LZW decompression of raw stream data. The caller is responsible for parsing and stripping the 3-byte header (\x1F\x9D + flags) before invoking this function.
Expand source code Browse git
def lzw_decompress(
    ibuf: bytes | bytearray | memoryview,
    maxbits: int,
    block_mode: bool,
) -> bytearray:
    """
    Decode a raw LZW code stream and return the decompressed bytes. The 3-byte file header
    (\\x1F\\x9D + flags) is not handled here: the caller has to parse it, strip it, and pass the
    values it encodes as arguments.

    - `ibuf` is the code stream. Codes are packed least significant bit first.
    - `maxbits` is the code width at which the table stops growing. Values above `_BITS` are
      rejected with a `ValueError`.
    - `block_mode` controls whether the code `_CLEAR` is interpreted as a table reset; when it is
      set, new table entries start at `_FIRST` rather than at 0x100.

    A `ValueError` is also raised when the very first code is not a literal byte. When a later
    code points past the next free table entry, a `RefineryPartialResult` is raised which carries
    everything that was decoded up to that point.
    """
    if maxbits > _BITS:
        raise ValueError(F'Compressed with {maxbits} bits; cannot handle file.')

    view = memoryview(ibuf)
    code_limit = 1 << maxbits

    # One suffix byte and one prefix code per table entry; the first 0x100 entries are the
    # literal bytes themselves.
    suffix_of = bytearray(_WSIZE * 2)
    suffix_of[:0x100] = range(0x100)
    prefix_of = [0] * (1 << _BITS)

    width = _INIT_BITS
    top_code = mask = (1 << width) - 1
    next_free = _FIRST if block_mode else 0x100

    previous = -1  # marks that no code has been decoded yet
    last_byte = 0
    cursor = 0
    result = bytearray()

    while True:
        # Drop the bytes that were consumed so far; bit positions are counted from this point on.
        view = view[cursor >> 3:]
        cursor = 0
        # Only read while a complete code of the current width still fits into the input.
        usable_bits = (len(view) << 3) - (width - 1)

        while cursor < usable_bits:
            if next_free > top_code:
                # All codes of the current width are taken: skip ahead to the next multiple of
                # (8 * width) bits, widen the codes by one bit and restart from there.
                span = width << 3
                cursor = (cursor + span - 1) // span * span
                width += 1
                mask = (1 << width) - 1
                top_code = code_limit if width == maxbits else mask
                break

            at = cursor >> 3
            code = (int.from_bytes(view[at:at + 3], 'little') >> (cursor & 7)) & mask
            cursor += width

            if previous == -1:
                # The first code of the stream has to be a literal byte.
                if code >= 256:
                    raise ValueError('corrupt input.')
                previous = last_byte = code
                result.append(code)
                continue

            if block_mode and code == _CLEAR:
                # Table reset: return to the initial code width after skipping to the next
                # multiple of (8 * width) bits. The slot _FIRST - 1 is handed out next.
                prefix_of[:0x100] = [0] * 0x100
                next_free = _FIRST - 1
                span = width << 3
                cursor = (cursor + span - 1) // span * span
                width = _INIT_BITS
                top_code = mask = (1 << width) - 1
                break

            incoming = code
            pending = bytearray()  # collects the expansion back to front

            if code > next_free:
                raise RefineryPartialResult('corrupt input.', bytes(result))
            if code == next_free:
                # The code names the entry that is created below. It expands to the previous
                # string followed by the first byte of that string.
                pending.append(last_byte)
                code = previous

            # Follow the prefix chain until a literal byte is reached.
            while code >= 256:
                pending.append(suffix_of[code])
                code = prefix_of[code]

            last_byte = suffix_of[code]
            pending.append(last_byte)
            result += pending[::-1]

            if next_free < code_limit:
                prefix_of[next_free] = previous & 0xFFFF
                suffix_of[next_free] = last_byte & 0x00FF
                next_free += 1

            previous = incoming
        else:
            # The inner loop ended without a restart request: the input is used up.
            break

    return result