Module refinery.lib.fast.blz
Expand source code Browse git
from __future__ import annotations
def blz_decompress_chunk(
data: bytes | bytearray | memoryview,
src_offset: int,
verbatim_offset: int,
size: int,
prefix: bytes | bytearray | memoryview | None = None,
) -> tuple[bytearray, int]:
"""
Decompress a single BriefLZ chunk.
"""
src = memoryview(data)
end = len(src)
pos = src_offset
bitcount = 0
bitstore = 0
prefix_len = len(prefix) if prefix else 0
def readbit():
nonlocal bitcount, bitstore, pos
if not bitcount:
if pos + 1 >= end:
raise EOFError('unexpected end of input during bit read')
bitstore = src[pos] | (src[pos + 1] << 8)
pos += 2
bitcount = 0xF
else:
bitcount -= 1
return (bitstore >> bitcount) & 1
def readint():
result = 2 + readbit()
while readbit():
result = (result << 1) | readbit()
return result
out = bytearray()
out.append(src[verbatim_offset])
decompressed = 1
try:
while decompressed < size:
if readbit():
length = readint() + 2
sector = readint() - 2
if pos >= end:
raise EOFError('unexpected end of input reading offset byte')
offset = src[pos] + 1
pos += 1
delta = offset + 0x100 * sector
available = prefix_len + len(out)
if delta > available:
raise ValueError(
F'Requested rewind by 0x{delta:08X} bytes '
F'with only 0x{available:08X} bytes in output buffer.'
)
global_pos = available - delta
remaining = length
while remaining > 0:
if global_pos < prefix_len:
chunk_len = min(prefix_len - global_pos, remaining)
out.extend(prefix[global_pos:global_pos + chunk_len])
global_pos += chunk_len
remaining -= chunk_len
else:
ref_start = global_pos - prefix_len
chunk_len = min(len(out) - ref_start, remaining)
if chunk_len <= 0:
raise ValueError('zero-length copy in replay')
out.extend(out[ref_start:ref_start + chunk_len])
global_pos += chunk_len
remaining -= chunk_len
decompressed += length
else:
if pos >= end:
raise EOFError('unexpected end of input reading literal')
out.append(src[pos])
pos += 1
decompressed += 1
except EOFError:
raise
return (out, pos)
Functions
def blz_decompress_chunk(data, src_offset, verbatim_offset, size, prefix=None)-
Decompress a single BriefLZ chunk.
Expand source code Browse git
def blz_decompress_chunk( data: bytes | bytearray | memoryview, src_offset: int, verbatim_offset: int, size: int, prefix: bytes | bytearray | memoryview | None = None, ) -> tuple[bytearray, int]: """ Decompress a single BriefLZ chunk. """ src = memoryview(data) end = len(src) pos = src_offset bitcount = 0 bitstore = 0 prefix_len = len(prefix) if prefix else 0 def readbit(): nonlocal bitcount, bitstore, pos if not bitcount: if pos + 1 >= end: raise EOFError('unexpected end of input during bit read') bitstore = src[pos] | (src[pos + 1] << 8) pos += 2 bitcount = 0xF else: bitcount -= 1 return (bitstore >> bitcount) & 1 def readint(): result = 2 + readbit() while readbit(): result = (result << 1) | readbit() return result out = bytearray() out.append(src[verbatim_offset]) decompressed = 1 try: while decompressed < size: if readbit(): length = readint() + 2 sector = readint() - 2 if pos >= end: raise EOFError('unexpected end of input reading offset byte') offset = src[pos] + 1 pos += 1 delta = offset + 0x100 * sector available = prefix_len + len(out) if delta > available: raise ValueError( F'Requested rewind by 0x{delta:08X} bytes ' F'with only 0x{available:08X} bytes in output buffer.' ) global_pos = available - delta remaining = length while remaining > 0: if global_pos < prefix_len: chunk_len = min(prefix_len - global_pos, remaining) out.extend(prefix[global_pos:global_pos + chunk_len]) global_pos += chunk_len remaining -= chunk_len else: ref_start = global_pos - prefix_len chunk_len = min(len(out) - ref_start, remaining) if chunk_len <= 0: raise ValueError('zero-length copy in replay') out.extend(out[ref_start:ref_start + chunk_len]) global_pos += chunk_len remaining -= chunk_len decompressed += length else: if pos >= end: raise EOFError('unexpected end of input reading literal') out.append(src[pos]) pos += 1 decompressed += 1 except EOFError: raise return (out, pos)