Module refinery.units.encoding.b92
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import Unit, RefineryPartialResult
from refinery.lib.structures import StructReader, MemoryFile
# The 91 printable ASCII symbols that serve as Base92 digits; a symbol's position in this
# sequence is its digit value. The characters '"', '`' and '~' do not occur in it.
_B92_ALPHABET = (
    RB"!#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_abcdefghijklmnopqrstuvwxyz{|}"
)
# Inverse lookup table: maps the byte value of each alphabet symbol to its digit value.
_B92_DECODING = dict(zip(_B92_ALPHABET, range(len(_B92_ALPHABET))))
class b92(Unit):
    """
    Base92 encoding and decoding.
    """
    # Scheme implemented below: the input is consumed as a stream of 13-bit blocks and each
    # block is written as two alphabet symbols (quotient and remainder of a division by 91).
    # A trailing partial block of at most 6 bits is left-aligned in 6 bits and written as one
    # symbol; a longer partial block is left-aligned in 13 bits and written as two symbols.
    # Empty input is represented by the single byte '~'.

    def reverse(self, data):
        """
        Encode `data` as Base92 and return the encoded bytes. Empty input encodes to `~`.
        """
        if not data:
            return B'~'
        # NOTE(review): presumably bigendian=True makes read_integer consume bits most
        # significant first; confirm against StructReader.
        reader = StructReader(data, bigendian=True)
        output = MemoryFile()
        while reader.remaining_bits > 0:
            try:
                block = reader.read_integer(13)
            except EOFError:
                # Fewer than 13 bits are left. This relies on the failed read leaving
                # remaining_bits untouched so that the leftover bits can be read here.
                count = reader.remaining_bits
                block = reader.read_integer(count)
                self.log_debug(F'reading {count} remaining bits: {block:0{count}b}')
                shift = 6 - count
                if shift >= 0:
                    # Up to 6 bits fit into a single symbol: values below 64 are valid indices.
                    block <<= shift
                    self.log_debug(F'encoding block: {block:06b}')
                    output.write_byte(_B92_ALPHABET[block])
                    break
                # Between 7 and 12 bits: pad with zero bits on the right to a full block.
                block <<= 13 - count
            self.log_debug(F'encoding block: {block:013b}')
            # A 13-bit value is at most 8191, so the quotient is at most 90.
            hi, lo = divmod(block, 91)
            output.write_byte(_B92_ALPHABET[hi])
            output.write_byte(_B92_ALPHABET[lo])
        return output.getvalue()

    def process(self, data):
        """
        Decode Base92 `data` and return the decoded bytes. Both `~` and empty input decode to
        the empty byte string.

        Raises `RefineryPartialResult`, carrying the bytes decoded so far, when the final
        symbols do not supply enough bits to complete a byte or when the padding bits are not
        zero. A byte that is not part of the alphabet raises `KeyError` from the lookup table.
        """
        if not data or data == B'~':
            # Without the emptiness check, the tail extraction below would index into an
            # empty buffer and fail with an IndexError.
            return B''
        output = MemoryFile()
        buffer = 0  # decoded bits that have not been written yet
        length = 0  # number of valid bits in buffer; below 8 between iterations
        view = memoryview(data)
        q, r = divmod(len(view), 2)
        # The final one or two symbols carry padding and are handled after the main loop.
        if r > 0:
            bits = 6
            tail = _B92_DECODING[data[~0]]
        else:
            bits = 13
            tail = _B92_DECODING[data[~1]] * 91 + _B92_DECODING[data[~0]]
            view = view[:(q - 1) * 2]
        it = iter(view)
        # Zipping the iterator with itself yields consecutive symbol pairs; for odd-length
        # input the unpaired last symbol is dropped here, it is already stored in tail.
        for a, b in zip(it, it):
            # NOTE(review): a pair can evaluate to as much as 90 * 91 + 90 = 8280, which does
            # not fit into 13 bits; such input is not rejected here.
            block = _B92_DECODING[a] * 91 + _B92_DECODING[b]
            assert length < 8
            buffer <<= 13
            buffer |= block
            length += 13
            # Flush all complete bytes and keep the remaining low bits in the buffer.
            size, length = divmod(length, 8)
            assert size > 0
            output.write((buffer >> length).to_bytes(size, 'big'))
            buffer &= (1 << length) - 1
        # Number of bits required to complete the byte that is currently being assembled.
        missing = 8 - length
        # Number of bits in tail beyond what is required; these must be zero padding.
        shift = bits - missing
        if shift < 8:
            bytecount = 1
        else:
            # The tail completes the current byte and contains one more full byte.
            bytecount = 2
            shift -= 8
            missing += 8
        if shift < 0:
            raise RefineryPartialResult(
                F'Invalid padding, missing {-shift} bits.',
                output.getvalue())
        buffer <<= missing
        buffer |= tail >> shift
        output.write(buffer.to_bytes(bytecount, 'big'))
        if tail & ((1 << shift) - 1) != 0:
            raise RefineryPartialResult(
                F'Invalid padding, lower {shift} bits of {tail:0{bits}b} are not zero.',
                output.getvalue())
        return output.getvalue()

    @classmethod
    def handles(self, data: bytearray):
        """
        Return the result of matching all of `data` against the `formats.b92` pattern; the
        patterns module is imported at call time.
        """
        from refinery.lib.patterns import formats
        return formats.b92.value.fullmatch(data)
