Module refinery.lib.structures
Interfaces and classes to read structured data.
Expand source code Browse git
"""
Interfaces and classes to read structured data.
"""
from __future__ import annotations
import contextlib
import codecs
import itertools
import enum
import functools
import io
import re
import struct
import weakref
from uuid import UUID
from typing import (
overload,
cast,
Any,
Dict,
Generic,
Sized,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
TYPE_CHECKING,
)
if TYPE_CHECKING:
    # This branch is only seen by static type checkers: it imports annotation-only names and
    # declares the type variables with bounds (and, for B, a default).
    from collections.abc import Buffer
    from refinery.lib.types import ByteStr
    T = TypeVar('T', bound=Union[bytearray, bytes, memoryview])
    B = TypeVar('B', bound=Union[bytearray, bytes, memoryview], default=T)
    C = TypeVar('C', bound=Union[bytearray, bytes, memoryview])
    R = TypeVar('R', bound=io.IOBase)
else:
    # At runtime the same names are plain, unconstrained type variables.
    T = TypeVar('T')
    B = TypeVar('B')
    C = TypeVar('C')
    R = TypeVar('R')
# Element type of the lists returned by StructReader.read_struct.
UnpackType = Union[int, bool, float, bytes]
def signed(k: int, bitsize: int):
    """
    Interpret the lowest `bitsize` bits of `k` as a two's complement number and return the
    corresponding signed integer.
    """
    modulus = 1 << bitsize
    k &= modulus - 1
    # The top bit of the reduced value is the sign bit.
    if k >> (bitsize - 1):
        return k - modulus
    return k
class EOF(EOFError):
    """
    Raised when a read from a `refinery.lib.structures.MemoryFile` yields fewer bytes than were
    requested. The data of the incomplete read is stored in the attribute `rest` and can also
    be obtained by converting the exception to `bytes`.
    """

    def __init__(self, rest: ByteStr = B''):
        self.rest = rest
        super().__init__('Unexpected end of buffer.')

    def __bytes__(self):
        partial = self.rest
        return bytes(partial)
class StreamDetour(Generic[R]):
    """
    Context manager for temporarily visiting another position in a stream. On entry, the
    current offset is remembered and, if an offset was given, the stream seeks there. On exit,
    the stream seeks back to the remembered offset.
    """

    def __init__(self, stream: R, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        self.stream = stream
        self.offset = offset
        self.whence = whence

    def __enter__(self):
        stream = self.stream
        self.cursor = stream.tell()
        target = self.offset
        if target is None:
            # Without an offset, the detour only bookmarks the current position.
            return self
        stream.seek(target, self.whence)
        return self

    def __exit__(self, *args):
        self.stream.seek(self.cursor, io.SEEK_SET)
class MemoryFileMethods(Generic[T, B]):
"""
A thin wrapper around (potentially mutable) byte sequences which gives it the features of a
file-like object.
"""
_data: T
_output: type[B]
_cursor: int
_closed: bool
class SEEK(int, enum.Enum):
    # Integer-valued enumeration that mirrors the three `whence` constants of the io module.
    CUR = io.SEEK_CUR
    END = io.SEEK_END
    SET = io.SEEK_SET
def __bytes__(self):
    """
    Return the underlying buffer converted to `bytes`.
    """
    buffer = self._data
    return bytes(buffer)

def __init__(
    self,
    data: Optional[T] = None,
    output: Optional[type[B]] = None,
    fileno: Optional[int] = None,
    size_limit: Optional[int] = None,
) -> None:
    """
    Wrap `data`, or a fresh `bytearray` if no data is given. Reads return objects of type
    `output`, which defaults to the type of the wrapped data. The optional `fileno` is what
    the `fileno` method reports. A `ValueError` is raised if the initial data is already
    larger than `size_limit`.
    """
    if data is None:
        data = cast(T, bytearray()) if TYPE_CHECKING else bytearray()
    if output is None:
        output = cast(type[B], type(data)) if TYPE_CHECKING else type(data)
    if size_limit is not None and size_limit < len(data):
        raise ValueError('Initial data exceeds size limit')
    self._data = data
    self._output = output
    self._size_limit = size_limit
    self._fileno = fileno
    self._cursor = 0
    self._quicksave = 0
    self._closed = False
def close(self) -> None:
    """
    Flag the stream as closed.
    """
    self._closed = True

@property
def closed(self) -> bool:
    """
    Whether `close` has been called on this stream.
    """
    return self._closed

def __enter__(self):
    return self

def __exit__(self, ex_type, ex_value, trace) -> bool:
    # Returning False lets any exception raised inside the with-block propagate.
    return False

def flush(self) -> None:
    """
    Does nothing.
    """
    return None

def isatty(self) -> bool:
    """
    Always returns `False`.
    """
    return False

def __iter__(self):
    return self

def __len__(self):
    buffer = self._data
    return len(buffer)

def __next__(self):
    line = self.readline()
    if line:
        return line
    # An empty line signals that nothing is left to read.
    raise StopIteration

def fileno(self) -> int:
    """
    Return the file number passed to the constructor; raises `OSError` if there is none.
    """
    number = self._fileno
    if number is None:
        raise OSError
    return number

def readable(self) -> bool:
    """
    The stream is readable until it is closed.
    """
    return not self._closed

def seekable(self) -> bool:
    """
    The stream is seekable until it is closed.
    """
    return not self._closed
@property
def eof(self) -> bool:
    """
    True when the stream is closed or the cursor is at or beyond the end of the buffer.
    """
    if self._closed:
        return True
    return len(self._data) <= self._cursor

@property
def remaining_bytes(self) -> int:
    """
    The number of bytes between the current position and the end of the buffer.
    """
    total = len(self._data)
    return total - self.tell()

def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
    """
    Create a `refinery.lib.structures.StreamDetour` for this stream with the given target.
    """
    stream = cast(io.IOBase, self)
    return StreamDetour(stream, offset, whence=whence)

def detour_absolute(self, offset: Optional[int] = None):
    """
    A detour whose offset is relative to the start of the stream.
    """
    return self.detour(offset, io.SEEK_SET)

def detour_relative(self, offset: Optional[int] = None):
    """
    A detour whose offset is relative to the current position.
    """
    return self.detour(offset, io.SEEK_CUR)

def detour_from_end(self, offset: Optional[int] = None):
    """
    A detour whose offset is relative to the end of the stream.
    """
    return self.detour(offset, io.SEEK_END)

def writable(self) -> bool:
    """
    A closed stream is never writable. Otherwise, a memoryview is writable unless it is
    read-only, and any other buffer is writable only if it is a `bytearray`.
    """
    if self._closed:
        return False
    buffer = self._data
    if isinstance(buffer, memoryview):
        return not buffer.readonly
    return isinstance(buffer, bytearray)
def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
    """
    Like `read`, but the result is converted to the type `cast` unless it already is an
    instance of it.
    """
    chunk = self.read(size, peek)
    if isinstance(chunk, cast):
        return chunk
    return cast(chunk)
def read(self, size: Optional[int] = None, peek: bool = False) -> B:
    """
    Read up to `size` bytes starting at the cursor; everything up to the end of the buffer is
    read when `size` is `None` or negative. The result is converted to the configured output
    type if necessary. Unless `peek` is set, the cursor moves to the end of the data read.
    """
    start = self._cursor
    total = len(self._data)
    unbounded = size is None or size < 0
    stop = total if unbounded else min(start + size, total)
    chunk = self._data[start:stop]
    output = self._output
    if not isinstance(chunk, output):
        chunk = output(chunk)
    if not peek:
        self._cursor = stop
    return chunk
def readif(self, value: bytes) -> bool:
    """
    Check whether the data at the cursor equals `value`. If it does, the cursor is moved past
    it. The return value indicates whether there was a match.
    """
    start = self._cursor
    stop = start + len(value)
    window = memoryview(self._data)[start:stop]
    matched = window == value
    if matched:
        self._cursor = stop
    return matched
def peek(self, size: Optional[int] = None) -> memoryview:
    """
    Return a memoryview of up to `size` bytes at the cursor without moving it. If `size` is
    `None` or negative, the view extends to the end of the buffer.
    """
    start = self._cursor
    stop = None if size is None or size < 0 else start + size
    return memoryview(self._data)[start:stop]

def read1(self, size: Optional[int] = None, peek: bool = False) -> B:
    """
    Delegates to `read` with the same arguments.
    """
    return self.read(size, peek)
def _find_linebreak(self, beginning: int, end: int) -> int:
    """
    Return the offset of the first line feed in the buffer at an offset in the half-open range
    from `beginning` to `end`, or `-1` if there is none.

    For `bytes` and `bytearray` buffers the search is delegated to their `find` method, which
    tolerates an `end` beyond the buffer. A memoryview has no `find` and is scanned manually;
    the scan is clamped to the buffer length so that both branches agree instead of the manual
    scan raising an `IndexError`.
    """
    data = self._data
    if not isinstance(data, memoryview):
        return data.find(B'\n', beginning, end)
    for k in range(beginning, min(end, len(data))):
        if data[k] == 0xA:
            return k
    return -1
def readline(self, size: Optional[int] = None) -> B:
    """
    Read and return one line, including its trailing line feed if there is one. If `size` is
    given and not negative, at most that many bytes are read. The result is converted to the
    configured output type if necessary.

    The search window never extends past the end of the buffer; a `size` that is larger than
    the remaining data therefore leaves the cursor at the end of the buffer, not beyond it.
    """
    beginning, end = self._cursor, len(self._data)
    if size is not None and size >= 0:
        # Clamp to the buffer length: the cursor is later set to this value when no line
        # break is found, and it must not point past the available data.
        end = min(end, beginning + size)
    p = self._find_linebreak(beginning, end)
    self._cursor = end if p < 0 else p + 1
    result = self._data[beginning:self._cursor]
    if not isinstance(result, t := self._output):
        result = t(result)
    return result
def readlines_iter(self, hint: Optional[int] = None) -> Iterable[B]:
    """
    Lazily yield the remaining lines of the stream. If `hint` is given and not negative, the
    iteration stops as soon as the total size of the lines yielded so far reaches `hint`.
    """
    if hint is None or hint < 0:
        yield from self
        return
    total = 0
    while total < hint:
        try:
            line = next(self)
        except StopIteration:
            # The stream ran out of lines before the hint was reached. A StopIteration must
            # not escape from a generator body: it would be turned into a RuntimeError.
            return
        total += len(line)
        yield line
def readlines(self, hint: Optional[int] = None) -> list[bytes]:
    """
    Collect the lines produced by `readlines_iter` into a list. Lines are converted to `bytes`
    unless the configured output type is already a subclass of `bytes`.
    """
    lines = self.readlines_iter(hint)
    if not issubclass(self._output, bytes):
        return [bytes(line) for line in lines]
    return list(lines)

def readinto1(self, b) -> int:
    """
    Read up to `len(b)` bytes, store them at the start of `b`, and return how many were read.
    """
    chunk = self.read(len(b))
    count = len(chunk)
    b[:count] = chunk
    return count

def readinto(self, b) -> int:
    """
    Delegates to `readinto1`.
    """
    return self.readinto1(b)
def tell(self) -> int:
    """
    Return the current cursor position.
    """
    position = self._cursor
    return position

def skip(self, n: int):
    """
    Advance the cursor by `n` without any bounds checking.
    """
    self._cursor = self._cursor + n

def seekrel(self, offset: int) -> int:
    """
    Seek relative to the current position.
    """
    return self.seek(offset, io.SEEK_CUR)

def seekset(self, offset: int) -> int:
    """
    Seek to `offset`; negative values are interpreted relative to the end of the stream and
    all other values relative to its start.
    """
    whence = io.SEEK_END if offset < 0 else io.SEEK_SET
    return self.seek(offset, whence)

def getbuffer(self) -> memoryview:
    """
    Return a memoryview of the underlying buffer.
    """
    return memoryview(self._data)

def getvalue(self) -> T:
    """
    Return the underlying buffer object itself.
    """
    return self._data
def seek(self, offset: int, whence=io.SEEK_SET) -> int:
    """
    Move the cursor and return its new position. The argument `whence` selects whether
    `offset` is relative to the start of the buffer (`io.SEEK_SET`), the current position
    (`io.SEEK_CUR`), or the end of the buffer (`io.SEEK_END`). The resulting position is
    clamped to the range from zero to the length of the buffer.

    Raises a `ValueError` for a negative offset with `io.SEEK_SET` and, like `io.BytesIO`, for
    any unsupported value of `whence`; the cursor is left unchanged in both cases.
    """
    if whence == io.SEEK_SET:
        if offset < 0:
            raise ValueError('no negative offsets allowed for SEEK_SET.')
        cursor = offset
    elif whence == io.SEEK_CUR:
        cursor = self._cursor + offset
    elif whence == io.SEEK_END:
        cursor = len(self._data) + offset
    else:
        # Previously, an unknown whence was ignored and the call silently did nothing.
        raise ValueError(F'invalid whence ({whence}, should be 0, 1 or 2)')
    self._cursor = min(max(cursor, 0), len(self._data))
    return self._cursor
def writelines(self, lines: Union[Iterable[Iterable[int]], Iterable[Buffer]]) -> None:
    """
    Write every item of `lines` to the stream, in order, using `write`.
    """
    write = self.write
    for line in lines:
        write(line)

def truncate(self, size: Optional[int] = None) -> int:
    """
    Discard all data after the cursor and return the cursor position. If `size` is given, the
    cursor is first moved there; a `ValueError` is raised if `size` is negative or exceeds the
    buffer length. Raises `TypeError` unless the underlying buffer is a `bytearray`.
    """
    buffer = self._data
    if not isinstance(buffer, bytearray):
        raise TypeError
    if size is not None:
        if size < 0 or size > len(buffer):
            raise ValueError('invalid size value')
        self._cursor = size
    del buffer[self._cursor:]
    return self.tell()
def write_byte(self, byte: int) -> None:
    """
    Write a single byte at the cursor and advance the cursor by one. The byte either replaces
    the one at the cursor or is appended if the cursor is at the end of the buffer. Raises
    `TypeError` for a `bytes` buffer, `NotImplementedError` for a memoryview, an
    `refinery.lib.structures.EOF` carrying the byte when a configured size limit would be
    exceeded, and `OSError` if storing the byte fails.
    """
    buffer = self._data
    if isinstance(buffer, bytes):
        raise TypeError
    if isinstance(buffer, memoryview):
        raise NotImplementedError
    position = self._cursor
    advanced = position + 1
    limit = self._size_limit
    if limit and advanced > limit:
        raise EOF(bytes((byte,)))
    try:
        if position < len(buffer):
            buffer[position] = byte
        else:
            buffer.append(byte)
    except Exception as error:
        raise OSError(str(error)) from error
    # Only reached when the byte was stored successfully.
    self._cursor = advanced
def write(self, _data: Union[Buffer, Iterable[int]]) -> int:
    """
    Write `_data` at the cursor, overwriting existing bytes and growing the buffer as needed,
    and return the number of bytes written. The argument is either an object exposing
    `__buffer__` or an iterable of integers.

    Raises `PermissionError` if the underlying buffer is a read-only memoryview or not backed
    by a `bytearray`, `refinery.lib.structures.EOF` if a configured size limit would be
    exceeded, and `OSError` if storing sized data fails.

    Compared to the previous implementation, writing an iterable of unknown length is fixed
    in three cases: an empty iterable no longer moves the cursor, writing at the end of the
    buffer while a size limit is set no longer raises an `IndexError`, and the cursor ends up
    after the bytes that were actually written rather than always at the size limit.
    """
    out = self._data
    end = len(out)
    if isinstance(out, memoryview):
        if out.readonly:
            raise PermissionError
        out = out.obj
    if not isinstance(out, bytearray):
        raise PermissionError
    try:
        getbuf = cast('Buffer', _data).__buffer__
    except AttributeError:
        data = cast('Iterable[int]', _data)
    else:
        data = getbuf(0)
    beginning = self._cursor
    limit = self._size_limit
    if limit is None and beginning == end:
        # Fast path: a plain append without any limit to enforce.
        out[end:] = data
        self._cursor = end = len(out)
        return end - beginning
    try:
        size = len(cast(Sized, data))
    except Exception:
        size = None
    if size is not None:
        if limit and size + beginning > limit:
            raise EOF(bytes(data))
        self._cursor += size
        try:
            out[beginning:self._cursor] = data
        except Exception as error:
            self._cursor = beginning
            raise OSError(str(error)) from error
        return size
    # The input has no length: consume it item by item.
    it = iter(data)
    cursor = beginning
    while cursor < end:
        # First overwrite the existing bytes between the cursor and the end of the buffer.
        try:
            b = next(it)
        except StopIteration:
            self._cursor = cursor
            return cursor - beginning
        out[cursor] = b
        cursor += 1
    if limit is None:
        out[end:] = bytes(it)
    else:
        out[end:limit] = bytes(itertools.islice(it, 0, max(limit - end, 0)))
        # Anything that is left over at this point does not fit within the size limit.
        rest = bytearray(it)
        if rest:
            raise EOF(rest)
    self._cursor = end = len(out)
    return end - beginning
def __getitem__(self, slice):
    """
    Index or slice the underlying buffer. A slice is converted to the configured output type
    if necessary. An integer index yields the integer value of the byte at that position.
    """
    result = self._data[slice]
    if isinstance(result, int):
        # Passing an integer to the output type would create a zero-filled buffer of that
        # length rather than represent the byte that was looked up.
        return result
    if not isinstance(result, t := self._output):
        result = t(result)
    return result
def replay(self, offset: int, length: int):
    """
    Write `length` bytes to the stream by repeating the `offset` bytes that precede the
    cursor, as often as necessary. Raises a `ValueError` if `offset` is not between one and
    the current cursor position; an offset of zero designates no data to repeat and was
    previously reported as a `ZeroDivisionError`.
    """
    cursor = self._cursor
    if offset not in range(1, cursor + 1):
        raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{cursor}].')
    rep, r = divmod(length, offset)
    start = cursor - offset
    # The partial repetition at the end of the output.
    replay = self._data[start:start + r]
    if rep > 0:
        # While this is technically a copy, it is faster than repeated calls to write.
        replay = bytes(self._data[start:cursor]) * rep + replay
    self.write(replay)
