Module refinery.lib.fast.lzjb
Expand source code Browse git
from __future__ import annotations
_MATCH_LEN = 6
_MATCH_MIN = 3
_MATCH_MAX = (1 << _MATCH_LEN) + (_MATCH_MIN - 1)
_OFFSET_MASK = (1 << (16 - _MATCH_LEN)) - 1
_LEMPEL_SIZE = 0x1000
def lzjb_decompress(data: bytes | bytearray | memoryview) -> bytearray:
src = memoryview(data)
end = len(src)
pos = 0
dst = bytearray()
while pos < end:
copy_byte = src[pos]
pos += 1
if copy_byte == 0:
chunk = src[pos:pos + 8]
dst.extend(chunk)
pos += len(chunk)
continue
for mask in (0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80):
if pos >= end:
break
if not (copy_byte & mask):
dst.append(src[pos])
pos += 1
else:
if not dst:
raise ValueError('copy requested against empty buffer')
if pos + 1 >= end:
break
pair = (src[pos] << 8) | src[pos + 1]
pos += 2
match_len = (pair >> 10) + _MATCH_MIN
match_pos = pair & 0x3FF
if match_pos == 0 or match_pos > len(dst):
raise RuntimeError('invalid match offset')
copy_src = len(dst) - match_pos
while match_len > 0:
match = dst[copy_src:copy_src + match_len]
dst.extend(match)
copy_src += len(match)
match_len -= len(match)
return dst
def lzjb_compress(data: bytes | bytearray | memoryview) -> bytearray:
src = memoryview(data)
length = len(src)
output = bytearray()
lempel = [0] * _LEMPEL_SIZE
copymask = 0x80
position = 0
copy_map = None
while position < length:
copymask <<= 1
if copymask >= 0x100:
copymask = 1
copy_map = len(output)
output.append(0)
if position > length - _MATCH_MAX:
output.append(src[position])
position += 1
continue
hsh = (src[position] << 16) + (src[position + 1] << 8) + src[position + 2]
hsh += hsh >> 9
hsh += hsh >> 5
hsh %= _LEMPEL_SIZE
offset = (position - lempel[hsh]) & _OFFSET_MASK
lempel[hsh] = position
cpy = position - offset
if cpy >= 0 and cpy != position and src[position:position + 3] == src[cpy:cpy + 3]:
if copy_map is None:
raise ValueError
output[copy_map] |= copymask
mlen = min(length - position, _MATCH_MAX)
for mlen in range(_MATCH_MIN, mlen):
if src[position + mlen] != src[cpy + mlen]:
break
output.append(((mlen - _MATCH_MIN) << (8 - _MATCH_LEN)) | (offset >> 8))
output.append(offset & 255)
position += mlen
else:
output.append(src[position])
position += 1
return output
Functions
def lzjb_decompress(data)-
Expand source code Browse git
def lzjb_decompress(data: bytes | bytearray | memoryview) -> bytearray: src = memoryview(data) end = len(src) pos = 0 dst = bytearray() while pos < end: copy_byte = src[pos] pos += 1 if copy_byte == 0: chunk = src[pos:pos + 8] dst.extend(chunk) pos += len(chunk) continue for mask in (0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80): if pos >= end: break if not (copy_byte & mask): dst.append(src[pos]) pos += 1 else: if not dst: raise ValueError('copy requested against empty buffer') if pos + 1 >= end: break pair = (src[pos] << 8) | src[pos + 1] pos += 2 match_len = (pair >> 10) + _MATCH_MIN match_pos = pair & 0x3FF if match_pos == 0 or match_pos > len(dst): raise RuntimeError('invalid match offset') copy_src = len(dst) - match_pos while match_len > 0: match = dst[copy_src:copy_src + match_len] dst.extend(match) copy_src += len(match) match_len -= len(match) return dst def lzjb_compress(data)-
Expand source code Browse git
def lzjb_compress(data: bytes | bytearray | memoryview) -> bytearray: src = memoryview(data) length = len(src) output = bytearray() lempel = [0] * _LEMPEL_SIZE copymask = 0x80 position = 0 copy_map = None while position < length: copymask <<= 1 if copymask >= 0x100: copymask = 1 copy_map = len(output) output.append(0) if position > length - _MATCH_MAX: output.append(src[position]) position += 1 continue hsh = (src[position] << 16) + (src[position + 1] << 8) + src[position + 2] hsh += hsh >> 9 hsh += hsh >> 5 hsh %= _LEMPEL_SIZE offset = (position - lempel[hsh]) & _OFFSET_MASK lempel[hsh] = position cpy = position - offset if cpy >= 0 and cpy != position and src[position:position + 3] == src[cpy:cpy + 3]: if copy_map is None: raise ValueError output[copy_map] |= copymask mlen = min(length - position, _MATCH_MAX) for mlen in range(_MATCH_MIN, mlen): if src[position + mlen] != src[cpy + mlen]: break output.append(((mlen - _MATCH_MIN) << (8 - _MATCH_LEN)) | (offset >> 8)) output.append(offset & 255) position += mlen else: output.append(src[position]) position += 1 return output