Module refinery.units.compression.flz
Expand source code Browse git
from __future__ import annotations
from refinery.units import Unit, Arg
from refinery.lib.structures import MemoryFile
from refinery.lib.array import make_array
_MAX_CPY = 0x20
_MAX_LEN = 0x100 + 8
_MAX_LEN_M2 = _MAX_LEN - 2
_MAX_L1_DISTANCE = 0x2000
_MAX_L2_DISTANCE = 0x1FFF
_MAX_FARDISTANCE = 0xFFFF + _MAX_L2_DISTANCE - 1
_HASH_BITS = 13
_HASH_SIZE = (1 << _HASH_BITS)
_HASH_MASK = (_HASH_SIZE - 1)
def _flz_hash(v: int):
h = (v * 0x9E3779B9) >> (32 - _HASH_BITS)
return h & _HASH_MASK
def _flz_cmp(p: memoryview, q: memoryview):
upper = len(q)
lower = 1
if upper not in range(1, len(p) + 1):
raise ValueError
while lower <= upper and p[:lower] == q[:lower]:
lower <<= 1
upper = min(lower, upper)
lower >>= 1
while lower < upper:
midpoint = (lower + upper + 1) // 2
if p[:midpoint] == q[:midpoint]:
lower = midpoint
else:
upper = midpoint - 1
return lower + 1
def _flz_literals(runs: int, src: memoryview, dst: bytearray):
while runs >= _MAX_CPY:
dst.append(_MAX_CPY - 1)
dst.extend(src[:_MAX_CPY])
src = src[_MAX_CPY:]
runs -= _MAX_CPY
if runs > 0:
dst.append((runs - 1) & 0xFF)
dst.extend(src[:runs])
def _flz1_match(nc: int, distance: int, op: bytearray):
distance -= 1
write = op.append
while nc > _MAX_LEN_M2:
write((7 << 5) + (distance >> 8) & 0xFF)
write(_MAX_LEN_M2 - 7 - 2)
write(distance & 0xFF)
nc -= _MAX_LEN_M2
if nc < 7:
write((nc << 5) + (distance >> 8) & 0xFF)
write(distance & 0xFF)
else:
write((7 << 5) + (distance >> 8) & 0xFF)
write(nc - 7 & 0xFF)
write(distance & 0xFF)
def _flz2_match(nc: int, distance: int, op: bytearray):
distance -= 1
write = op.append
if distance < _MAX_L2_DISTANCE:
if nc < 7:
write((nc << 5) + (distance >> 8))
write(distance & 0xFF)
else:
write((7 << 5) + (distance >> 8))
nc -= 7
while nc > 0xFF:
nc -= 0xFF
write(0xFF)
write(nc)
write(distance & 0xFF)
else:
if nc < 7:
distance -= _MAX_L2_DISTANCE
write((nc << 5) + 31)
write(255)
write(distance >> 8)
write(distance & 0xFF)
else:
distance -= _MAX_L2_DISTANCE
write((7 << 5) + 31)
nc -= 7
while nc > 0xFF:
nc -= 0xFF
write(0xFF)
write(nc)
write(255)
write(distance >> 8)
write(distance & 0xFF)
class InputOutOfBounds(EOFError):
pass
class InvalidFlzLevel(ValueError):
def __init__(self, level: int) -> None:
super().__init__(F'Invalid level {level!r}, may only be 0 or 1.')
def _flz_compress(
input: memoryview,
op: bytearray,
level: int,
):
if level not in (0, 1):
raise InvalidFlzLevel(level)
elif level:
matches = _flz2_match
lim = _MAX_FARDISTANCE
else:
matches = _flz1_match
lim = _MAX_L1_DISTANCE
total = len(input)
ip = 0
ip_bound = total - 4
ip_limit = total - 12 - 1
tab = make_array(4, _HASH_SIZE)
sq = 0
rp = 0
anchor = ip
ip += 2
while ip < ip_limit:
while True:
sb = input[ip:][:3]
sq = int.from_bytes(sb, 'little')
hv = _flz_hash(sq)
rp, tab[hv] = tab[hv], ip
distance = ip - rp
if ip < ip_limit:
ip += 1
else:
break
if distance < lim and sb == input[rp:rp + 3]:
break
if ip >= ip_limit:
break
if level > 0 and distance >= _MAX_L2_DISTANCE and input[rp:][3:5] != input[ip:][3:5]:
continue
else:
ip -= 1
if ip > anchor:
_flz_literals(ip - anchor, input[anchor:], op)
nc = _flz_cmp(input[rp + 3:], input[ip + 3:ip_bound])
matches(nc, distance, op)
ip += nc
sq = int.from_bytes(input[ip:ip + 4], 'little')
tab[_flz_hash((sq >> 0) & 0xFFFFFF)], ip = ip, ip + 1
tab[_flz_hash((sq >> 8) & 0xFFFFFF)], ip = ip, ip + 1
anchor = ip
_flz_literals(total - anchor, input[anchor:], op)
op[0] |= level << 5
def _flz_decompress(input: memoryview, level: int):
if level not in (0, 1):
raise InvalidFlzLevel(level)
ip = 0
ip_limit = ip + len(input)
ip_bound = ip_limit - 2
ctrl = input[0] & 0x1F
bound_checked = input[:len(input) - 2]
ip += 1
op = MemoryFile()
while True:
if ctrl >= 0x20:
length = (ctrl >> 5)
offset = (ctrl & 31) << 8
ref = offset + 1
if length == 7:
while True:
ip, inc = ip + 1, bound_checked[ip]
length += inc
if level == 0 or inc != 0xFF:
break
ip, inc = ip + 1, input[ip]
ref += inc
length += 2
if level > 0 and inc == 0xFF and offset == 0x1F00:
ip, offset = ip + 2, (bound_checked[ip] << 8) + input[ip + 1]
ref = offset + _MAX_L2_DISTANCE + 1
op.replay(ref, length)
else:
if len(t := input[ip:(ip := ip + (ctrl := ctrl + 1))]) != ctrl:
raise InputOutOfBounds
op.write(t)
if ip > ip_bound:
break
ctrl = input[ip]
ip += 1
return op.getvalue()
class flz(Unit):
"""
FastLZ (or FLZ for short) compression and decompression. This implementation was ported to
pure Python from the C reference and is therefore much slower.
"""
def __init__(
self,
level: Arg.Number('-l', bound=(0, 1), help=(
'Specify a FastLZ level (either 0 or 1). By default, compression will select a level '
'based on buffer length like the reference implementation. Decompression reads level '
'information from the header by default.')) = None
):
super().__init__(level=level)
def reverse(self, data):
if not data:
return data
if (level := self.args.level) is None:
level = int(len(data) >= 0x10000)
output = bytearray()
_flz_compress(memoryview(data), output, level)
return output
def process(self, data):
try:
hl = data[0] >> 5
except IndexError:
return None
if (level := self.args.level) is None:
level = hl
if level != hl:
self.log_info(F'Using level {level} despite header-defined level {hl}.')
return _flz_decompress(memoryview(data), level)
@classmethod
def handles(cls, data: bytearray):
if data and (data[0] >> 5) > 1:
return False
Classes
class InputOutOfBounds (*args, **kwargs)
-
Read beyond end of file.
Expand source code Browse git
class InputOutOfBounds(EOFError): pass
Ancestors
- builtins.EOFError
- builtins.Exception
- builtins.BaseException
class InvalidFlzLevel (level)
-
Inappropriate argument value (of correct type).
Expand source code Browse git
class InvalidFlzLevel(ValueError): def __init__(self, level: int) -> None: super().__init__(F'Invalid level {level!r}, may only be 0 or 1.')
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class flz (level=None)
-
FastLZ (or FLZ for short) compression and decompression. This implementation was ported to pure Python from the C reference and is therefore much slower.
Expand source code Browse git
class flz(Unit): """ FastLZ (or FLZ for short) compression and decompression. This implementation was ported to pure Python from the C reference and is therefore much slower. """ def __init__( self, level: Arg.Number('-l', bound=(0, 1), help=( 'Specify a FastLZ level (either 0 or 1). By default, compression will select a level ' 'based on buffer length like the reference implementation. Decompression reads level ' 'information from the header by default.')) = None ): super().__init__(level=level) def reverse(self, data): if not data: return data if (level := self.args.level) is None: level = int(len(data) >= 0x10000) output = bytearray() _flz_compress(memoryview(data), output, level) return output def process(self, data): try: hl = data[0] >> 5 except IndexError: return None if (level := self.args.level) is None: level = hl if level != hl: self.log_info(F'Using level {level} despite header-defined level {hl}.') return _flz_decompress(memoryview(data), level) @classmethod def handles(cls, data: bytearray): if data and (data[0] >> 5) > 1: return False
Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
Methods
def reverse(self, data)
-
Expand source code Browse git
def reverse(self, data): if not data: return data if (level := self.args.level) is None: level = int(len(data) >= 0x10000) output = bytearray() _flz_compress(memoryview(data), output, level) return output
Inherited members