class MemoryFile(MemoryFileMethods[T, B], io.BytesIO):
    """
    Combines `refinery.lib.structures.MemoryFileMethods` with `io.BytesIO` as a base class.
    No members are added.
    """
class order(str, enum.Enum):
    """
    String-valued enumeration of the two byte orders; the values are the corresponding
    byte order characters of `struct` format strings.
    """
    big = '>'
    little = '<'
class StructReader(MemoryFile[T, T]):
"""
An extension of a `refinery.lib.structures.MemoryFile` which provides methods to read
structured data.
"""
class Unaligned(RuntimeError):
    """
    Raised when a byte-level read is requested while the reader is not byte-aligned.
    """

def __init__(self, data: T, bigendian: bool = False):
    """
    Wrap `data` for structured reading, starting with an empty bit buffer. The flag
    `bigendian` sets the initial byte order.
    """
    super().__init__(data)
    self.bigendian = bigendian
    self._nbits = 0
    self._bbits = 0

def __enter__(self) -> StructReader:
    reader = super().__enter__()
    return reader
@property
@contextlib.contextmanager
def be(self):
    """
    A context manager that switches the reader to big endian mode for the duration of the
    context. On exit, the byte order that was in effect before is restored, so a reader that
    was already big endian remains so.
    """
    previous = self.bigendian
    self.bigendian = True
    try:
        yield self
    finally:
        self.bigendian = previous
@property
def byteorder_format(self) -> str:
    """
    The `struct` byte order character for the current byte order.
    """
    if self.bigendian:
        return '>'
    return '<'

@property
def byteorder_name(self):
    """
    The current byte order as the name that `int.from_bytes` and `int.to_bytes` expect.
    """
    if self.bigendian:
        return 'big'
    return 'little'

def seek(self, offset, whence=io.SEEK_SET) -> int:
    """
    Seek like the base class after discarding the contents of the bit buffer.
    """
    self._nbits = 0
    self._bbits = 0
    return super().seek(offset, whence)
def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> B:
    """
    Read bytes from the underlying stream and insist on getting as many as requested. If the
    reader is not byte-aligned, i.e. if `refinery.lib.structures.StructReader.byte_aligned` is
    `False`, a `refinery.lib.structures.StructReader.Unaligned` error is raised, which is a
    `RuntimeError`. If fewer than `size` bytes are available, an exception of type
    `refinery.lib.structures.EOF` is raised which carries the data that could be read. To read
    bytes while the stream is not byte-aligned, use
    `refinery.lib.structures.StructReader.read_bytes`.
    """
    if self.byte_aligned:
        chunk = self.read1(size, peek)
        if not size or len(chunk) >= size:
            return chunk
        raise EOF(chunk)
    raise StructReader.Unaligned('buffer is not byte-aligned')
@property
def byte_aligned(self) -> bool:
    """
    `True` exactly when the internal bit buffer holds no bits.
    """
    pending = self._nbits
    return not pending

def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
    """
    Empty the internal bit buffer and, if the cursor is not a multiple of `blocksize`, move it
    forward to the next multiple. Returns the size and the contents that the bit buffer had
    before it was emptied, in that order.
    """
    pending = (self._nbits, self._bbits)
    self._nbits = self._bbits = 0
    misalignment = self._cursor % blocksize
    if misalignment:
        self.seekrel(blocksize - misalignment)
    return pending

@property
def remaining_bits(self) -> int:
    """
    The number of unread bits: eight for every remaining byte plus the buffered bits.
    """
    return self._nbits + 8 * self.remaining_bytes
def read_integer(self, length: Optional[int] = None, peek: bool = False, bigendian: Optional[bool] = None) -> int:
    """
    Read `length` many bits from the underlying stream as an integer. If `length` is omitted,
    all remaining bits are read. With `peek`, neither the cursor nor the bit buffer changes.
    The argument `bigendian` overrides the byte order of the reader for this one call; it now
    also governs how the bytes read from the stream are combined into an integer, which
    previously always followed the byte order of the reader. Raises `EOFError` if the stream
    does not contain enough data.
    """
    if length is None:
        length = self.remaining_bits
    if bigendian is None:
        bigendian = self.bigendian
    if length < self._nbits:
        # The request can be served from the bit buffer alone.
        new_count = self._nbits - length
        if bigendian:
            result = self._bbits >> new_count
            if not peek:
                self._bbits ^= result << new_count
        else:
            result = self._bbits & 2 ** length - 1
            if not peek:
                self._bbits >>= length
        if not peek:
            self._nbits = new_count
        return result
    # Keep a copy of the bit buffer: the seek below is expected to reset it.
    nbits, bbits = self._nbits, self._bbits
    number_of_missing_bits = length - nbits
    bytecount, rest = divmod(number_of_missing_bits, 8)
    if rest:
        # A partially used byte is required; rest becomes the number of its unused bits.
        bytecount += 1
        rest = 8 - rest
    bb = self.read1(bytecount, True)
    if len(bb) != bytecount:
        raise EOFError
    if not peek:
        self.seekrel(bytecount)
    if bytecount == 1:
        result, = bb
    else:
        result = int.from_bytes(bb, 'big' if bigendian else 'little')
    if not nbits and not rest:
        return result
    if bigendian:
        rbmask   = 2 ** rest - 1                        # noqa
        excess   = result & rbmask                      # noqa
        result >>= rest                                 # noqa
        result  ^= bbits << number_of_missing_bits      # noqa
    else:
        excess   = result >> number_of_missing_bits     # noqa
        result  ^= excess << number_of_missing_bits     # noqa
        result <<= nbits                                # noqa
        result  |= bbits                                # noqa
    assert excess.bit_length() <= rest
    if not peek:
        self._nbits = rest
        self._bbits = excess
    return result
def read_bytes(self, size: int, peek: bool = False) -> bytes:
    """
    Read `size` many bytes starting at the current bit position. A byte-aligned reader hands
    the request to `refinery.lib.structures.StructReader.read_exactly`; otherwise, the bytes
    are obtained by reading an integer of `size * 8` bits and serializing it in the current
    byte order.
    """
    if not self.byte_aligned:
        value = self.read_integer(size * 8, peek)
        return value.to_bytes(size, self.byteorder_name)
    data = self.read_exactly(size, peek)
    return data if isinstance(data, bytes) else bytes(data)
def read_bit(self) -> int:
    """
    Shortcut for `refinery.lib.structures.StructReader.read_integer` with an argument of `1`,
    which reads the next bit from the stream.
    """
    return self.read_integer(1)

def read_bits(self, nbits: int, bigendian: Optional[bool] = None) -> Iterable[int]:
    """
    Read an integer of `nbits` bits using `refinery.lib.structures.StructReader.read_integer`
    and yield its bits one by one: most significant bit first in big endian mode and least
    significant bit first otherwise. The byte order of the reader is used unless `bigendian`
    is given.
    """
    if bigendian is None:
        bigendian = self.bigendian
    chunk = self.read_integer(nbits, bigendian=bigendian)
    positions = range(nbits)
    if bigendian:
        positions = reversed(positions)
    for shift in positions:
        yield (chunk >> shift) & 1

def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
    """
    Like `refinery.lib.structures.StructReader.read_bits`, but every bit is converted to a
    boolean. With `reverse`, the flags are yielded in the opposite order.
    """
    flags = [bool(bit) for bit in self.read_bits(nbits)]
    if reverse:
        flags.reverse()
    yield from flags
def read_one_struct(self, spec: str, peek=False) -> UnpackType:
    """
    Read a format via `refinery.lib.structures.StructReader.read_struct` that yields exactly
    one value and return that value. A `ValueError` is raised if the format yields any other
    number of values.
    """
    [value] = self.read_struct(spec, peek=peek)
    return value
def read_struct(self, spec: str, peek=False) -> List[UnpackType]:
    """
    Read structured data from the stream and return the unpacked values as a list. The format
    `spec` supports everything that the `struct` module supports. When it starts with one of
    the byte order characters `<`, `>`, `!`, `=`, or `@`, this overrides the byte order of the
    reader. In addition, the following format characters are available, each of which may be
    preceded by a repeat count:

    - `a`: the result of `refinery.lib.structures.StructReader.read_c_string`
    - `u`: the result of `refinery.lib.structures.StructReader.read_w_string`
    - `w`: like `u`, but decoded as UTF-16LE
    - `g`: the result of `refinery.lib.structures.StructReader.read_guid`
    - `k`: the result of `refinery.lib.structures.StructReader.read_7bit_encoded_int`

    With `peek`, the cursor is moved back to where it was before the call, even when reading
    fails with an exception. A `ValueError` is raised for an empty format.
    """
    if not spec:
        raise ValueError('no format specified')
    byteorder = spec[:1]
    if byteorder in '<!=@>':
        spec = spec[1:]
    else:
        byteorder = self.byteorder_format
    data = []
    current_cursor = self.tell()
    try:
        # reserved struct characters: xcbB?hHiIlLqQnNefdspP
        # Splitting on a capture group puts the custom items at all odd indices.
        for k, part in enumerate(re.split(r'(\d*[auwgk])', spec)):
            if k % 2 == 1:
                count = 1 if len(part) == 1 else int(part[:-1])
                code = part[-1]
                for _ in range(count):
                    if code == 'a':
                        data.append(self.read_c_string())
                    elif code == 'g':
                        data.append(self.read_guid())
                    elif code == 'u':
                        data.append(self.read_w_string())
                    elif code == 'w':
                        data.append(codecs.decode(self.read_w_string(), 'utf-16le'))
                    elif code == 'k':
                        data.append(self.read_7bit_encoded_int())
                continue
            if not part:
                # Nothing sits between two adjacent custom items.
                continue
            part = F'{byteorder}{part}'
            data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
    finally:
        if peek:
            self.seekset(current_cursor)
    return data