Classes
class b92
-
Base92 encoding and decoding.
Expand source code Browse git
class b92(Unit):
    """
    Base92 encoding and decoding.
    """
    # Scheme implemented below: the input is consumed as a stream of 13-bit blocks and each
    # block is written as two alphabet symbols (quotient and remainder of a division by 91).
    # A trailing partial block of at most 6 bits is left-aligned in 6 bits and written as one
    # symbol; a longer partial block is left-aligned in 13 bits and written as two symbols.
    # Empty input is represented by the single byte '~'.

    def reverse(self, data):
        """
        Encode `data` as Base92 and return the encoded bytes. Empty input encodes to `~`.
        """
        if not data:
            return B'~'
        # NOTE(review): presumably bigendian=True makes read_integer consume bits most
        # significant first; confirm against StructReader.
        reader = StructReader(data, bigendian=True)
        output = MemoryFile()
        while reader.remaining_bits > 0:
            try:
                block = reader.read_integer(13)
            except EOFError:
                # Fewer than 13 bits are left. This relies on the failed read leaving
                # remaining_bits untouched so that the leftover bits can be read here.
                count = reader.remaining_bits
                block = reader.read_integer(count)
                self.log_debug(F'reading {count} remaining bits: {block:0{count}b}')
                shift = 6 - count
                if shift >= 0:
                    # Up to 6 bits fit into a single symbol: values below 64 are valid indices.
                    block <<= shift
                    self.log_debug(F'encoding block: {block:06b}')
                    output.write_byte(_B92_ALPHABET[block])
                    break
                # Between 7 and 12 bits: pad with zero bits on the right to a full block.
                block <<= 13 - count
            self.log_debug(F'encoding block: {block:013b}')
            # A 13-bit value is at most 8191, so the quotient is at most 90.
            hi, lo = divmod(block, 91)
            output.write_byte(_B92_ALPHABET[hi])
            output.write_byte(_B92_ALPHABET[lo])
        return output.getvalue()

    def process(self, data):
        """
        Decode Base92 `data` and return the decoded bytes. Both `~` and empty input decode to
        the empty byte string.

        Raises `RefineryPartialResult`, carrying the bytes decoded so far, when the final
        symbols do not supply enough bits to complete a byte or when the padding bits are not
        zero. A byte that is not part of the alphabet raises `KeyError` from the lookup table.
        """
        if not data or data == B'~':
            # Without the emptiness check, the tail extraction below would index into an
            # empty buffer and fail with an IndexError.
            return B''
        output = MemoryFile()
        buffer = 0  # decoded bits that have not been written yet
        length = 0  # number of valid bits in buffer; below 8 between iterations
        view = memoryview(data)
        q, r = divmod(len(view), 2)
        # The final one or two symbols carry padding and are handled after the main loop.
        if r > 0:
            bits = 6
            tail = _B92_DECODING[data[~0]]
        else:
            bits = 13
            tail = _B92_DECODING[data[~1]] * 91 + _B92_DECODING[data[~0]]
            view = view[:(q - 1) * 2]
        it = iter(view)
        # Zipping the iterator with itself yields consecutive symbol pairs; for odd-length
        # input the unpaired last symbol is dropped here, it is already stored in tail.
        for a, b in zip(it, it):
            # NOTE(review): a pair can evaluate to as much as 90 * 91 + 90 = 8280, which does
            # not fit into 13 bits; such input is not rejected here.
            block = _B92_DECODING[a] * 91 + _B92_DECODING[b]
            assert length < 8
            buffer <<= 13
            buffer |= block
            length += 13
            # Flush all complete bytes and keep the remaining low bits in the buffer.
            size, length = divmod(length, 8)
            assert size > 0
            output.write((buffer >> length).to_bytes(size, 'big'))
            buffer &= (1 << length) - 1
        # Number of bits required to complete the byte that is currently being assembled.
        missing = 8 - length
        # Number of bits in tail beyond what is required; these must be zero padding.
        shift = bits - missing
        if shift < 8:
            bytecount = 1
        else:
            # The tail completes the current byte and contains one more full byte.
            bytecount = 2
            shift -= 8
            missing += 8
        if shift < 0:
            raise RefineryPartialResult(
                F'Invalid padding, missing {-shift} bits.',
                output.getvalue())
        buffer <<= missing
        buffer |= tail >> shift
        output.write(buffer.to_bytes(bytecount, 'big'))
        if tail & ((1 << shift) - 1) != 0:
            raise RefineryPartialResult(
                F'Invalid padding, lower {shift} bits of {tail:0{bits}b} are not zero.',
                output.getvalue())
        return output.getvalue()

    @classmethod
    def handles(self, data: bytearray):
        """
        Return the result of matching all of `data` against the `formats.b92` pattern; the
        patterns module is imported at call time.
        """
        from refinery.lib.patterns import formats
        return formats.b92.value.fullmatch(data)
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members