def read_nibble(self, peek: bool = False) -> int:
    """
    Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
    """
    return self.read_integer(4, peek)

def u8(self, peek: bool = False) -> int:
    """Read an unsigned 8-bit integer."""
    return self.read_integer(8, peek)

def i8(self, peek: bool = False) -> int:
    """Read a signed 8-bit integer."""
    value = self.read_integer(8, peek)
    return signed(value, 8)

def u16(self, peek: bool = False) -> int:
    """Read an unsigned 16-bit integer."""
    return self.read_integer(16, peek)

def u32(self, peek: bool = False) -> int:
    """Read an unsigned 32-bit integer."""
    return self.read_integer(32, peek)

def u64(self, peek: bool = False) -> int:
    """Read an unsigned 64-bit integer."""
    return self.read_integer(64, peek)

def i16(self, peek: bool = False) -> int:
    """Read a signed 16-bit integer."""
    value = self.read_integer(16, peek)
    return signed(value, 16)

def i32(self, peek: bool = False) -> int:
    """Read a signed 32-bit integer."""
    value = self.read_integer(32, peek)
    return signed(value, 32)

def i64(self, peek: bool = False) -> int:
    """Read a signed 64-bit integer."""
    value = self.read_integer(64, peek)
    return signed(value, 64)

def f32(self, peek: bool = False) -> float:
    """Read the `struct` format `f`."""
    value = self.read_one_struct('f', peek=peek)
    return cast(float, value)

def f64(self, peek: bool = False) -> float:
    """Read the `struct` format `d`."""
    value = self.read_one_struct('d', peek=peek)
    return cast(float, value)

def u8fast(self):
    """
    Return the byte at the cursor and advance the cursor by one, accessing the buffer
    directly and without consulting the bit buffer. Raises `EOFError` at the end of the data.
    """
    position = self._cursor
    try:
        value = self._data[position]
    except IndexError:
        raise EOFError
    self._cursor = position + 1
    return value

def read_byte(self, peek: bool = False) -> int:
    """Read an unsigned 8-bit integer."""
    return self.read_integer(8, peek)

def read_char(self, peek: bool = False) -> int:
    """Read a signed 8-bit integer."""
    value = self.read_integer(8, peek)
    return signed(value, 8)
def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
    """
    Read data up to the next occurrence of `terminator` whose distance from the current
    position is a multiple of `alignment`. The data before the terminator is returned as a
    `bytearray` and the cursor is moved past the terminator.
    """
    buf = self.getvalue()
    pos = self.tell()
    if isinstance(buf, memoryview):
        # A memoryview has no find method, so the search is done with slice comparisons.
        def find(whence: int):
            n = len(terminator)
            for k in range(whence, len(buf)):
                if buf[k:k + n] == terminator:
                    return k
            return -1
    else:
        def find(whence: int):
            return buf.find(terminator, whence)
    try:
        # Search repeatedly until a match is found at an aligned distance from pos, or until
        # find reports that there is no further match by returning a negative value.
        end = pos - 1
        while True:
            end = find(end + 1)
            if end < 0 or not (end - pos) % alignment:
                break
    except AttributeError:
        # Fallback for buffers without a find method: read blocks of size alignment until
        # the accumulated data ends with the terminator.
        result = bytearray()
        while not self.eof:
            result.extend(self.read_bytes(alignment))
            if result.endswith(terminator):
                return result[:-len(terminator)]
        # No terminator was found: go back to where the search started.
        self.seek(pos)
        raise EOF
    else:
        # NOTE(review): if no terminator was found, end is negative here and so is the size
        # that is passed to read_exactly; this looks like it returns all remaining data
        # instead of raising EOF as the fallback branch does. Confirm that this is intended.
        data = self.read_exactly(end - pos)
        self.seekrel(len(terminator))
        return bytearray(data)
def read_guid(self) -> UUID:
    """
    Read 16 bytes and interpret them as a `UUID` given in little endian field layout.
    """
    raw = self.read_bytes(16)
    return UUID(bytes_le=raw)

def read_uuid(self) -> UUID:
    """
    Read 16 bytes and interpret them as a `UUID` given in big endian field layout.
    """
    raw = self.read_bytes(16)
    return UUID(bytes=raw)
@overload
def read_c_string(self) -> bytearray:
    ...

@overload
def read_c_string(self, encoding: str) -> str:
    ...

def read_c_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read data up to the next null byte. The result is returned as a `bytearray`, or decoded
    to a string when an `encoding` is given.
    """
    raw = self.read_terminated_array(B'\0')
    if encoding is None:
        return raw
    return codecs.decode(raw, encoding)

@overload
def read_w_string(self) -> bytearray:
    ...

@overload
def read_w_string(self, encoding: str) -> str:
    ...

def read_w_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read data up to the next pair of null bytes that is found at an even distance from the
    current position. The result is returned as a `bytearray`, or decoded to a string when an
    `encoding` is given.
    """
    raw = self.read_terminated_array(B'\0\0', 2)
    if encoding is None:
        return raw
    return codecs.decode(raw, encoding)
def read_length_prefixed_ascii(self, prefix_size: int = 32):
    """
    Read a length-prefixed string and decode it with the `latin1` codec.
    """
    return self.read_length_prefixed(prefix_size, 'latin1')

def read_length_prefixed_utf8(self, prefix_size: int = 32):
    """
    Read a length-prefixed string and decode it with the `utf8` codec.
    """
    return self.read_length_prefixed(prefix_size, 'utf8')

def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
    """
    Read a length-prefixed string and decode it with the `utf-16le` codec. The prefix counts
    two-byte units unless `bytecount` is set, in which case it counts bytes.
    """
    if bytecount:
        block_size = 1
    else:
        block_size = 2
    return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

@overload
def read_length_prefixed(self, *, encoding: str, prefix_size: int = 32, block_size: int = 1) -> str:
    ...

@overload
def read_length_prefixed(self, prefix_size: int, encoding: str, block_size: int = 1) -> str:
    ...

@overload
def read_length_prefixed(self, *, prefix_size: int = 32, block_size: int = 1) -> T:
    ...

@overload
def read_length_prefixed(self, prefix_size: int, *, block_size: int = 1) -> T:
    ...

def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
    """
    Read an integer of `prefix_size` bits, multiply it by `block_size`, and read that many
    bytes via `read`. The data is returned as read, or decoded to a string when an `encoding`
    is given.
    """
    count = self.read_integer(prefix_size)
    payload = self.read(count * block_size)
    if encoding is None:
        return payload
    return codecs.decode(payload, encoding)
def read_7bit_encoded_int(self, max_bits: int = 0, bigendian: bool | None = None) -> int:
    """
    Read an integer that is encoded in groups of 7 bits, one group per byte, where the high
    bit of each byte indicates that another byte follows. In big endian mode the first group
    is the most significant one, otherwise it is the least significant one. The byte order of
    the reader is used unless `bigendian` is given. If `max_bits` is positive, an
    `OverflowError` is raised once the encoding continues beyond that many bits.
    """
    if bigendian is None:
        bigendian = self.bigendian
    value = shift = 0
    while True:
        b = self.u8fast()
        group = b & 0x7F
        if bigendian:
            value = (value << 7) | group
        else:
            value |= group << shift
        if not b & 0x80:
            return value
        shift += 7
        if shift > max_bits > 0:
            raise OverflowError('Maximum bits were exceeded by encoded integer.')
class StructMeta(type):
    """
    A metaclass to facilitate the behavior outlined for `refinery.lib.structures.Struct`. It
    replaces the initializer of every class it creates with a wrapper that makes sure the
    first argument is a reader of the type `parser`, and that stores the data consumed by the
    original initializer in the attribute `_data` of the new object.
    """

    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        return type.__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super().__init__(name, bases, nmspc)
        wrapped = cls.__init__

        @functools.wraps(wrapped)
        def wrapper(self: Struct, reader, *args, **kwargs):
            if not isinstance(reader, parser):
                if issubclass(parser, reader.__class__):
                    # The reader is an instance of a base class of the required parser.
                    raise ValueError(
                        F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                        F'but a {parser.__name__} is required.')
                reader = parser(reader)
            start = reader.tell()
            view = reader.getbuffer()
            wrapped(self, reader, *args, **kwargs)
            # Everything between the two cursor positions was consumed by the initializer.
            self._data = view[start:reader.tell()]
            del view

        cls.__init__ = wrapper
class Struct(metaclass=StructMeta):
    """
    Base class for parsers of structured data. A `refinery.lib.structures.Struct` class can be
    instantiated as follows:

        foo = Struct(data, bar=29)

    The initialization routine of the structure is called with the argument `reader` followed
    by all additional arguments. If `data` already is a `refinery.lib.structures.StructReader`,
    it is passed on as `reader` directly; otherwise, it is first wrapped in one.
    """
    _data: Union[memoryview, bytearray]

    def __len__(self):
        data = self._data
        return len(data)

    def __bytes__(self):
        data = self._data
        return bytes(data)

    def get_data(self, decouple=False):
        """
        Return the data this structure was parsed from. With `decouple`, data that is held as
        a memoryview is first replaced by a `bytearray` copy.
        """
        data = self._data
        if decouple and isinstance(data, memoryview):
            data = self._data = bytearray(data)
        return data

    def __init__(self, reader: StructReader, *args, **kwargs):
        pass
AttrType = TypeVar('AttrType')


class PerInstanceAttribute(Generic[AttrType]):
    """
    A descriptor that stores one value per owning object, keyed by the `id` of that object.
    The assigned value is passed through `resolve` on first access and the result is cached.
    Only the first assignment for a given object is recorded; the stored entries are dropped
    again by a finalizer when the object is garbage collected.
    """

    def resolve(self, parent, value: Any) -> AttrType:
        """
        Compute the attribute value from the assigned value; the default returns it as is.
        """
        return value

    def __init__(self):
        self.__set: Dict[int, Any] = {}
        self.__get: Dict[int, AttrType] = {}

    def __set__(self, parent: Any, value: Any) -> None:
        pid = id(parent)
        if pid in self.__set:
            return

        def cleanup(self, pid):
            self.__set.pop(pid, None)
            self.__get.pop(pid, None)

        self.__set[pid] = value
        weakref.finalize(parent, cleanup, self, id(parent))

    def __get__(self, parent, tp=None) -> AttrType:
        pid = id(parent)
        cache = self.__get
        if pid in cache:
            return cache[pid]
        try:
            seed = self.__set[pid]
        except KeyError as K:
            raise AttributeError from K
        cache[pid] = self.resolve(parent, seed)
        return cache[pid]
Functions
def signed(k, bitsize)
-
If `k` is an integer of the given bit size, cast it to a signed one.
Expand source code Browse git
def signed(k: int, bitsize: int):
    """
    Interpret the lowest `bitsize` bits of `k` as a two's complement number and return the
    corresponding signed integer.
    """
    modulus = 1 << bitsize
    k &= modulus - 1
    # The top bit of the reduced value is the sign bit.
    if k >> (bitsize - 1):
        return k - modulus
    return k
Classes
class EOF (rest=b'')
-
While reading from a `MemoryFile`, fewer bytes were available than requested. The exception contains the data from the incomplete read.
Expand source code Browse git
class EOF(EOFError):
    """
    Raised when a read from a `refinery.lib.structures.MemoryFile` yields fewer bytes than were
    requested. The data of the incomplete read is stored in the attribute `rest` and can also
    be obtained by converting the exception to `bytes`.
    """

    def __init__(self, rest: ByteStr = B''):
        self.rest = rest
        super().__init__('Unexpected end of buffer.')

    def __bytes__(self):
        partial = self.rest
        return bytes(partial)
Ancestors
- builtins.EOFError
- builtins.Exception
- builtins.BaseException
class StreamDetour (stream, offset=None, whence=0)
-
A stream detour is used as a context manager to temporarily read from a different location in the stream and then return to the original offset when the context ends.
Expand source code Browse git
class StreamDetour(Generic[R]):
    """
    Context manager for temporarily visiting another position in a stream. On entry, the
    current offset is remembered and, if an offset was given, the stream seeks there. On exit,
    the stream seeks back to the remembered offset.
    """

    def __init__(self, stream: R, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        self.stream = stream
        self.offset = offset
        self.whence = whence

    def __enter__(self):
        stream = self.stream
        self.cursor = stream.tell()
        target = self.offset
        if target is None:
            # Without an offset, the detour only bookmarks the current position.
            return self
        stream.seek(target, self.whence)
        return self

    def __exit__(self, *args):
        self.stream.seek(self.cursor, io.SEEK_SET)
Ancestors
- typing.Generic
class MemoryFileMethods (data=None, output=None, fileno=None, size_limit=None)
-
A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object.
Expand source code Browse git
class MemoryFileMethods(Generic[T, B]):
    """
    A thin wrapper around (potentially mutable) byte sequences which gives it the features of a
    file-like object.
    """

    # The wrapped buffer; reads slice it, writes modify it in place.
    _data: T
    # The type that read results are converted to.
    _output: type[B]
    # Current read/write position as a byte offset into the buffer.
    _cursor: int
    # Set by close().
    _closed: bool

    class SEEK(int, enum.Enum):
        CUR = io.SEEK_CUR
        END = io.SEEK_END
        SET = io.SEEK_SET

    def __bytes__(self):
        return bytes(self._data)

    def __init__(
        self,
        data: Optional[T] = None,
        output: Optional[type[B]] = None,
        fileno: Optional[int] = None,
        size_limit: Optional[int] = None,
    ) -> None:
        """
        Wrap `data` (a fresh `bytearray` when omitted). Read results are converted to `output`,
        which defaults to the type of `data`. The value of `fileno` is what `fileno()` reports.
        When `size_limit` is given, a `ValueError` is raised if `data` is already longer than
        that, and writes refuse to move past it.
        """
        if data is None:
            if TYPE_CHECKING:
                data = cast(T, bytearray())
            else:
                data = bytearray()
        if output is None:
            if TYPE_CHECKING:
                output = cast(type[B], type(data))
            else:
                output = type(data)
        if size_limit is not None and len(data) > size_limit:
            raise ValueError('Initial data exceeds size limit')
        self._output = output
        self._cursor = 0
        self._closed = False
        self._fileno = fileno
        self._quicksave = 0
        self._size_limit = size_limit
        self._data = data

    def close(self) -> None:
        self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, trace) -> bool:
        return False

    def flush(self) -> None:
        pass

    def isatty(self) -> bool:
        return False

    def __iter__(self):
        return self

    def __len__(self):
        return len(self._data)

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def fileno(self) -> int:
        if self._fileno is None:
            raise OSError
        return self._fileno

    def readable(self) -> bool:
        return not self._closed

    def seekable(self) -> bool:
        return not self._closed

    @property
    def eof(self) -> bool:
        return self._closed or self._cursor >= len(self._data)

    @property
    def remaining_bytes(self) -> int:
        return len(self._data) - self.tell()

    def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        """
        Return a `StreamDetour` context manager for this object.
        """
        return StreamDetour(cast(io.IOBase, self), offset, whence=whence)

    def detour_absolute(self, offset: Optional[int] = None):
        return self.detour(offset, io.SEEK_SET)

    def detour_relative(self, offset: Optional[int] = None):
        return self.detour(offset, io.SEEK_CUR)

    def detour_from_end(self, offset: Optional[int] = None):
        return self.detour(offset, io.SEEK_END)

    def writable(self) -> bool:
        if self._closed:
            return False
        if isinstance(self._data, memoryview):
            return not self._data.readonly
        return isinstance(self._data, bytearray)

    def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
        """
        Like `read`, but the result is converted to the type `cast` if it is not one already.
        """
        out = self.read(size, peek)
        if not isinstance(out, cast):
            out = cast(out)
        return out

    def read(self, size: Optional[int] = None, peek: bool = False) -> B:
        """
        Read up to `size` bytes starting at the cursor; everything that is left when `size` is
        `None` or negative. The cursor is not advanced when `peek` is set.
        """
        beginning = self._cursor
        if size is None or size < 0:
            end = len(self._data)
        else:
            end = min(self._cursor + size, len(self._data))
        result = self._data[beginning:end]
        if not isinstance(result, t := self._output):
            result = t(result)
        if not peek:
            self._cursor = end
        return result

    def readif(self, value: bytes) -> bool:
        """
        If the data at the cursor equals `value`, skip over it and return `True`. Otherwise the
        cursor is left alone and `False` is returned.
        """
        size = len(value)
        stop = self._cursor + size
        mv = memoryview(self._data)
        if match := mv[self._cursor:stop] == value:
            self._cursor = stop
        return match

    def peek(self, size: Optional[int] = None) -> memoryview:
        """
        Return a memoryview of up to `size` bytes at the cursor without moving the cursor.
        """
        cursor = self._cursor
        mv = memoryview(self._data)
        if size is None or size < 0:
            return mv[cursor:]
        return mv[cursor:cursor + size]

    def read1(self, size: Optional[int] = None, peek: bool = False) -> B:
        return self.read(size, peek)

    def _find_linebreak(self, beginning: int, end: int) -> int:
        """
        Return the index of the first line feed in `[beginning, end)`, or `-1` if there is none.
        """
        if not isinstance(self._data, memoryview):
            return self._data.find(B'\n', beginning, end)
        # memoryview has no find method; scan manually.
        for k in range(beginning, end):
            if self._data[k] == 0xA:
                return k
        return -1

    def readline(self, size: Optional[int] = None) -> B:
        """
        Read up to and including the next line feed, but no more than `size` bytes if `size` is
        given and non-negative.
        """
        beginning, end = self._cursor, len(self._data)
        if size is not None and size >= 0:
            # Clamp to the buffer so that the cursor can never end up past the data.
            end = min(end, beginning + size)
        p = self._find_linebreak(beginning, end)
        self._cursor = end if p < 0 else p + 1
        result = self._data[beginning:self._cursor]
        if not isinstance(result, t := self._output):
            result = t(result)
        return result

    def readlines_iter(self, hint: Optional[int] = None) -> Iterable[B]:
        """
        Yield lines one by one. With a non-negative `hint`, stop once the yielded lines add up
        to at least `hint` bytes, or when the data runs out.
        """
        if hint is None or hint < 0:
            yield from self
        else:
            total = 0
            while total < hint:
                # Calling next(self) here would leak a StopIteration out of the generator
                # body, which Python turns into a RuntimeError (PEP 479).
                line = self.readline()
                if not line:
                    break
                total += len(line)
                yield line

    def readlines(self, hint: Optional[int] = None) -> list[bytes]:
        it = self.readlines_iter(hint)
        if issubclass(self._output, bytes):
            return list(it)
        return [bytes(t) for t in it]

    def readinto1(self, b) -> int:
        data = self.read(len(b))
        size = len(data)
        b[:size] = data
        return size

    def readinto(self, b) -> int:
        return self.readinto1(b)

    def tell(self) -> int:
        return self._cursor

    def skip(self, n: int):
        # Moves the cursor without any bounds checking.
        self._cursor += n

    def seekrel(self, offset: int) -> int:
        return self.seek(offset, io.SEEK_CUR)

    def seekset(self, offset: int) -> int:
        """
        Seek to an absolute offset; negative offsets are counted from the end of the data.
        """
        if offset < 0:
            return self.seek(offset, io.SEEK_END)
        else:
            return self.seek(offset, io.SEEK_SET)

    def getbuffer(self) -> memoryview:
        return memoryview(self._data)

    def getvalue(self) -> T:
        return self._data

    def seek(self, offset: int, whence=io.SEEK_SET) -> int:
        """
        Move the cursor and return its new position. The result is always clamped to the range
        from zero to the length of the data. A negative `offset` combined with `SEEK_SET`
        raises a `ValueError`.
        """
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError('no negative offsets allowed for SEEK_SET.')
            self._cursor = offset
        elif whence == io.SEEK_CUR:
            self._cursor += offset
        elif whence == io.SEEK_END:
            self._cursor = len(self._data) + offset
        self._cursor = max(self._cursor, 0)
        self._cursor = min(self._cursor, len(self._data))
        return self._cursor

    def writelines(self, lines: Union[Iterable[Iterable[int]], Iterable[Buffer]]) -> None:
        for line in lines:
            self.write(line)

    def truncate(self, size: Optional[int] = None) -> int:
        """
        Cut the data off at `size`, or at the cursor when `size` is `None`. When `size` is
        given, the cursor is moved there as well. Only `bytearray` data can be truncated.
        """
        if not isinstance(self._data, bytearray):
            raise TypeError
        if size is not None:
            if not (0 <= size <= len(self._data)):
                raise ValueError('invalid size value')
            self._cursor = size
        del self._data[self._cursor:]
        return self.tell()

    def write_byte(self, byte: int) -> None:
        """
        Write a single byte at the cursor, appending when the cursor is at or past the end.
        Raises `EOF` when the write would move the cursor past the size limit.
        """
        if isinstance(self._data, bytes):
            raise TypeError
        if isinstance(self._data, memoryview):
            raise NotImplementedError
        limit = self._size_limit
        cc = self._cursor
        nc = cc + 1
        # Compare against None: a size limit of zero is a real limit, not "no limit".
        if limit is not None and nc > limit:
            raise EOF(bytes((byte,)))
        try:
            if cc < len(self._data):
                self._data[cc] = byte
            else:
                self._data.append(byte)
        except Exception as error:
            raise OSError(str(error)) from error
        else:
            self._cursor = nc

    def write(self, _data: Union[Buffer, Iterable[int]]) -> int:
        """
        Write a buffer or an iterable of byte values at the cursor and return the number of
        bytes written. Existing data is overwritten in place and the buffer grows as needed.

        Raises `PermissionError` when the underlying data is not a writable `bytearray`. When a
        size limit is set and the input has a known length that does not fit, nothing is written
        and `EOF` is raised with the entire input. For an iterable without a length, data is
        written up to the limit and `EOF` is raised with the unwritten remainder; the cursor is
        not moved in that case.
        """
        out = self._data
        end = len(out)

        if isinstance(out, memoryview):
            if out.readonly:
                raise PermissionError
            out = out.obj

        if not isinstance(out, bytearray):
            raise PermissionError

        try:
            getbuf = cast('Buffer', _data).__buffer__
        except AttributeError:
            data = cast('Iterable[int]', _data)
        else:
            data = getbuf(0)

        beginning = self._cursor
        limit = self._size_limit

        # Fast path: unlimited append at the end of the buffer.
        if limit is None and beginning == end:
            out[end:] = data
            self._cursor = end = len(out)
            return end - beginning

        try:
            size = len(cast(Sized, data))
        except Exception:
            it = iter(data)
            written = 0
            # Overwrite existing bytes first. zip() advances the range before the iterator,
            # so no input byte is consumed once the end of the buffer has been reached.
            for index, value in zip(range(beginning, end), it):
                out[index] = value
                written += 1
            if beginning + written < end:
                # The input ran dry before reaching the end of the buffer.
                self._cursor = beginning + written
                return written
            if limit is None:
                out[end:] = bytes(it)
            else:
                out[end:] = bytes(itertools.islice(it, max(limit - end, 0)))
                rest = bytearray(it)
                if rest:
                    raise EOF(rest)
            appended = len(out) - end
            self._cursor = len(out)
            return written + appended

        if limit is not None and size + beginning > limit:
            raise EOF(bytes(data))
        stop = beginning + size
        try:
            out[beginning:stop] = data
        except Exception as error:
            raise OSError(str(error)) from error
        self._cursor = stop
        return size

    def __getitem__(self, slice):
        result = self._data[slice]
        if not isinstance(result, t := self._output):
            result = t(result)
        return result

    def replay(self, offset: int, length: int):
        """
        Write `length` bytes that are copied from `offset` bytes before the cursor, repeating
        that window as often as necessary. Raises a `ValueError` unless `offset` is between 1
        and the cursor position.
        """
        cursor = self._cursor
        # An offset of zero would describe an empty window and divide by zero below.
        if offset not in range(1, cursor + 1):
            raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{cursor}].')
        rep, r = divmod(length, offset)
        offset = cursor - offset
        replay = self._data[offset:offset + r]
        if rep > 0:
            # While this is technically a copy, it is faster than repeated calls to write.
            replay = bytes(self._data[offset:cursor]) * rep + replay
        self.write(replay)
Ancestors
- typing.Generic
Subclasses
Class variables
var SEEK
-
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.int(). For floating-point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0) 4
Instance variables
var closed
-
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    The current value of the internal closed flag.
    """
    state = self._closed
    return state
var eof
-
Expand source code Browse git
@property
def eof(self) -> bool:
    """
    True when the object is closed or the cursor is at or beyond the end of the data.
    """
    if self._closed:
        return True
    return len(self._data) <= self._cursor
var remaining_bytes
-
Expand source code Browse git
@property
def remaining_bytes(self) -> int:
    """
    Number of bytes between the current position and the end of the data.
    """
    total = len(self._data)
    return total - self.tell()
Methods
def close(self)
-
Expand source code Browse git
def close(self) -> None:
    """
    Mark the object as closed by setting the internal closed flag.
    """
    setattr(self, '_closed', True)
def flush(self)
-
Expand source code Browse git
def flush(self) -> None:
    """
    Does nothing and returns `None`.
    """
    return None
def isatty(self)
-
Expand source code Browse git
def isatty(self) -> bool:
    """
    Always returns `False`.
    """
    is_terminal = False
    return is_terminal
def fileno(self)
-
Expand source code Browse git
def fileno(self) -> int:
    """
    Return the stored file number, or raise `OSError` when none was set.
    """
    descriptor = self._fileno
    if descriptor is not None:
        return descriptor
    raise OSError
def readable(self)
-
Expand source code Browse git
def readable(self) -> bool:
    """
    Reports `True` until the closed flag is set.
    """
    if self._closed:
        return False
    return True
def seekable(self)
-
Expand source code Browse git
def seekable(self) -> bool:
    """
    Reports `True` until the closed flag is set.
    """
    if self._closed:
        return False
    return True
def detour(self, offset=None, whence=0)
-
Expand source code Browse git
def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
    """
    Build a `StreamDetour` for this object with the given `offset` and `whence`.
    """
    stream = cast(io.IOBase, self)
    return StreamDetour(stream, offset, whence=whence)
def detour_absolute(self, offset=None)
-
Expand source code Browse git
def detour_absolute(self, offset: Optional[int] = None):
    """
    Shortcut for `detour` with `io.SEEK_SET` as the reference point.
    """
    whence = io.SEEK_SET
    return self.detour(offset, whence)
def detour_relative(self, offset=None)
-
Expand source code Browse git
def detour_relative(self, offset: Optional[int] = None):
    """
    Shortcut for `detour` with `io.SEEK_CUR` as the reference point.
    """
    whence = io.SEEK_CUR
    return self.detour(offset, whence)
def detour_from_end(self, offset=None)
-
Expand source code Browse git
def detour_from_end(self, offset: Optional[int] = None):
    """
    Shortcut for `detour` with `io.SEEK_END` as the reference point.
    """
    whence = io.SEEK_END
    return self.detour(offset, whence)
def writable(self)
-
Expand source code Browse git
def writable(self) -> bool:
    """
    Whether the wrapped data can be written to: never once closed, otherwise only for a
    writable memoryview or a bytearray.
    """
    if self._closed:
        return False
    data = self._data
    if not isinstance(data, memoryview):
        return isinstance(data, bytearray)
    return not data.readonly
def read_as(self, cast, size=-1, peek=False)
-
Expand source code Browse git
def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
    """
    Read via `read` and convert the result to the type `cast` unless it already is one.
    """
    data = self.read(size, peek)
    return data if isinstance(data, cast) else cast(data)
def read(self, size=None, peek=False)
-
Expand source code Browse git
def read(self, size: Optional[int] = None, peek: bool = False) -> B:
    """
    Return up to `size` bytes starting at the cursor, or everything that is left when `size`
    is `None` or negative. The result is converted to the configured output type. The cursor
    only advances when `peek` is false.
    """
    data = self._data
    start = self._cursor
    stop = len(data)
    if size is not None and size >= 0:
        stop = min(start + size, stop)
    chunk = data[start:stop]
    output = self._output
    if not isinstance(chunk, output):
        chunk = output(chunk)
    if not peek:
        self._cursor = stop
    return chunk
def readif(self, value)
-
Expand source code Browse git
def readif(self, value: bytes) -> bool:
    """
    Consume `value` if the data at the cursor matches it exactly. Returns whether it matched;
    the cursor only moves on a match.
    """
    start = self._cursor
    stop = start + len(value)
    matched = memoryview(self._data)[start:stop] == value
    if matched:
        self._cursor = stop
    return matched
def peek(self, size=None)
-
Expand source code Browse git
def peek(self, size: Optional[int] = None) -> memoryview:
    """
    Return a memoryview over the next `size` bytes (all remaining bytes when `size` is `None`
    or negative) without moving the cursor.
    """
    start = self._cursor
    view = memoryview(self._data)
    if size is not None and size >= 0:
        return view[start:start + size]
    return view[start:]
def read1(self, size=None, peek=False)
-
Expand source code Browse git
def read1(self, size: Optional[int] = None, peek: bool = False) -> B:
    """
    Forwards to `read` with the same arguments.
    """
    result = self.read(size, peek)
    return result
def readline(self, size=None)
-
Expand source code Browse git
def readline(self, size: Optional[int] = None) -> B:
    """
    Read up to and including the next line feed and return it converted to the configured
    output type. If `size` is given and non-negative, at most that many bytes are read. An
    empty result means that no data was left.
    """
    beginning, end = self._cursor, len(self._data)
    if size is not None and size >= 0:
        # Clamp to the buffer: without this, a size larger than the remaining data would
        # move the cursor past the end of the data when no line break is found.
        end = min(end, beginning + size)
    p = self._find_linebreak(beginning, end)
    self._cursor = end if p < 0 else p + 1
    result = self._data[beginning:self._cursor]
    if not isinstance(result, t := self._output):
        result = t(result)
    return result
def readlines_iter(self, hint=None)
-
Expand source code Browse git
def readlines_iter(self, hint: Optional[int] = None) -> Iterable[B]:
    """
    Yield lines one at a time. When `hint` is `None` or negative, all remaining lines are
    yielded. Otherwise, lines are yielded until their combined length reaches `hint` or the
    data runs out, whichever happens first.
    """
    if hint is None or hint < 0:
        yield from self
        return
    total = 0
    while total < hint:
        # Do not use next(self) here: the StopIteration it raises at the end of the data
        # would escape the generator body and be converted to a RuntimeError (PEP 479).
        line = self.readline()
        if not line:
            break
        total += len(line)
        yield line
def readlines(self, hint=None)
-
Expand source code Browse git
def readlines(self, hint: Optional[int] = None) -> list[bytes]:
    """
    Collect the lines produced by `readlines_iter` into a list. Lines are converted to
    `bytes` unless the configured output type is already a subclass of `bytes`.
    """
    lines = self.readlines_iter(hint)
    if not issubclass(self._output, bytes):
        lines = map(bytes, lines)
    return list(lines)
def readinto1(self, b)
-
Expand source code Browse git
def readinto1(self, b) -> int:
    """
    Read up to `len(b)` bytes, copy them to the start of `b`, and return how many were read.
    """
    chunk = self.read(len(b))
    count = len(chunk)
    b[:count] = chunk
    return count
def readinto(self, b)
-
Expand source code Browse git
def readinto(self, b) -> int:
    """
    Forwards to `readinto1`.
    """
    count = self.readinto1(b)
    return count
def tell(self)
-
Expand source code Browse git
def tell(self) -> int:
    """
    Return the current cursor position.
    """
    position = self._cursor
    return position
def skip(self, n)
-
Expand source code Browse git
def skip(self, n: int):
    """
    Add `n` to the cursor. No bounds check is performed.
    """
    self._cursor = self._cursor + n
def seekrel(self, offset)
-
Expand source code Browse git
def seekrel(self, offset: int) -> int:
    """
    Seek relative to the current position and return the result of `seek`.
    """
    whence = io.SEEK_CUR
    return self.seek(offset, whence)
def seekset(self, offset)
-
Expand source code Browse git
def seekset(self, offset: int) -> int:
    """
    Seek to an absolute offset. Negative offsets are interpreted relative to the end of the
    data, non-negative ones relative to its start.
    """
    whence = io.SEEK_END if offset < 0 else io.SEEK_SET
    return self.seek(offset, whence)
def getbuffer(self)
-
Expand source code Browse git
def getbuffer(self) -> memoryview:
    """
    Return a memoryview of the entire wrapped data.
    """
    view = memoryview(self._data)
    return view
def getvalue(self)
-
Expand source code Browse git
def getvalue(self) -> T:
    """
    Return the wrapped data object itself; no copy is made.
    """
    data = self._data
    return data
def seek(self, offset, whence=0)
-
Expand source code Browse git
def seek(self, offset: int, whence=io.SEEK_SET) -> int:
    """
    Move the cursor and return its new position. The new position is always clamped to the
    range from zero to the length of the data. A negative `offset` together with `SEEK_SET`
    raises a `ValueError`. Any other value of `whence` leaves the position unchanged, apart
    from clamping.
    """
    size = len(self._data)
    target = self._cursor
    if whence == io.SEEK_SET:
        if offset < 0:
            raise ValueError('no negative offsets allowed for SEEK_SET.')
        target = offset
    elif whence == io.SEEK_CUR:
        target += offset
    elif whence == io.SEEK_END:
        target = size + offset
    # Clamp into [0, size].
    target = min(max(target, 0), size)
    self._cursor = target
    return target
def writelines(self, lines)
-
Expand source code Browse git
def writelines(self, lines: Union[Iterable[Iterable[int]], Iterable[Buffer]]) -> None:
    """
    Pass every item of `lines` to `write`, in order.
    """
    emit = self.write
    for chunk in lines:
        emit(chunk)
def truncate(self, size=None)
-
Expand source code Browse git
def truncate(self, size: Optional[int] = None) -> int:
    """
    Remove all data from position `size` onwards, or from the cursor when `size` is `None`.
    If `size` is given, the cursor is moved there first. Returns the position reported by
    `tell`. Raises `TypeError` unless the data is a bytearray, and `ValueError` when `size`
    is outside the current data.
    """
    data = self._data
    if not isinstance(data, bytearray):
        raise TypeError
    if size is not None:
        if not (0 <= size <= len(data)):
            raise ValueError('invalid size value')
        self._cursor = size
    cut = self._cursor
    del data[cut:]
    return self.tell()
def write_byte(self, byte)
-
Expand source code Browse git
def write_byte(self, byte: int) -> None:
    """
    Write a single byte at the cursor and advance the cursor by one. An existing byte is
    overwritten; when the cursor is at or past the end of the data, the byte is appended.

    Raises `TypeError` for `bytes` data, `NotImplementedError` for memoryview data, `EOF`
    (carrying the byte) when the write would exceed the size limit, and `OSError` when the
    underlying assignment fails.
    """
    data = self._data
    if isinstance(data, bytes):
        raise TypeError
    if isinstance(data, memoryview):
        raise NotImplementedError
    limit = self._size_limit
    cursor = self._cursor
    advanced = cursor + 1
    # Compare against None: a size limit of zero is a real limit, not "no limit".
    if limit is not None and advanced > limit:
        raise EOF(bytes((byte,)))
    try:
        if cursor < len(data):
            data[cursor] = byte
        else:
            data.append(byte)
    except Exception as error:
        raise OSError(str(error)) from error
    self._cursor = advanced
def write(self, _data)
-
Expand source code Browse git
def write(self, _data: Union[Buffer, Iterable[int]]) -> int:
    """
    Write a buffer or an iterable of byte values at the cursor and return the number of bytes
    written. Existing data is overwritten in place and the buffer grows as needed; the cursor
    ends up right after the written data.

    Raises `PermissionError` when the underlying data is not a writable `bytearray`. When a
    size limit is set and the input has a known length that does not fit, nothing is written
    and `EOF` is raised with the entire input. For an iterable without a length, data is
    written up to the limit and `EOF` is raised with the unwritten remainder; the cursor is
    not moved in that case. A failing slice assignment is reported as `OSError`.
    """
    out = self._data
    end = len(out)

    if isinstance(out, memoryview):
        if out.readonly:
            raise PermissionError
        out = out.obj

    if not isinstance(out, bytearray):
        raise PermissionError

    # Prefer the buffer protocol when the input exposes it; otherwise treat the input as an
    # iterable of integers.
    try:
        getbuf = cast('Buffer', _data).__buffer__
    except AttributeError:
        data = cast('Iterable[int]', _data)
    else:
        data = getbuf(0)

    beginning = self._cursor
    limit = self._size_limit

    # Fast path: unlimited append at the end of the buffer.
    if limit is None and beginning == end:
        out[end:] = data
        self._cursor = end = len(out)
        return end - beginning

    try:
        size = len(cast(Sized, data))
    except Exception:
        it = iter(data)
        written = 0
        # Overwrite existing bytes first. zip() advances the range before the iterator, so
        # no input byte is consumed once the end of the buffer has been reached.
        for index, value in zip(range(beginning, end), it):
            out[index] = value
            written += 1
        if beginning + written < end:
            # The input ran dry before reaching the end of the buffer.
            self._cursor = beginning + written
            return written
        if limit is None:
            out[end:] = bytes(it)
        else:
            out[end:] = bytes(itertools.islice(it, max(limit - end, 0)))
            rest = bytearray(it)
            if rest:
                raise EOF(rest)
        appended = len(out) - end
        self._cursor = len(out)
        return written + appended

    # Compare against None: a size limit of zero is a real limit, not "no limit".
    if limit is not None and size + beginning > limit:
        raise EOF(bytes(data))
    stop = beginning + size
    try:
        out[beginning:stop] = data
    except Exception as error:
        raise OSError(str(error)) from error
    self._cursor = stop
    return size
def replay(self, offset, length)
-
Expand source code Browse git
def replay(self, offset: int, length: int):
    """
    Write `length` bytes that are copied from the data starting `offset` bytes before the
    cursor. When `length` exceeds `offset`, the window between that position and the cursor
    is repeated as often as necessary.

    Raises a `ValueError` unless `offset` is between 1 and the cursor position.
    """
    cursor = self._cursor
    # A zero offset describes an empty window and would divide by zero below.
    if offset not in range(1, cursor + 1):
        raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{cursor}].')
    rep, r = divmod(length, offset)
    offset = cursor - offset
    replay = self._data[offset:offset + r]
    if rep > 0:
        # While this is technically a copy, it is faster than repeated calls to write.
        replay = bytes(self._data[offset:cursor]) * rep + replay
    self.write(replay)
class MemoryFile (data=None, output=None, fileno=None, size_limit=None)
-
A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object.
Expand source code Browse git
class MemoryFile(MemoryFileMethods[T, B], io.BytesIO):
    """
    Combines `MemoryFileMethods` with `io.BytesIO` as a base class. The class body adds
    nothing of its own; `MemoryFileMethods` comes first in the list of bases.
    """
Ancestors
- MemoryFileMethods
- typing.Generic
- _io.BytesIO
- _io._BufferedIOBase
- _io._IOBase
Subclasses
Inherited members
class order (*args, **kwds)
-
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
Expand source code Browse git
class order(str, enum.Enum):
    """
    String-valued enumeration of the two byte orders. The values are the byte order prefix
    characters of the `struct` module.
    """
    big = '>'
    little = '<'
Ancestors
- builtins.str
- enum.Enum
Class variables
var big
var little
class StructReader (data, bigendian=False)
-
An extension of a `MemoryFile` which provides methods to read structured data.
Expand source code Browse git
class StructReader(MemoryFile[T, T]):
    """
    An extension of a `refinery.lib.structures.MemoryFile` which provides methods to read
    structured data.
    """

    class Unaligned(RuntimeError):
        pass

    def __init__(self, data: T, bigendian: bool = False):
        super().__init__(data)
        # Contents and size of the internal bit buffer; it holds the bits of the last byte
        # that was consumed only partially.
        self._bbits = 0
        self._nbits = 0
        self.bigendian = bigendian

    def __enter__(self) -> StructReader:
        return super().__enter__()

    @property
    @contextlib.contextmanager
    def be(self):
        """
        Context manager that switches the reader to big endian for the duration of the
        context and then restores the byte order that was active before.
        """
        previous = self.bigendian
        self.bigendian = True
        try:
            yield self
        finally:
            # Restore rather than reset: the reader may have been big endian all along.
            self.bigendian = previous

    @property
    def byteorder_format(self) -> str:
        return '>' if self.bigendian else '<'

    @property
    def byteorder_name(self):
        return 'big' if self.bigendian else 'little'

    def seek(self, offset, whence=io.SEEK_SET) -> int:
        """
        Seek like the base class does; any bits left in the internal bit buffer are discarded.
        """
        self._bbits = 0
        self._nbits = 0
        return super().seek(offset, whence)

    def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> B:
        """
        Read bytes from the underlying stream. Raises a `RuntimeError` when the stream is not
        currently byte-aligned, i.e. when `refinery.lib.structures.StructReader.byte_aligned`
        is `False`. Raises an exception of type `refinery.lib.structures.EOF` when less data is
        available in the stream than requested via the `size` parameter. The remaining data
        can be extracted from the exception. Use `refinery.lib.structures.StructReader.read_bytes`
        to read bytes from the stream when it is not byte-aligned.
        """
        if not self.byte_aligned:
            raise StructReader.Unaligned('buffer is not byte-aligned')
        data = self.read1(size, peek)
        if size and len(data) < size:
            raise EOF(data)
        return data

    @property
    def byte_aligned(self) -> bool:
        """
        This property is `True` if and only if there are currently no bits still waiting in
        the internal bit buffer.
        """
        return not self._nbits

    def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
        """
        This method clears the internal bit buffer and moves the cursor to the next multiple of
        `blocksize`, if it is not at one already. It returns a tuple containing the size and
        contents of the bit buffer.
        """
        nbits = self._nbits
        bbits = self._bbits
        self._nbits = 0
        self._bbits = 0
        mod = self._cursor % blocksize
        if mod:
            self.seekrel(blocksize - mod)
        return nbits, bbits

    @property
    def remaining_bits(self) -> int:
        return 8 * self.remaining_bytes + self._nbits

    def read_integer(self, length: Optional[int] = None, peek: bool = False, bigendian: Optional[bool] = None) -> int:
        """
        Read `length` many bits from the underlying stream as an integer. All remaining bits
        are read when `length` is `None`. The `bigendian` argument overrides the byte order of
        the reader for this call. Raises `EOFError` when not enough data is left.
        """
        if length is None:
            length = self.remaining_bits
        if bigendian is None:
            bigendian = self.bigendian
        if length < self._nbits:
            # The request can be served from the bit buffer alone.
            new_count = self._nbits - length
            if bigendian:
                result = self._bbits >> new_count
                if not peek:
                    self._bbits ^= result << new_count
            else:
                result = self._bbits & 2 ** length - 1
                if not peek:
                    self._bbits >>= length
            if not peek:
                self._nbits = new_count
            return result

        # Save the bit buffer now: seekrel below goes through seek, which clears it.
        nbits, bbits = self._nbits, self._bbits
        number_of_missing_bits = length - nbits
        bytecount, rest = divmod(number_of_missing_bits, 8)
        if rest:
            bytecount += 1
            rest = 8 - rest
        bb = self.read1(bytecount, True)
        if len(bb) != bytecount:
            raise EOFError
        if not peek:
            self.seekrel(bytecount)
        if bytecount == 1:
            result, = bb
        else:
            # Decode with the byte order that is in effect for this call, so that it agrees
            # with the bit handling below when the caller overrides the reader's default.
            result = int.from_bytes(bb, 'big' if bigendian else 'little')
        if not nbits and not rest:
            return result
        if bigendian:
            rbmask   = 2 ** rest - 1                  # noqa
            excess   = result & rbmask                # noqa
            result >>= rest                           # noqa
            result  ^= bbits << number_of_missing_bits  # noqa
        else:
            excess   = result >> number_of_missing_bits  # noqa
            result  ^= excess << number_of_missing_bits  # noqa
            result <<= nbits                          # noqa
            result  |= bbits                          # noqa
        assert excess.bit_length() <= rest
        if not peek:
            self._nbits = rest
            self._bbits = excess
        return result

    def read_bytes(self, size: int, peek: bool = False) -> bytes:
        """
        The method reads `size` many bytes from the underlying stream starting at the current
        bit.
        """
        if self.byte_aligned:
            data = self.read_exactly(size, peek)
            if not isinstance(data, bytes):
                data = bytes(data)
            return data
        else:
            return self.read_integer(size * 8, peek).to_bytes(size, self.byteorder_name)

    def read_bit(self) -> int:
        """
        This function is a shortcut for calling `refinery.lib.structures.StructReader.read_integer`
        with an argument of `1`, i.e. this reads the next bit from the stream. In little endian
        mode, the bits of any byte in the stream are read from least significant to most
        significant.
        """
        return self.read_integer(1)

    def read_bits(self, nbits: int, bigendian: Optional[bool] = None) -> Iterable[int]:
        """
        This method returns the bits of `refinery.lib.structures.StructReader.read_integer`
        one by one.
        """
        if bigendian is None:
            bigendian = self.bigendian
        chunk = self.read_integer(nbits, bigendian=bigendian)
        it = range(nbits - 1, -1, -1) if bigendian else range(nbits)
        for k in it:
            yield chunk >> k & 1

    def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
        """
        Identical to `refinery.lib.structures.StructReader.read_bits` with every bit value cast
        to a boolean. The order of the flags is reversed when `reverse` is set.
        """
        bits = list(self.read_bits(nbits))
        if reverse:
            bits.reverse()
        for bit in bits:
            yield bool(bit)

    def read_one_struct(self, spec: str, peek=False) -> UnpackType:
        """
        Like `refinery.lib.structures.StructReader.read_struct` for a format that produces
        exactly one value, which is returned directly.
        """
        item, = self.read_struct(spec, peek=peek)
        return item

    def read_struct(self, spec: str, peek=False) -> List[UnpackType]:
        """
        Read structured data from the stream in any format supported by the `struct` module and
        return the unpacked values as a list. If `spec` starts with one of the byte order
        characters `<`, `>`, `!`, `=`, or `@`, it overrides the byte order of the reader.

        In addition to the `struct` format characters, the following are supported, each with
        an optional repeat count in front:

        - `a`: `refinery.lib.structures.StructReader.read_c_string`
        - `u`: `refinery.lib.structures.StructReader.read_w_string`
        - `w`: like `u`, but decoded as UTF-16LE
        - `g`: `refinery.lib.structures.StructReader.read_guid`
        - `k`: `refinery.lib.structures.StructReader.read_7bit_encoded_int`

        When `peek` is set, the cursor is moved back to where it was before the call.
        """
        if not spec:
            raise ValueError('no format specified')
        byteorder = spec[:1]
        if byteorder in '<!=@>':
            spec = spec[1:]
        else:
            byteorder = self.byteorder_format
        data = []
        current_cursor = self.tell()

        # reserved struct characters: xcbB?hHiIlLqQnNefdspP
        # The capture group makes re.split put the extension parts at odd indices.
        for k, part in enumerate(re.split('(\\d*[auwgk])', spec)):
            if k % 2 == 1:
                count = 1 if len(part) == 1 else int(part[:~0])
                part = part[~0]
                for _ in range(count):
                    if part == 'a':
                        data.append(self.read_c_string())
                    elif part == 'g':
                        data.append(self.read_guid())
                    elif part == 'u':
                        data.append(self.read_w_string())
                    elif part == 'w':
                        data.append(codecs.decode(self.read_w_string(), 'utf-16le'))
                    elif part == 'k':
                        data.append(self.read_7bit_encoded_int())
                continue
            else:
                part = F'{byteorder}{part}'
                data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
        if peek:
            self.seekset(current_cursor)
        return data

    def read_nibble(self, peek: bool = False) -> int:
        """
        Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
        """
        return self.read_integer(4, peek)

    def u8(self, peek: bool = False) -> int:
        return self.read_integer(8, peek)

    def i8(self, peek: bool = False) -> int:
        return signed(self.read_integer(8, peek), 8)

    def u16(self, peek: bool = False) -> int:
        return self.read_integer(16, peek)

    def u32(self, peek: bool = False) -> int:
        return self.read_integer(32, peek)

    def u64(self, peek: bool = False) -> int:
        return self.read_integer(64, peek)

    def i16(self, peek: bool = False) -> int:
        return signed(self.read_integer(16, peek), 16)

    def i32(self, peek: bool = False) -> int:
        return signed(self.read_integer(32, peek), 32)

    def i64(self, peek: bool = False) -> int:
        return signed(self.read_integer(64, peek), 64)

    def f32(self, peek: bool = False) -> float:
        return cast(float, self.read_one_struct('f', peek=peek))

    def f64(self, peek: bool = False) -> float:
        return cast(float, self.read_one_struct('d', peek=peek))

    def u8fast(self):
        """
        Read one byte directly from the data at the cursor, without looking at the bit buffer.
        Raises `EOFError` when the cursor is outside the data.
        """
        try:
            b = self._data[self._cursor]
        except IndexError:
            raise EOFError
        else:
            self._cursor += 1
            return b

    def read_byte(self, peek: bool = False) -> int:
        return self.read_integer(8, peek)

    def read_char(self, peek: bool = False) -> int:
        return signed(self.read_integer(8, peek), 8)

    def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
        """
        Read data up to the next occurrence of `terminator` whose distance from the current
        position is a multiple of `alignment`. The terminator is skipped and not part of the
        returned data.
        """
        buf = self.getvalue()
        pos = self.tell()

        if isinstance(buf, memoryview):
            # memoryview has no find method; compare windows manually.
            def find(whence: int):
                n = len(terminator)
                for k in range(whence, len(buf)):
                    if buf[k:k + n] == terminator:
                        return k
                return -1
        else:
            def find(whence: int):
                return buf.find(terminator, whence)

        try:
            end = pos - 1
            while True:
                end = find(end + 1)
                if end < 0 or not (end - pos) % alignment:
                    break
        except AttributeError:
            # The buffer does not support searching; read block by block instead.
            result = bytearray()
            while not self.eof:
                result.extend(self.read_bytes(alignment))
                if result.endswith(terminator):
                    return result[:-len(terminator)]
            self.seek(pos)
            raise EOF
        else:
            data = self.read_exactly(end - pos)
            self.seekrel(len(terminator))
            return bytearray(data)

    def read_guid(self) -> UUID:
        return UUID(bytes_le=self.read_bytes(16))

    def read_uuid(self) -> UUID:
        return UUID(bytes=self.read_bytes(16))

    @overload
    def read_c_string(self) -> bytearray:
        ...

    @overload
    def read_c_string(self, encoding: str) -> str:
        ...

    def read_c_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read data up to the next null byte. The result is decoded when `encoding` is given.
        """
        data = self.read_terminated_array(B'\0')
        if encoding is not None:
            data = codecs.decode(data, encoding)
        return data

    @overload
    def read_w_string(self) -> bytearray:
        ...

    @overload
    def read_w_string(self, encoding: str) -> str:
        ...

    def read_w_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read data up to the next two-byte aligned pair of null bytes. The result is decoded
        when `encoding` is given.
        """
        data = self.read_terminated_array(B'\0\0', 2)
        if encoding is not None:
            data = codecs.decode(data, encoding)
        return data

    def read_length_prefixed_ascii(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'latin1')

    def read_length_prefixed_utf8(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'utf8')

    def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
        block_size = 1 if bytecount else 2
        return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

    @overload
    def read_length_prefixed(self, *, encoding: str, prefix_size: int = 32, block_size: int = 1) -> str:
        ...

    @overload
    def read_length_prefixed(self, prefix_size: int, encoding: str, block_size: int = 1) -> str:
        ...

    @overload
    def read_length_prefixed(self, *, prefix_size: int = 32, block_size: int = 1) -> T:
        ...

    @overload
    def read_length_prefixed(self, prefix_size: int, *, block_size: int = 1) -> T:
        ...

    def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
        """
        Read an integer of `prefix_size` bits, multiply it by `block_size`, and read that many
        bytes. The result is decoded when `encoding` is given.
        """
        prefix = self.read_integer(prefix_size) * block_size
        data = self.read(prefix)
        if encoding is not None:
            data = codecs.decode(data, encoding)
        return data

    def read_7bit_encoded_int(self, max_bits: int = 0, bigendian: bool | None = None) -> int:
        """
        Read an integer that is stored in groups of 7 bits per byte, where a set high bit
        means that another byte follows. Raises `OverflowError` when `max_bits` is positive
        and the encoded integer continues beyond that many bits.
        """
        value = 0
        shift = 0
        if bigendian is None:
            bigendian = self.bigendian
        while True:
            b = self.u8fast()
            if bigendian:
                value <<= 7
                value |= (b & 0x7F)
            else:
                value |= (b & 0x7F) << shift
            if not b & 0x80:
                return value
            if (shift := shift + 7) > max_bits > 0:
                raise OverflowError('Maximum bits were exceeded by encoded integer.')
Ancestors
- MemoryFile
- MemoryFileMethods
- typing.Generic
- _io.BytesIO
- _io._BufferedIOBase
- _io._IOBase
Subclasses
Class variables
var Unaligned
-
Unspecified run-time error.
Instance variables
var be
-
Expand source code Browse git
@property
@contextlib.contextmanager
def be(self):
    """
    Context manager that switches the reader to big endian for the duration of the context
    and then restores the byte order that was active before the context was entered.
    """
    previous = self.bigendian
    self.bigendian = True
    try:
        yield self
    finally:
        # Restore rather than reset to False: the reader may have been big endian already.
        self.bigendian = previous
var byteorder_format
-
Expand source code Browse git
@property
def byteorder_format(self) -> str:
    """
    The `struct` byte order prefix for the current byte order of the reader.
    """
    if self.bigendian:
        return '>'
    return '<'
var byteorder_name
-
Expand source code Browse git
@property
def byteorder_name(self):
    """
    The current byte order as the string `'big'` or `'little'`.
    """
    if self.bigendian:
        return 'big'
    return 'little'
var byte_aligned
-
This property is `True` if and only if there are currently no bits still waiting in the internal bit buffer.
Expand source code Browse git
@property
def byte_aligned(self) -> bool:
    """
    Whether the internal bit buffer is empty, i.e. whether the bit count is zero.
    """
    pending = self._nbits
    return not pending
var remaining_bits
-
Expand source code Browse git
@property
def remaining_bits(self) -> int:
    """
    Eight times the number of remaining bytes, plus the size of the bit buffer.
    """
    buffered = self._nbits
    return self.remaining_bytes * 8 + buffered
Methods
def seek(self, offset, whence=0)
-
Change stream position.
Seek to byte offset pos relative to position indicated by whence: 0 Start of stream (the default). pos should be >= 0; 1 Current position - pos may be negative; 2 End of stream - pos usually negative. Returns the new absolute position.
Expand source code Browse git
def seek(self, offset, whence=io.SEEK_SET) -> int:
    """
    Clear the internal bit buffer, then delegate to the `seek` of the parent class and return
    its result.
    """
    self._nbits = self._bbits = 0
    return super().seek(offset, whence)
def read_exactly(self, size=None, peek=False)
-
Read bytes from the underlying stream. Raises a `RuntimeError` when the stream is not currently byte-aligned, i.e. when `StructReader.byte_aligned` is `False`. Raises an exception of type `EOF` when less data is available in the stream than requested via the `size` parameter. The remaining data can be extracted from the exception. Use `StructReader.read_bytes()` to read bytes from the stream when it is not byte-aligned.
Expand source code Browse git
def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> B:
    """
    Read from the stream via `read1` and insist on getting the full amount.

    Raises `StructReader.Unaligned` (a `RuntimeError`) when `byte_aligned` is false, so that
    `refinery.lib.structures.StructReader.read_bytes` has to be used instead. Raises
    `refinery.lib.structures.EOF` when `size` is nonzero and fewer bytes than that could be
    read; the exception carries the data that was read.
    """
    if self.byte_aligned:
        chunk = self.read1(size, peek)
        if size:
            if len(chunk) < size:
                raise EOF(chunk)
        return chunk
    raise StructReader.Unaligned('buffer is not byte-aligned')
def byte_align(self, blocksize=1)
-
This method clears the internal bit buffer and moves the cursor to the next byte. It returns a tuple containing the size and contents of the bit buffer.
Expand source code Browse git
def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
    """
    Empty the internal bit buffer and, if the cursor is not at a multiple of `blocksize`,
    advance it to the next one. Returns the size and contents that the bit buffer had before
    it was cleared.
    """
    previous = (self._nbits, self._bbits)
    self._nbits = self._bbits = 0
    misalignment = self._cursor % blocksize
    if misalignment:
        self.seekrel(blocksize - misalignment)
    return previous
def read_integer(self, length=None, peek=False, bigendian=None)
-
Read `length` many bits from the underlying stream as an integer.
Expand source code Browse git
def read_integer(self, length: Optional[int] = None, peek: bool = False, bigendian: Optional[bool] = None) -> int:
    """
    Read `length` many bits from the underlying stream as an integer.

    - `length`: The number of bits to read. If omitted, `self.remaining_bits` is used.
    - `peek`: If true, neither the cursor nor the internal bit buffer are modified.
    - `bigendian`: Selects how bits are taken from and merged with the bit buffer. If omitted,
      `self.bigendian` is used.

    Raises an `EOFError` when the stream does not contain enough bytes to satisfy the request.
    """
    if length is None:
        length = self.remaining_bits
    if bigendian is None:
        bigendian = self.bigendian
    if length < self._nbits:
        # Fast path: the bit buffer alone holds more bits than were requested.
        new_count = self._nbits - length
        if bigendian:
            # Take the topmost bits of the buffer and clear them via XOR.
            result = self._bbits >> new_count
            if not peek:
                self._bbits ^= result << new_count
        else:
            # Take the lowest bits of the buffer and shift them out.
            result = self._bbits & 2 ** length - 1
            if not peek:
                self._bbits >>= length
        if not peek:
            self._nbits = new_count
        return result
    nbits, bbits = self._nbits, self._bbits
    number_of_missing_bits = length - nbits
    # Round the missing bits up to whole bytes; rest then becomes the number of surplus bits of
    # the last byte which have to go back into the bit buffer.
    bytecount, rest = divmod(number_of_missing_bits, 8)
    if rest:
        bytecount += 1
        rest = 8 - rest
    # Always peek first so that a short read leaves the cursor untouched.
    bb = self.read1(bytecount, True)
    if len(bb) != bytecount:
        raise EOFError
    if not peek:
        self.seekrel(bytecount)
    if bytecount == 1:
        # Unpacking a single-byte buffer yields that byte as an integer.
        result, = bb
    else:
        # NOTE(review): the bytes are combined according to self.byteorder_name even when the
        # bigendian argument overrides the default of the reader - confirm that this is intended.
        result = int.from_bytes(bb, self.byteorder_name)
    if not nbits and not rest:
        # Byte-aligned before and after: the bit buffer is not involved at all.
        return result
    if bigendian:
        # The low bits of the last byte are surplus; the buffered bits become the top of the result.
        rbmask = 2 ** rest - 1  # noqa
        excess = result & rbmask  # noqa
        result >>= rest  # noqa
        result ^= bbits << number_of_missing_bits  # noqa
    else:
        # The bits above the missing count are surplus; the buffered bits become the bottom of the
        # result.
        excess = result >> number_of_missing_bits  # noqa
        result ^= excess << number_of_missing_bits  # noqa
        result <<= nbits  # noqa
        result |= bbits  # noqa
    assert excess.bit_length() <= rest
    if not peek:
        # The surplus bits are the new contents of the bit buffer.
        self._nbits = rest
        self._bbits = excess
    return result
def read_bytes(self, size, peek=False)
-
The method reads `size` many bytes from the underlying stream starting at the current bit.
Expand source code Browse git
def read_bytes(self, size: int, peek: bool = False) -> bytes:
    """
    Read `size` many bytes starting at the current bit and return them as a `bytes` object. A
    byte-aligned reader delegates to `refinery.lib.structures.StructReader.read_exactly`; otherwise,
    the data is read as one integer of `size * 8` bits and serialized using `self.byteorder_name`.
    """
    if not self.byte_aligned:
        number = self.read_integer(size * 8, peek)
        return number.to_bytes(size, self.byteorder_name)
    chunk = self.read_exactly(size, peek)
    return chunk if isinstance(chunk, bytes) else bytes(chunk)
def read_bit(self)
-
This function is a shortcut for calling `StructReader.read_integer()` with an argument of `1`, i.e. this reads the next bit from the stream. The bits of any byte in the stream are read from least significant to most significant.
Expand source code Browse git
def read_bit(self) -> int:
    """
    Read the next single bit from the stream. This is shorthand for calling
    `refinery.lib.structures.StructReader.read_integer` with an argument of `1`, which therefore also
    determines the order in which the bits of a byte are consumed.
    """
    bit = self.read_integer(1)
    return bit
def read_bits(self, nbits, bigendian=None)
-
This method returns the bits of `StructReader.read_integer()` one by one.
Expand source code Browse git
def read_bits(self, nbits: int, bigendian: Optional[bool] = None) -> Iterable[int]:
    """
    Read `nbits` many bits via `refinery.lib.structures.StructReader.read_integer` and yield them one
    at a time. In big endian mode, the most significant bit of the integer is produced first;
    otherwise, the least significant bit is. When `bigendian` is omitted, `self.bigendian` is used.
    """
    if bigendian is None:
        bigendian = self.bigendian
    number = self.read_integer(nbits, bigendian=bigendian)
    positions = range(nbits)
    if bigendian:
        positions = reversed(positions)
    for shift in positions:
        yield (number >> shift) & 1
def read_flags(self, nbits, reverse=False)
-
Identical to `StructReader.read_bits()` with every bit value cast to a boolean.
Expand source code Browse git
def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
    """
    Yield the results of `refinery.lib.structures.StructReader.read_bits` as boolean values. All bits
    are read before the first flag is produced. If `reverse` is set, the flags are produced in the
    opposite order.
    """
    flags = [bool(bit) for bit in self.read_bits(nbits)]
    if reverse:
        flags = flags[::-1]
    for flag in flags:
        yield flag
def read_one_struct(self, spec, peek=False)
-
Expand source code Browse git
def read_one_struct(self, spec: str, peek=False) -> UnpackType:
    """
    Call `refinery.lib.structures.StructReader.read_struct` with a `spec` that produces exactly one
    value and return that value. A `ValueError` is raised by the unpacking when the result does not
    contain exactly one item.
    """
    [value] = self.read_struct(spec, peek=peek)
    return value
def read_struct(self, spec, peek=False)
-
Read structured data from the stream in any format supported by the `struct` module. A leading byte order character in the `spec` argument can be used to override the current byte ordering. The unpacked values are always returned as a list, even when only a single value is read; use `read_one_struct` to obtain a scalar.
Expand source code Browse git
def read_struct(self, spec: str, peek=False) -> List[UnpackType]:
    """
    Read structured data from the stream in any format supported by the `struct` module and return
    the unpacked values as a list. If `spec` starts with one of the byte order characters of the
    `struct` module, it overrides the byte order of the reader; otherwise `self.byteorder_format` is
    used. In addition to the standard format characters, the following ones are supported, each
    with an optional decimal repeat count in front of it:

    - `a`: the result of `refinery.lib.structures.StructReader.read_c_string`
    - `u`: the result of `refinery.lib.structures.StructReader.read_w_string`
    - `w`: the result of `refinery.lib.structures.StructReader.read_w_string`, decoded as UTF-16LE
    - `g`: the result of `refinery.lib.structures.StructReader.read_guid`
    - `k`: the result of `refinery.lib.structures.StructReader.read_7bit_encoded_int`

    If `peek` is set, the cursor is moved back to where it was before the call, even when reading
    fails with an exception. A `ValueError` is raised for an empty `spec`.
    """
    if not spec:
        raise ValueError('no format specified')
    byteorder = spec[:1]
    if byteorder in '<!=@>':
        spec = spec[1:]
    else:
        byteorder = self.byteorder_format
    data = []
    current_cursor = self.tell()
    try:
        # reserved struct characters: xcbB?hHiIlLqQnNefdspP
        # Splitting on a capture group puts the custom codes at the odd indices and the plain
        # struct format pieces between them at the even indices.
        for k, part in enumerate(re.split(r'(\d*[auwgk])', spec)):
            if k % 2 == 1:
                count = 1 if len(part) == 1 else int(part[:~0])
                part = part[~0]
                for _ in range(count):
                    if part == 'a':
                        data.append(self.read_c_string())
                    elif part == 'g':
                        data.append(self.read_guid())
                    elif part == 'u':
                        data.append(self.read_w_string())
                    elif part == 'w':
                        data.append(codecs.decode(self.read_w_string(), 'utf-16le'))
                    elif part == 'k':
                        data.append(self.read_7bit_encoded_int())
                continue
            if not part:
                # Nothing between two custom codes: avoid a pointless zero-byte read.
                continue
            part = F'{byteorder}{part}'
            data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
    finally:
        # A peek must not move the cursor, not even when the read fails halfway through.
        if peek:
            self.seekset(current_cursor)
    return data
def read_nibble(self, peek=False)
-
Calls `StructReader.read_integer()` with an argument of `4`.
Expand source code Browse git
def read_nibble(self, peek: bool = False) -> int:
    """
    Read four bits as an integer by calling `refinery.lib.structures.StructReader.read_integer`
    with a length of `4`; the `peek` argument is passed along.
    """
    nibble = self.read_integer(4, peek)
    return nibble
def u8(self, peek=False)
-
Expand source code Browse git
def u8(self, peek: bool = False) -> int:
    """
    Read 8 bits via `refinery.lib.structures.StructReader.read_integer` and return the result as it
    is, i.e. as an unsigned integer.
    """
    value = self.read_integer(8, peek)
    return value
def i8(self, peek=False)
-
Expand source code Browse git
def i8(self, peek: bool = False) -> int:
    """
    Read 8 bits via `refinery.lib.structures.StructReader.read_integer` and convert the result to a
    signed integer using `refinery.lib.structures.signed`.
    """
    raw = self.read_integer(8, peek)
    return signed(raw, 8)
def u16(self, peek=False)
-
Expand source code Browse git
def u16(self, peek: bool = False) -> int:
    """
    Read 16 bits via `refinery.lib.structures.StructReader.read_integer` and return the result as it
    is, i.e. as an unsigned integer.
    """
    value = self.read_integer(16, peek)
    return value
def u32(self, peek=False)
-
Expand source code Browse git
def u32(self, peek: bool = False) -> int:
    """
    Read 32 bits via `refinery.lib.structures.StructReader.read_integer` and return the result as it
    is, i.e. as an unsigned integer.
    """
    value = self.read_integer(32, peek)
    return value
def u64(self, peek=False)
-
Expand source code Browse git
def u64(self, peek: bool = False) -> int:
    """
    Read 64 bits via `refinery.lib.structures.StructReader.read_integer` and return the result as it
    is, i.e. as an unsigned integer.
    """
    value = self.read_integer(64, peek)
    return value
def i16(self, peek=False)
-
Expand source code Browse git
def i16(self, peek: bool = False) -> int:
    """
    Read 16 bits via `refinery.lib.structures.StructReader.read_integer` and convert the result to a
    signed integer using `refinery.lib.structures.signed`.
    """
    raw = self.read_integer(16, peek)
    return signed(raw, 16)
def i32(self, peek=False)
-
Expand source code Browse git
def i32(self, peek: bool = False) -> int:
    """
    Read 32 bits via `refinery.lib.structures.StructReader.read_integer` and convert the result to a
    signed integer using `refinery.lib.structures.signed`.
    """
    raw = self.read_integer(32, peek)
    return signed(raw, 32)
def i64(self, peek=False)
-
Expand source code Browse git
def i64(self, peek: bool = False) -> int:
    """
    Read 64 bits via `refinery.lib.structures.StructReader.read_integer` and convert the result to a
    signed integer using `refinery.lib.structures.signed`.
    """
    raw = self.read_integer(64, peek)
    return signed(raw, 64)
def f32(self, peek=False)
-
Expand source code Browse git
def f32(self, peek: bool = False) -> float:
    """
    Read one value with the `struct` format `f` by means of
    `refinery.lib.structures.StructReader.read_one_struct`.
    """
    value = self.read_one_struct('f', peek=peek)
    return cast(float, value)
def f64(self, peek=False)
-
Expand source code Browse git
def f64(self, peek: bool = False) -> float:
    """
    Read one value with the `struct` format `d` by means of
    `refinery.lib.structures.StructReader.read_one_struct`.
    """
    value = self.read_one_struct('d', peek=peek)
    return cast(float, value)
def u8fast(self)
-
Expand source code Browse git
def u8fast(self):
    """
    Return the item of the underlying data at the cursor position and advance the cursor by one.
    The internal bit buffer is not consulted. An `EOFError` is raised, and the cursor is left
    unchanged, when indexing the data at the cursor fails with an `IndexError`.
    """
    position = self._cursor
    try:
        value = self._data[position]
    except IndexError:
        raise EOFError
    self._cursor = position + 1
    return value
def read_byte(self, peek=False)
-
Expand source code Browse git
def read_byte(self, peek: bool = False) -> int:
    """
    Read 8 bits via `refinery.lib.structures.StructReader.read_integer` and return them as an
    unsigned integer.
    """
    value = self.read_integer(8, peek)
    return value
def read_char(self, peek=False)
-
Expand source code Browse git
def read_char(self, peek: bool = False) -> int:
    """
    Read 8 bits via `refinery.lib.structures.StructReader.read_integer` and convert the result to a
    signed integer using `refinery.lib.structures.signed`.
    """
    raw = self.read_integer(8, peek)
    return signed(raw, 8)
def read_terminated_array(self, terminator, alignment=1)
-
Expand source code Browse git
def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
    """
    Read the data between the cursor and the next occurrence of `terminator` whose distance from the
    cursor is a multiple of `alignment`. The terminator is skipped, but it is not part of the
    returned `bytearray`.
    """
    buf = self.getvalue()
    pos = self.tell()

    if isinstance(buf, memoryview):
        # A memoryview has no find method, so the buffer is scanned position by position.
        def find(whence: int):
            n = len(terminator)
            for k in range(whence, len(buf)):
                if buf[k:k + n] == terminator:
                    return k
            return -1
    else:
        def find(whence: int):
            return buf.find(terminator, whence)
    try:
        end = pos - 1
        while True:
            # Resume the search right behind the previous hit until a hit is properly aligned
            # relative to the start position, or until there are no more hits (negative result).
            end = find(end + 1)
            if end < 0 or not (end - pos) % alignment:
                break
    except AttributeError:
        # The buffer object does not provide a find method: read the stream in chunks of
        # alignment many bytes until the collected data ends with the terminator.
        result = bytearray()
        while not self.eof:
            result.extend(self.read_bytes(alignment))
            if result.endswith(terminator):
                return result[:-len(terminator)]
        # No terminator before the end of the stream: restore the cursor and fail.
        self.seek(pos)
        raise EOF
    else:
        # NOTE(review): when no terminator was found, end is negative here and read_exactly is
        # called with a negative size; the outcome depends on how read1 treats negative sizes,
        # which is not visible in this block - confirm.
        data = self.read_exactly(end - pos)
        self.seekrel(len(terminator))
        return bytearray(data)
def read_guid(self)
-
Expand source code Browse git
def read_guid(self) -> UUID:
    """
    Read 16 bytes and interpret them as a `UUID` given in little endian byte layout.
    """
    raw = self.read_bytes(16)
    return UUID(bytes_le=raw)
def read_uuid(self)
-
Expand source code Browse git
def read_uuid(self) -> UUID:
    """
    Read 16 bytes and interpret them as a `UUID` given in big endian byte layout.
    """
    raw = self.read_bytes(16)
    return UUID(bytes=raw)
def read_c_string(self, encoding=None)
-
Expand source code Browse git
def read_c_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read a string terminated by a single null byte using
    `refinery.lib.structures.StructReader.read_terminated_array`. The raw data is returned unless an
    `encoding` is given, in which case the data is decoded with that codec.
    """
    raw = self.read_terminated_array(B'\0')
    if encoding is None:
        return raw
    return codecs.decode(raw, encoding)
def read_w_string(self, encoding=None)
-
Expand source code Browse git
def read_w_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read a string terminated by two null bytes at an even offset from the cursor, using
    `refinery.lib.structures.StructReader.read_terminated_array` with an alignment of `2`. The raw
    data is returned unless an `encoding` is given, in which case the data is decoded with that codec.
    """
    raw = self.read_terminated_array(B'\0\0', 2)
    if encoding is None:
        return raw
    return codecs.decode(raw, encoding)
def read_length_prefixed_ascii(self, prefix_size=32)
-
Expand source code Browse git
def read_length_prefixed_ascii(self, prefix_size: int = 32):
    """
    Call `refinery.lib.structures.StructReader.read_length_prefixed` with the given prefix size and
    the `latin1` codec.
    """
    codec = 'latin1'
    return self.read_length_prefixed(prefix_size, codec)
def read_length_prefixed_utf8(self, prefix_size=32)
-
Expand source code Browse git
def read_length_prefixed_utf8(self, prefix_size: int = 32):
    """
    Call `refinery.lib.structures.StructReader.read_length_prefixed` with the given prefix size and
    the `utf8` codec.
    """
    codec = 'utf8'
    return self.read_length_prefixed(prefix_size, codec)
def read_length_prefixed_utf16(self, prefix_size=32, bytecount=False)
-
Expand source code Browse git
def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
    """
    Call `refinery.lib.structures.StructReader.read_length_prefixed` with the `utf-16le` codec. The
    length prefix is taken as a number of bytes if `bytecount` is set and as a number of two-byte
    units otherwise.
    """
    if bytecount:
        block_size = 1
    else:
        block_size = 2
    return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)
def read_length_prefixed(self, prefix_size=32, encoding=None, block_size=1)
-
Expand source code Browse git
def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
    """
    Read an integer of `prefix_size` bits, multiply it by `block_size`, and then call `read` with
    the result. The data is returned as it was read unless an `encoding` is given, in which case it
    is decoded with that codec.
    """
    count = self.read_integer(prefix_size)
    payload = self.read(count * block_size)
    if encoding is None:
        return payload
    return codecs.decode(payload, encoding)
def read_7bit_encoded_int(self, max_bits=0, bigendian=None)
-
Expand source code Browse git
def read_7bit_encoded_int(self, max_bits: int = 0, bigendian: bool | None = None) -> int:
    """
    Read an integer that is stored in groups of 7 bits, one group per byte, where a set high bit
    marks that another byte follows. In big endian mode, earlier groups are more significant;
    otherwise, later groups are. When `bigendian` is omitted, `self.bigendian` is used. If `max_bits`
    is positive, an `OverflowError` is raised as soon as more than `max_bits` bits were read and the
    integer still continues.
    """
    if bigendian is None:
        bigendian = self.bigendian
    result = 0
    # consumed is the number of payload bits read including the current byte.
    for consumed in itertools.count(7, 7):
        octet = self.u8fast()
        payload = octet & 0x7F
        if bigendian:
            result = (result << 7) | payload
        else:
            result |= payload << (consumed - 7)
        if not octet & 0x80:
            return result
        if consumed > max_bits > 0:
            raise OverflowError('Maximum bits were exceeded by encoded integer.')
Inherited members
class StructMeta (name, bases, nmspc, parser=refinery.lib.structures.StructReader)
-
A metaclass to facilitate the behavior outlined for `Struct`.
Expand source code Browse git
class StructMeta(type):
    """
    The metaclass that implements the behavior documented for `refinery.lib.structures.Struct`: It
    replaces the initializer of every class it creates with a wrapper that converts the first
    argument to a reader of the type given by the `parser` class keyword, and that stores the data
    consumed by the original initializer in the `_data` attribute of the new object.
    """
    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        # The parser keyword must be accepted here, but it is only used in __init__.
        return super().__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super().__init__(name, bases, nmspc)
        inner__init__ = cls.__init__

        @functools.wraps(inner__init__)
        def wrapped__init__(self: Struct, reader, *args, **kwargs):
            if not isinstance(reader, parser):
                # An instance of a base class of the parser is rejected rather than wrapped.
                if issubclass(parser, reader.__class__):
                    raise ValueError(
                        F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                        F'but a {parser.__name__} is required.')
                reader = parser(reader)
            start = reader.tell()
            view = reader.getbuffer()
            inner__init__(self, reader, *args, **kwargs)
            # Everything between the start offset and the final cursor position belongs to the struct.
            self._data = view[start:reader.tell()]
            del view

        cls.__init__ = wrapped__init__
Ancestors
- builtins.type
class Struct (reader, *args, **kwargs)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:

    foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class Struct(metaclass=StructMeta):
    """
    Base class for parsers of structured data. A `refinery.lib.structures.Struct` class can be
    instantiated as follows:

        foo = Struct(data, bar=29)

    The initializer of the structure receives a single positional argument `reader`. When `data` is
    already a `refinery.lib.structures.StructReader`, it is used as the `reader` directly; any other
    object is wrapped in a `refinery.lib.structures.StructReader` first. All additional arguments
    are forwarded to the initializer unchanged.
    """
    # The data that was consumed by the initializer; assigned by the metaclass wrapper.
    _data: Union[memoryview, bytearray]

    def __init__(self, reader: StructReader, *args, **kwargs):
        """
        Does nothing; parsing logic is implemented by overriding this method in child classes.
        """
        pass

    def __len__(self):
        return len(self._data)

    def __bytes__(self):
        return bytes(self._data)

    def get_data(self, decouple=False):
        """
        Return the data of this structure. With `decouple` set, a `memoryview` is first replaced
        by a `bytearray` copy of its contents.
        """
        if decouple:
            if isinstance(self._data, memoryview):
                self._data = bytearray(self._data)
        return self._data
Subclasses
- CabCompressedBlock
- CabDisk
- CabFile
- CabFolder
- CHM
- ChmStruct
- ContentSections
- ContentSectionsName
- ContentSectionsResetTable
- DirectoryListingEntry
- QuickRefArea
- SectionHeader
- DexFile
- JsonStruct
- TSetupOffsets
- IFPSFile
- JvAccessFlags
- JvClassFile
- JvCode
- JvException
- JvOpCode
- refinery.lib.java._HasPoolAndTag
- BCRYPT_RSAKEY_BLOB
- BLOBHEADER
- CRYPTOKEY
- DHPUBKEY
- PLAINTEXTKEYBLOB
- PRIVATEKEYBLOB
- RSAPUBKEY
- SIMPLEBLOB
- LZFHeader
- LZGStream
- RangeDecoder
- LZO
- LZOChunk
- A3xRecord
- A3xScript
- AsarHeader
- CPIOEntry
- GzipHeader
- FatArch
- NSArchive
- NSBlockHeaderOffset
- NSHeader
- NSScriptExtendedInstruction
- NSScriptInstruction
- PYZ
- PiTOCEntry
- PyInstallerArchiveEpilogue
- refinery.units.formats.archive.xtzip._FileRecord
- BlobIndex
- CodeDirectoryBlob
- SuperBlob
- GRPICONDIR
- GRPICONDIRENTRY
- ZipCentralDirectory
- ZipEndOfCentralDirectory
Methods
def get_data(self, decouple=False)
-
Expand source code Browse git
def get_data(self, decouple=False):
    """
    Return the data of this structure. With `decouple` set, a `memoryview` is first replaced by a
    `bytearray` copy of its contents.
    """
    if decouple:
        if isinstance(self._data, memoryview):
            self._data = bytearray(self._data)
    return self._data
class PerInstanceAttribute
-
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Expand source code Browse git
def __get__(self, parent, tp=None) -> AttrType:
    """
    Return the value of this attribute for the given `parent`. Values are stored under the `id` of
    the parent object: On first access, the seed that was stored for the parent is passed through
    `resolve` and the result is cached. An `AttributeError` is raised when no seed is stored.
    """
    key = id(parent)
    if key in self.__get:
        return self.__get[key]
    try:
        seed = self.__set[key]
    except KeyError as K:
        raise AttributeError from K
    self.__get[key] = self.resolve(parent, seed)
    return self.__get[key]
Ancestors
- typing.Generic
Subclasses
- refinery.lib.java.Index
Methods
def resolve(self, parent, value)
-
Expand source code Browse git
def resolve(self, parent, value: Any) -> AttrType:
    """
    Convert a stored seed `value` to the attribute value for `parent`. This implementation returns
    the `value` unchanged.
    """
    resolved = value
    return resolved