Module refinery.lib.structures
Interfaces and classes to read structured data.
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interfaces and classes to read structured data.
"""
from __future__ import annotations
import contextlib
import itertools
import enum
import functools
import io
import re
import struct
import weakref
from typing import (
Any,
ByteString,
Dict,
Generic,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
# Type variable for the kind of buffer that a memory file wraps (see `MemoryFileMethods._data`).
T = TypeVar('T', bound=Union[bytearray, bytes, memoryview])
# Type variable for the target type that `MemoryFileMethods.read_as` casts its result to.
C = TypeVar('C', bound=Union[bytearray, bytes, memoryview])
# Scalar types that occur in the return annotation of `StructReader.read_struct`.
UnpackType = Union[int, bool, float, bytes]
def signed(k: int, bitsize: int):
    """
    Interpret the lowest `bitsize` bits of `k` as a two's complement number and return the
    corresponding signed integer. Bits of `k` above `bitsize` are discarded.
    """
    modulus = 1 << bitsize
    k &= modulus - 1
    # The top bit of the truncated value is the sign bit.
    if k >> (bitsize - 1):
        return k - modulus
    return k
class EOF(EOFError):
    """
    Raised when fewer bytes were available than requested, for example while reading from a
    `refinery.lib.structures.MemoryFile`. The data of the incomplete operation is stored in the
    attribute `rest` and can also be obtained by converting the exception to `bytes`.
    """

    def __init__(self, rest: ByteString = B''):
        EOFError.__init__(self, 'Unexpected end of buffer.')
        self.rest = rest

    def __bytes__(self):
        leftover = self.rest
        return bytes(leftover)
class StreamDetour:
    """
    Context manager that remembers the current position of a stream when the context is entered
    and seeks back to it when the context ends. If an `offset` is given, the stream is moved
    there (interpreted according to `whence`) for the duration of the context.
    """

    def __init__(self, stream: io.IOBase, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        self.stream = stream
        self.offset = offset
        self.whence = whence

    def __enter__(self):
        stream = self.stream
        # Remember where to return to before moving anywhere.
        self.cursor = stream.tell()
        target = self.offset
        if target is None:
            return self
        stream.seek(target, self.whence)
        return self

    def __exit__(self, *args):
        origin = self.cursor
        self.stream.seek(origin, io.SEEK_SET)
class MemoryFileMethods(Generic[T]):
    """
    A thin wrapper around (potentially mutable) byte sequences which gives it the features of a
    file-like object. The wrapped buffer is stored by reference; write operations modify it in
    place and therefore require a mutable buffer.
    """
    closed: bool
    read_as_bytes: bool
    _data: T
    _cursor: int
    _closed: bool

    class SEEK(int, enum.Enum):
        CUR = io.SEEK_CUR
        END = io.SEEK_END
        SET = io.SEEK_SET

    def __init__(
        self,
        data: Optional[T] = None,
        read_as_bytes=False,
        fileno: Optional[int] = None,
        size_limit: Optional[int] = None,
    ) -> None:
        """
        Wrap `data`, or a new empty `bytearray` if `data` is `None`. When `read_as_bytes` is set,
        read operations convert their result to `bytes`. The value of `fileno` is what the
        method of the same name reports. Write operations refuse to grow the buffer beyond
        `size_limit`; a `ValueError` is raised if the initial data already exceeds it.
        """
        if data is None:
            data = bytearray()
        elif size_limit is not None and len(data) > size_limit:
            raise ValueError('Initial data exceeds size limit')
        self._data = data
        self._cursor = 0
        self._closed = False
        self._fileno = fileno
        self.read_as_bytes = read_as_bytes
        self._size_limit = size_limit

    def close(self) -> None:
        """
        Mark the file as closed. The underlying buffer is left untouched.
        """
        self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> MemoryFile:
        return self

    def __exit__(self, ex_type, ex_value, trace) -> bool:
        # Leaving the context neither closes the file nor suppresses exceptions.
        return False

    def flush(self) -> None:
        pass

    def isatty(self) -> bool:
        return False

    def __iter__(self):
        return self

    def __len__(self):
        return len(self._data)

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def fileno(self) -> int:
        """
        Return the file number passed to the constructor; raises `OSError` if there is none.
        """
        if self._fileno is None:
            raise OSError
        return self._fileno

    def readable(self) -> bool:
        return not self._closed

    def seekable(self) -> bool:
        return not self._closed

    @property
    def eof(self) -> bool:
        """
        This property is `True` if the file is closed or the cursor is at the end of the buffer.
        """
        return self._closed or self._cursor >= len(self._data)

    @property
    def remaining_bytes(self) -> int:
        """
        The number of bytes between the cursor and the end of the buffer.
        """
        return len(self._data) - self.tell()

    def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        """
        Return a `refinery.lib.structures.StreamDetour` for this file.
        """
        return StreamDetour(self, offset, whence=whence)

    def writable(self) -> bool:
        """
        A file is writable if it is not closed and wraps either a `bytearray` or a `memoryview`
        that is not read-only.
        """
        if self._closed:
            return False
        if isinstance(self._data, memoryview):
            return not self._data.readonly
        return isinstance(self._data, bytearray)

    def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
        """
        Like `refinery.lib.structures.MemoryFileMethods.read`, but the result is converted to
        the type `cast` unless it already is an instance of it.
        """
        out = self.read(size, peek)
        if not isinstance(out, cast):
            out = cast(out)
        return out

    def read(self, size: int = -1, peek: bool = False) -> T:
        """
        Read up to `size` bytes starting at the cursor; everything up to the end of the buffer
        is read if `size` is `None` or negative. The cursor is not advanced if `peek` is set.
        """
        beginning = self._cursor
        if size is None or size < 0:
            end = len(self._data)
        else:
            end = min(self._cursor + size, len(self._data))
        result = self._data[beginning:end]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        if not peek:
            self._cursor = end
        return result

    def peek(self, size: int = -1) -> memoryview:
        """
        Return a `memoryview` of up to `size` bytes following the cursor without moving it.
        """
        cursor = self._cursor
        mv = memoryview(self._data)
        if size is None or size < 0:
            return mv[cursor:]
        return mv[cursor:cursor + size]

    def read1(self, size: int = -1, peek: bool = False) -> T:
        return self.read(size, peek)

    def _find_linebreak(self, beginning: int, end: int) -> int:
        """
        Return the offset of the first line feed in the given range of the buffer, or `-1`.
        """
        if not isinstance(self._data, memoryview):
            return self._data.find(B'\n', beginning, end)
        # A memoryview has no find method, scan it manually.
        for k in range(beginning, end):
            if self._data[k] == 0xA:
                return k
        return -1

    def readline(self, size: int = -1) -> T:
        """
        Read until the next line feed (inclusive), the end of the buffer, or until `size` bytes
        have been read, whichever comes first.
        """
        beginning, end = self._cursor, len(self._data)
        if size is not None and size >= 0:
            # Never look or move beyond the end of the buffer, even for a large size.
            end = min(end, beginning + size)
        p = self._find_linebreak(beginning, end)
        self._cursor = end if p < 0 else p + 1
        result = self._data[beginning:self._cursor]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def readlines(self, hint: int = -1) -> Iterable[T]:
        """
        Generate the remaining lines of the file. If `hint` is not `None` and not negative, the
        generator stops once the total size of the lines produced has reached `hint`.
        """
        if hint is None or hint < 0:
            yield from self
            return
        total = 0
        while total < hint:
            # Do not use next() here: a StopIteration that escapes from a generator body is
            # converted to a RuntimeError.
            line = self.readline()
            if not line:
                break
            total += len(line)
            yield line

    def readinto1(self, b) -> int:
        """
        Read up to `len(b)` bytes into the buffer `b` and return the number of bytes read.
        """
        data = self.read(len(b))
        size = len(data)
        b[:size] = data
        return size

    def readinto(self, b) -> int:
        return self.readinto1(b)

    def tell(self) -> int:
        return self._cursor

    def seekrel(self, offset: int) -> int:
        """
        Seek relative to the current position.
        """
        return self.seek(offset, io.SEEK_CUR)

    def seekset(self, offset: int) -> int:
        """
        Seek to an absolute position; a negative `offset` is relative to the end of the buffer.
        """
        if offset < 0:
            return self.seek(offset, io.SEEK_END)
        else:
            return self.seek(offset, io.SEEK_SET)

    def getbuffer(self) -> T:
        return self._data

    def getvalue(self) -> T:
        return self._data

    def seek(self, offset: int, whence=io.SEEK_SET) -> int:
        """
        Move the cursor and return its new position. The resulting position is clamped to the
        bounds of the buffer. A negative `offset` with `io.SEEK_SET` raises a `ValueError`.
        """
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError('no negative offsets allowed for SEEK_SET.')
            self._cursor = offset
        elif whence == io.SEEK_CUR:
            self._cursor += offset
        elif whence == io.SEEK_END:
            self._cursor = len(self._data) + offset
        self._cursor = max(self._cursor, 0)
        self._cursor = min(self._cursor, len(self._data))
        return self._cursor

    def writelines(self, lines: Iterable[ByteString]) -> None:
        for line in lines:
            self.write(line)

    def truncate(self, size=None) -> None:
        """
        Move the cursor to `size` if it is given, then remove all data after the cursor. A
        `ValueError` is raised if `size` is not within the bounds of the buffer.
        """
        if size is not None:
            if not (0 <= size <= len(self._data)):
                raise ValueError('invalid size value')
            self._cursor = size
        del self._data[self._cursor:]

    def write_byte(self, byte: int) -> None:
        """
        Write a single byte at the cursor. Raises `refinery.lib.structures.EOF` if this would
        exceed the size limit and `OSError` if the buffer cannot be modified.
        """
        limit = self._size_limit
        cc = self._cursor
        nc = cc + 1
        # Compare against None so that a size limit of zero is enforced as well.
        if limit is not None and nc > limit:
            raise EOF(bytes((byte,)))
        try:
            if cc < len(self._data):
                self._data[cc] = byte
            else:
                self._data.append(byte)
        except Exception as error:
            raise OSError(str(error)) from error
        else:
            self._cursor = nc

    def write(self, data: Iterable[int]) -> int:
        """
        Write `data` at the cursor, overwriting existing content and extending the buffer as
        required; returns the number of bytes written. The input may be any iterable of byte
        values, sized or not. If a sized input does not fit within the size limit, nothing is
        written and `refinery.lib.structures.EOF` is raised with the input as its payload. An
        unsized input is written up to the size limit and the exception carries the remaining
        data; the cursor is not moved in this case.
        """
        out = self._data
        end = len(out)
        beginning = self._cursor
        limit = self._size_limit

        if limit is None and beginning == end:
            # Fast path: unbounded append at the end of the buffer.
            out[end:] = data
            self._cursor = end = len(out)
            return end - beginning

        try:
            size = len(data)
        except Exception:
            return self._write_unsized(iter(data), beginning, end, limit)

        # Compare against None so that a size limit of zero is enforced as well.
        if limit is not None and size + beginning > limit:
            raise EOF(data)
        self._cursor += size
        try:
            self._data[beginning:self._cursor] = data
        except Exception as error:
            self._cursor = beginning
            raise OSError(str(error)) from error
        return size

    def _write_unsized(self, it, beginning: int, end: int, limit: Optional[int]) -> int:
        """
        Implements `refinery.lib.structures.MemoryFileMethods.write` for an iterator `it` of
        unknown length. The buffer has length `end` and the write starts at `beginning`.
        """
        out = self._data
        written = 0
        # Overwrite existing content first. The range comes first in the zip so that no item
        # of the iterator is consumed and lost once the end of the buffer has been reached.
        for position, byte in zip(range(beginning, end), it):
            out[position] = byte
            written += 1
        if beginning + written < end:
            # The input was exhausted before reaching the end of the buffer.
            self._cursor = beginning + written
            return written
        if limit is None:
            out[end:] = it
        else:
            out[end:limit] = itertools.islice(it, 0, max(limit - end, 0))
            try:
                byte = next(it)
            except StopIteration:
                pass
            else:
                rest = bytearray((byte,))
                rest[1:] = it
                raise EOF(rest)
        # The cursor ends up after the data that was actually written, which can be well
        # below the size limit.
        self._cursor = total = len(out)
        return total - beginning

    def __getitem__(self, slice):
        result = self._data[slice]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def replay(self, offset: int, length: int):
        """
        Write `length` bytes at the cursor which are obtained by cyclically repeating the
        `offset` bytes that precede the cursor. Raises a `ValueError` if `offset` is not a
        positive number of bytes that are available before the cursor.
        """
        if offset not in range(1, self._cursor + 1):
            raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{self._cursor}].')
        rep, r = divmod(length, offset)
        # Express the start of the source region as a negative index into the buffer.
        offset = -offset - len(self) + self._cursor
        replay = self._data[offset:offset + r]
        if rep > 0:
            replay = bytes(self._data[offset:self._cursor]) * rep + replay
        self.write(replay)
class MemoryFile(MemoryFileMethods[T], io.BytesIO):
    """
    Combines the implementation in `refinery.lib.structures.MemoryFileMethods` with the type
    `io.BytesIO` as a second base class. The class adds no members of its own.
    """
class order(str, enum.Enum):
    """
    The two byte orders; each value is the matching prefix character for `struct` formats.
    """
    big = '>'
    little = '<'
class StructReader(MemoryFile[T]):
    """
    An extension of a `refinery.lib.structures.MemoryFile` which provides methods to read
    structured data. Apart from whole bytes, the reader can also consume individual bits: bits
    of a partially consumed byte are kept in an internal bit buffer.
    """

    class Unaligned(RuntimeError):
        pass

    def __init__(self, data: T, bigendian: bool = False):
        super().__init__(data)
        # _nbits is the number of buffered bits and _bbits holds their value.
        self._bbits = 0
        self._nbits = 0
        self.bigendian = bigendian

    def __enter__(self) -> StructReader:
        return super().__enter__()

    @property
    @contextlib.contextmanager
    def be(self):
        """
        A context manager that switches the reader to big endian mode. The byte order that was
        active before is restored when the context ends.
        """
        previous = self.bigendian
        self.bigendian = True
        try:
            yield self
        finally:
            self.bigendian = previous

    @property
    def byteorder_format(self) -> str:
        """
        The current byte order as a prefix character for `struct` formats.
        """
        return '>' if self.bigendian else '<'

    @property
    def byteorder_name(self) -> str:
        """
        The current byte order as a name that is accepted by `int.from_bytes`.
        """
        return 'big' if self.bigendian else 'little'

    def seek(self, offset, whence=io.SEEK_SET) -> int:
        """
        Seek in the underlying stream. This always discards the contents of the bit buffer.
        """
        self._bbits = 0
        self._nbits = 0
        return super().seek(offset, whence)

    def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
        """
        Read bytes from the underlying stream. Raises a `RuntimeError` when the stream is not currently
        byte-aligned, i.e. when `refinery.lib.structures.StructReader.byte_aligned` is `False`. Raises
        an exception of type `refinery.lib.structures.EOF` when less data is available in the stream than
        requested via the `size` parameter. The remaining data can be extracted from the exception.
        Use `refinery.lib.structures.StructReader.read_bytes` to read bytes from the stream when it is
        not byte-aligned.
        """
        if not self.byte_aligned:
            raise StructReader.Unaligned('buffer is not byte-aligned')
        data = self.read1(size, peek)
        if size and len(data) < size:
            raise EOF(data)
        return data

    @property
    def byte_aligned(self) -> bool:
        """
        This property is `True` if and only if there are currently no bits still waiting in the internal
        bit buffer.
        """
        return not self._nbits

    def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
        """
        This method clears the internal bit buffer and, if necessary, advances the cursor to the next
        multiple of `blocksize`. It returns a tuple containing the size and contents of the bit buffer
        before it was cleared.
        """
        nbits = self._nbits
        bbits = self._bbits
        self._nbits = 0
        self._bbits = 0
        mod = self._cursor % blocksize
        if mod:
            self.seekrel(blocksize - mod)
        return nbits, bbits

    @property
    def remaining_bits(self) -> int:
        """
        The number of bits that can still be read, including those in the bit buffer.
        """
        return 8 * self.remaining_bytes + self._nbits

    def read_integer(self, length: Optional[int] = None, peek: bool = False) -> int:
        """
        Read `length` many bits from the underlying stream as an integer; all remaining bits are read
        if `length` is `None`. Raises a `ValueError` for a negative `length` and an `EOFError` if the
        stream does not contain enough data.
        """
        if length is None:
            length = self.remaining_bits
        if length < 0:
            raise ValueError
        if length < self._nbits:
            # The request can be served entirely from the bit buffer.
            new_count = self._nbits - length
            if self.bigendian:
                # Take the most significant buffered bits.
                result = self._bbits >> new_count
                if not peek:
                    self._bbits ^= result << new_count
            else:
                # Take the least significant buffered bits.
                result = self._bbits & 2 ** length - 1
                if not peek:
                    self._bbits >>= length
            if not peek:
                self._nbits = new_count
            return result
        # Keep a copy of the bit buffer, seeking below will reset it.
        nbits, bbits = self._nbits, self._bbits
        number_of_missing_bits = length - nbits
        bytecount, rest = divmod(number_of_missing_bits, 8)
        if rest:
            # A partial byte is required: read it in full; rest becomes the number of bits that
            # are left over and have to go into the bit buffer.
            bytecount += 1
            rest = 8 - rest
        bb = self.read1(bytecount, True)
        if len(bb) != bytecount:
            raise EOFError
        if not peek:
            self.seekrel(bytecount)
        if bytecount == 1:
            result, = bb
        else:
            result = int.from_bytes(bb, self.byteorder_name)
        if not nbits and not rest:
            return result
        if self.bigendian:
            # The surplus bits are the lowest ones, previously buffered bits go on top.
            rbmask   = 2 ** rest - 1                   # noqa
            excess   = result & rbmask                 # noqa
            result >>= rest                            # noqa
            result  ^= bbits << number_of_missing_bits # noqa
        else:
            # The surplus bits are the highest ones, previously buffered bits go to the bottom.
            excess   = result >> number_of_missing_bits # noqa
            result  ^= excess << number_of_missing_bits # noqa
            result <<= nbits                            # noqa
            result  |= bbits                            # noqa
        assert excess.bit_length() <= rest
        if not peek:
            self._nbits = rest
            self._bbits = excess
        return result

    def read_bytes(self, size: int, peek: bool = False) -> bytes:
        """
        The method reads `size` many bytes from the underlying stream starting at the current bit.
        """
        if self.byte_aligned:
            data = self.read_exactly(size, peek)
            if not isinstance(data, bytes):
                data = bytes(data)
            return data
        else:
            return self.read_integer(size * 8, peek).to_bytes(size, self.byteorder_name)

    def read_bit(self) -> int:
        """
        This function is a shortcut for calling `refinery.lib.structures.StructReader.read_integer` with
        an argument of `1`, i.e. this reads the next bit from the stream. In little endian mode, the bits
        of any byte in the stream are read from least significant to most significant; in big endian
        mode, the order is reversed.
        """
        return self.read_integer(1)

    def read_bits(self, nbits: int) -> Iterable[int]:
        """
        This method returns the bits of `refinery.lib.structures.StructReader.read_integer` as an iterable
        from most to least significant.
        """
        chunk = self.read_integer(nbits)
        for k in range(nbits - 1, -1, -1):
            yield chunk >> k & 1

    def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
        """
        Identical to `refinery.lib.structures.StructReader.read_bits` with every bit value cast to a
        boolean. The order of the flags is reversed if `reverse` is set.
        """
        bits = list(self.read_bits(nbits))
        if reverse:
            bits.reverse()
        for bit in bits:
            yield bool(bit)

    def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
        """
        Read structured data from the stream in any format supported by the `struct` module. A byte
        order character at the start of `spec` overrides the current byte ordering. The following
        additional format characters are supported, each optionally preceded by a repeat count:

        - `a`: `refinery.lib.structures.StructReader.read_c_string`
        - `u`: `refinery.lib.structures.StructReader.read_w_string`
        - `w`: like `u`, but decoded as UTF-16LE
        - `g`: `refinery.lib.structures.StructReader.read_guid`
        - `E`: `refinery.lib.structures.StructReader.read_7bit_encoded_int`

        The return value is a list of the values that were read. If the `unwrap` parameter is `True`, a
        single unpacked value will be returned as a scalar, not as a list with one element. If `peek` is
        set, the position in the stream is the same after the call as it was before.
        """
        if not spec:
            raise ValueError('no format specified')
        byteorder = spec[:1]
        if byteorder in '<!=@>':
            spec = spec[1:]
        else:
            byteorder = self.byteorder_format
        data = []
        # Remember the full reader state: seeking back resets the bit buffer.
        cursor, nbits, bbits = self.tell(), self._nbits, self._bbits
        try:
            # reserved struct characters: xcbB?hHiIlLqQnNefdspP
            for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
                if k % 2 == 1:
                    # Odd indices are the separators, i.e. the custom format characters.
                    count = 1 if len(part) == 1 else int(part[:~0])
                    part = part[~0]
                    for _ in range(count):
                        if part == 'a':
                            data.append(self.read_c_string())
                        elif part == 'g':
                            data.append(self.read_guid())
                        elif part == 'u':
                            data.append(self.read_w_string())
                        elif part == 'w':
                            data.append(self.read_w_string().decode('utf-16le'))
                        elif part == 'E':
                            data.append(self.read_7bit_encoded_int())
                    continue
                else:
                    part = F'{byteorder}{part}'
                    data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
        finally:
            # This has to happen before any return statement, or peeking would consume data.
            if peek:
                self.seekset(cursor)
                self._nbits, self._bbits = nbits, bbits
        if unwrap and len(data) == 1:
            return data[0]
        return data

    def read_nibble(self, peek: bool = False) -> int:
        """
        Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
        """
        return self.read_integer(4, peek)

    # Shortcuts to read unsigned (u), signed (i), and floating point (f) numbers of a given bit size.
    def u8(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def i8(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def u16(self, peek: bool = False) -> int: return self.read_integer(16, peek)
    def u32(self, peek: bool = False) -> int: return self.read_integer(32, peek)
    def u64(self, peek: bool = False) -> int: return self.read_integer(64, peek)
    def i16(self, peek: bool = False) -> int: return signed(self.read_integer(16, peek), 16)
    def i32(self, peek: bool = False) -> int: return signed(self.read_integer(32, peek), 32)
    def i64(self, peek: bool = False) -> int: return signed(self.read_integer(64, peek), 64)

    def f32(self, peek: bool = False) -> float: return self.read_struct('f', unwrap=True, peek=peek)
    def f64(self, peek: bool = False) -> float: return self.read_struct('d', unwrap=True, peek=peek)

    def read_byte(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def read_char(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
        """
        Read data up to the next occurrence of `terminator` whose distance from the current position is
        a multiple of `alignment`. The terminator is consumed, but not part of the returned data.
        """
        pos = self.tell()
        buf = self.getbuffer()
        try:
            end = pos - 1
            while True:
                end = buf.find(terminator, end + 1)
                if end < 0 or not (end - pos) % alignment:
                    break
        except AttributeError:
            # The buffer has no find method: read one aligned chunk at a time until the data
            # ends with the terminator.
            result = bytearray()
            while not self.eof:
                result.extend(self.read_bytes(alignment))
                if result.endswith(terminator):
                    return result[:-len(terminator)]
            self.seek(pos)
            raise EOF
        else:
            # NOTE: when no terminator was found, end is negative and so is the size passed
            # here; as a result, all remaining data is returned and no exception is raised.
            data = self.read_exactly(end - pos)
            self.seekrel(len(terminator))
            return bytearray(data)

    def read_guid(self) -> str:
        """
        Read 16 bytes and format them as an uppercase GUID string. The first three components are
        always read as little endian integers, regardless of the current byte order.
        """
        previous = self.bigendian
        self.bigendian = False
        try:
            a = self.u32()
            b = self.u16()
            c = self.u16()
            d = self.read(2).hex().upper()
            e = self.read(6).hex().upper()
            return F'{a:08X}-{b:04X}-{c:04X}-{d}-{e}'
        finally:
            self.bigendian = previous

    def read_c_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string that is terminated by a single zero byte. The result is decoded if an `encoding`
        is given and returned as a `bytearray` otherwise.
        """
        data = self.read_terminated_array(B'\0')
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_w_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string of two-byte units that is terminated by two zero bytes. The result is decoded if
        an `encoding` is given and returned as a `bytearray` otherwise.
        """
        data = self.read_terminated_array(B'\0\0', 2)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_length_prefixed_ascii(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'latin1')

    def read_length_prefixed_utf8(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'utf8')

    def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
        # Unless the prefix is a byte count, it counts units of two bytes.
        block_size = 1 if bytecount else 2
        return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

    def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
        """
        Read an integer of `prefix_size` bits, then read and return that many blocks of `block_size`
        bytes each. The data is decoded if an `encoding` is given.
        """
        prefix = self.read_integer(prefix_size) * block_size
        data = self.read(prefix)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
        """
        Read an integer that is stored in groups of 7 bits, least significant group first, where the
        high bit of each byte indicates that another byte follows. If `max_bits` is positive, a
        `RuntimeError` is raised when the encoding continues beyond a shift of that many bits.
        """
        value = 0
        for shift in itertools.count(0, step=7):
            b = self.read_byte()
            value |= (b & 0x7F) << shift
            if not b & 0x80:
                return value
            if shift > max_bits > 0:
                raise RuntimeError('Maximum bits were exceeded by encoded integer.')
class StructMeta(type):
    """
    The metaclass of `refinery.lib.structures.Struct`. It accepts the class keyword `parser` and
    replaces the initialization routine of every class it creates with a wrapper. The wrapper
    converts the first argument to an instance of `parser` if it is not one already and stores
    the part of the buffer that the initialization routine consumed in the attribute `_data`.
    """

    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        # The parser keyword must not be forwarded to type.
        return type.__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super(StructMeta, cls).__init__(name, bases, nmspc)
        inner_init = cls.__init__

        @functools.wraps(inner_init)
        def outer_init(self: Struct, reader, *args, **kwargs):
            if not isinstance(reader, parser):
                # Refuse to re-wrap a reader whose type is a base class of the required parser.
                if issubclass(parser, reader.__class__):
                    raise ValueError(
                        F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                        F'but a {parser.__name__} is required.')
                reader = parser(reader)
            view = memoryview(reader.getbuffer())
            start = reader.tell()
            inner_init(self, reader, *args, **kwargs)
            # Everything between the old and the new cursor position belongs to the structure.
            self._data = view[start:reader.tell()]

        cls.__init__ = outer_init
class Struct(metaclass=StructMeta):
    """
    Base class for parsers of structured data. An instance is created as follows:

        foo = Struct(data, bar=29)

    The initialization routine receives a reader as its first argument: if `data` is already a
    `refinery.lib.structures.StructReader`, it is passed on as is, otherwise it is wrapped in one
    first. All further arguments are passed through unchanged. After initialization, the data
    that was consumed from the reader is available via the methods of this class.
    """
    _data: Union[memoryview, bytearray]

    def __init__(self, reader: StructReader, *args, **kwargs):
        """
        The base class reads nothing from the reader.
        """

    def __len__(self):
        return len(self._data)

    def __bytes__(self):
        return bytes(self._data)

    def get_data(self, decouple=False):
        """
        Return the data of this structure. With `decouple` set, data that is held as a
        `memoryview` is first replaced by a `bytearray` copy.
        """
        data = self._data
        if decouple and isinstance(data, memoryview):
            data = self._data = bytearray(data)
        return data
AttrType = TypeVar('AttrType')


class PerInstanceAttribute(Generic[AttrType]):
    """
    A data descriptor that stores one value per owning instance. Values are kept in the
    descriptor itself, keyed by the `id` of the owning object. The assigned value is passed
    through `refinery.lib.structures.PerInstanceAttribute.resolve` on first read and the result
    is cached until a new value is assigned. Entries are removed when the owner is finalized.
    """

    def resolve(self, parent, value: Any) -> AttrType:
        """
        Convert the assigned `value` to the value that is returned on attribute access. The
        default implementation returns it unchanged.
        """
        return value

    def __init__(self):
        # Assigned (raw) values and cached resolved values, both keyed by id(parent).
        self.__set: Dict[int, Any] = {}
        self.__get: Dict[int, AttrType] = {}

    def __set__(self, parent: Any, value: Any) -> None:
        pid = id(parent)
        if pid not in self.__set:
            def cleanup(self, pid):
                self.__set.pop(pid, None)
                self.__get.pop(pid, None)
            # Register the finalizer only once per owner; it prevents a new object that reuses
            # the same id from inheriting the values of a dead one.
            weakref.finalize(parent, cleanup, self, pid)
        else:
            # A previously resolved value is stale after re-assignment.
            self.__get.pop(pid, None)
        self.__set[pid] = value

    def __get__(self, parent, tp=None) -> AttrType:
        pid = id(parent)
        if pid not in self.__get:
            try:
                seed = self.__set[pid]
            except KeyError as K:
                raise AttributeError from K
            self.__get[pid] = self.resolve(parent, seed)
        return self.__get[pid]
Functions
def signed(k, bitsize)
-
If `k` is an integer of the given bit size, cast it to a signed one.
Expand source code Browse git
def signed(k: int, bitsize: int): """ If `k` is an integer of the given bit size, cast it to a signed one. """ M = 1 << bitsize k = k & (M - 1) return k - M if k >> (bitsize - 1) else k
Classes
class EOF (rest=b'')
-
While reading from a `MemoryFile`, fewer bytes were available than requested. The exception contains the data from the incomplete read.
Expand source code Browse git
class EOF(EOFError): """ While reading from a `refinery.lib.structures.MemoryFile`, less bytes were available than requested. The exception contains the data from the incomplete read. """ def __init__(self, rest: ByteString = B''): super().__init__('Unexpected end of buffer.') self.rest = rest def __bytes__(self): return bytes(self.rest)
Ancestors
- builtins.EOFError
- builtins.Exception
- builtins.BaseException
class StreamDetour (stream, offset=None, whence=0)
-
A stream detour is used as a context manager to temporarily read from a different location in the stream and then return to the original offset when the context ends.
Expand source code Browse git
class StreamDetour: """ A stream detour is used as a context manager to temporarily read from a different location in the stream and then return to the original offset when the context ends. """ def __init__(self, stream: io.IOBase, offset: Optional[int] = None, whence: int = io.SEEK_SET): self.stream = stream self.offset = offset self.whence = whence def __enter__(self): self.cursor = self.stream.tell() if self.offset is not None: self.stream.seek(self.offset, self.whence) return self def __exit__(self, *args): self.stream.seek(self.cursor, io.SEEK_SET)
class MemoryFileMethods (data=None, read_as_bytes=False, fileno=None, size_limit=None)
-
A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object.
Expand source code Browse git
class MemoryFileMethods(Generic[T]): """ A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object. """ closed: bool read_as_bytes: bool _data: T _cursor: int _closed: bool class SEEK(int, enum.Enum): CUR = io.SEEK_CUR END = io.SEEK_END SET = io.SEEK_SET def __init__( self, data: Optional[T] = None, read_as_bytes=False, fileno: Optional[int] = None, size_limit: Optional[int] = None, ) -> None: if data is None: data = bytearray() elif size_limit is not None and len(data) > size_limit: raise ValueError('Initial data exceeds size limit') self._data = data self._cursor = 0 self._closed = False self._fileno = fileno self.read_as_bytes = read_as_bytes self._size_limit = size_limit def close(self) -> None: self._closed = True @property def closed(self) -> bool: return self._closed def __enter__(self) -> MemoryFile: return self def __exit__(self, ex_type, ex_value, trace) -> bool: return False def flush(self) -> None: pass def isatty(self) -> bool: return False def __iter__(self): return self def __len__(self): return len(self._data) def __next__(self): line = self.readline() if not line: raise StopIteration return line def fileno(self) -> int: if self._fileno is None: raise OSError return self._fileno def readable(self) -> bool: return not self._closed def seekable(self) -> bool: return not self._closed @property def eof(self) -> bool: return self._closed or self._cursor >= len(self._data) @property def remaining_bytes(self) -> int: return len(self._data) - self.tell() def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET): return StreamDetour(self, offset, whence=whence) def writable(self) -> bool: if self._closed: return False if isinstance(self._data, memoryview): return not self._data.readonly return isinstance(self._data, bytearray) def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C: out = self.read(size, peek) if not isinstance(out, cast): out = cast(out) return out def 
read(self, size: int = -1, peek: bool = False) -> T: beginning = self._cursor if size is None or size < 0: end = len(self._data) else: end = min(self._cursor + size, len(self._data)) result = self._data[beginning:end] if self.read_as_bytes and not isinstance(result, bytes): result = bytes(result) if not peek: self._cursor = end return result def peek(self, size: int = -1) -> memoryview: cursor = self._cursor mv = memoryview(self._data) if size is None or size < 0: return mv[cursor:] return mv[cursor:cursor + size] def read1(self, size: int = -1, peek: bool = False) -> T: return self.read(size, peek) def _find_linebreak(self, beginning: int, end: int) -> int: if not isinstance(self._data, memoryview): return self._data.find(B'\n', beginning, end) for k in range(beginning, end): if self._data[k] == 0xA: return k return -1 def readline(self, size: int = -1) -> T: beginning, end = self._cursor, len(self._data) if size is not None and size >= 0: end = beginning + size p = self._find_linebreak(beginning, end) self._cursor = end if p < 0 else p + 1 result = self._data[beginning:self._cursor] if self.read_as_bytes and not isinstance(result, bytes): result = bytes(result) return result def readlines(self, hint: int = -1) -> Iterable[T]: if hint is None or hint < 0: yield from self else: total = 0 while total < hint: line = next(self) total += len(line) yield line def readinto1(self, b) -> int: data = self.read(len(b)) size = len(data) b[:size] = data return size def readinto(self, b) -> int: return self.readinto1(b) def tell(self) -> int: return self._cursor def seekrel(self, offset: int) -> int: return self.seek(offset, io.SEEK_CUR) def seekset(self, offset: int) -> int: if offset < 0: return self.seek(offset, io.SEEK_END) else: return self.seek(offset, io.SEEK_SET) def getbuffer(self) -> T: return self._data def getvalue(self) -> T: return self._data def seek(self, offset: int, whence=io.SEEK_SET) -> int: if whence == io.SEEK_SET: if offset < 0: raise ValueError('no 
negative offsets allowed for SEEK_SET.') self._cursor = offset elif whence == io.SEEK_CUR: self._cursor += offset elif whence == io.SEEK_END: self._cursor = len(self._data) + offset self._cursor = max(self._cursor, 0) self._cursor = min(self._cursor, len(self._data)) return self._cursor def writelines(self, lines: Iterable[ByteString]) -> None: for line in lines: self.write(line) def truncate(self, size=None) -> None: if size is not None: if not (0 <= size <= len(self._data)): raise ValueError('invalid size value') self._cursor = size del self._data[self._cursor:] def write_byte(self, byte: int) -> None: limit = self._size_limit cc = self._cursor nc = cc + 1 if limit and nc > limit: raise EOF(bytes((byte,))) try: if cc < len(self._data): self._data[cc] = byte else: self._data.append(byte) except Exception as T: raise OSError(str(T)) from T else: self._cursor = nc def write(self, data: Iterable[int]) -> int: out = self._data end = len(out) beginning = self._cursor limit = self._size_limit if limit is None and beginning == end: out[end:] = data self._cursor = end = len(out) return end - beginning try: size = len(data) except Exception: it = iter(data) for cursor, b in enumerate(it, beginning): out[cursor] = b if cursor >= end - 1: break else: cursor += 1 self._cursor = cursor return cursor - beginning if limit is None: out[end:] = it else: out[end:limit] = itertools.islice(it, 0, limit - end) try: b = next(it) except StopIteration: self._cursor = limit return limit - beginning else: rest = bytearray((b,)) rest[1:] = it raise EOF(rest) else: if limit and size + beginning > limit: raise EOF(data) self._cursor += size try: self._data[beginning:self._cursor] = data except Exception as T: self._cursor = beginning raise OSError(str(T)) from T return size self._cursor = end = len(out) return end - beginning def __getitem__(self, slice): result = self._data[slice] if self.read_as_bytes and not isinstance(result, bytes): result = bytes(result) return result def replay(self, 
offset: int, length: int): if offset not in range(self._cursor + 1): raise ValueError(F'The supplied delta {offset} is not in the valid range [0,{self._cursor}].') rep, r = divmod(length, offset) offset = -offset - len(self) + self._cursor replay = self._data[offset:offset + r] if rep > 0: replay = bytes(self._data[offset:self._cursor]) * rep + replay self.write(replay)
Ancestors
- typing.Generic
Subclasses
Class variables
var read_as_bytes
var SEEK
-
An enumeration.
Instance variables
var closed
-
Expand source code Browse git
@property def closed(self) -> bool: return self._closed
var eof
-
Expand source code Browse git
@property def eof(self) -> bool: return self._closed or self._cursor >= len(self._data)
var remaining_bytes
-
Expand source code Browse git
@property def remaining_bytes(self) -> int: return len(self._data) - self.tell()
Methods
def close(self)
-
Expand source code Browse git
def close(self) -> None: self._closed = True
def flush(self)
-
Expand source code Browse git
def flush(self) -> None: pass
def isatty(self)
-
Expand source code Browse git
def isatty(self) -> bool: return False
def fileno(self)
-
Expand source code Browse git
def fileno(self) -> int: if self._fileno is None: raise OSError return self._fileno
def readable(self)
-
Expand source code Browse git
def readable(self) -> bool: return not self._closed
def seekable(self)
-
Expand source code Browse git
def seekable(self) -> bool: return not self._closed
def detour(self, offset=None, whence=0)
-
Expand source code Browse git
def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET): return StreamDetour(self, offset, whence=whence)
def writable(self)
-
Expand source code Browse git
def writable(self) -> bool: if self._closed: return False if isinstance(self._data, memoryview): return not self._data.readonly return isinstance(self._data, bytearray)
def read_as(self, cast, size=-1, peek=False)
-
Expand source code Browse git
def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C: out = self.read(size, peek) if not isinstance(out, cast): out = cast(out) return out
def read(self, size=-1, peek=False)
-
Expand source code Browse git
def read(self, size: int = -1, peek: bool = False) -> T: beginning = self._cursor if size is None or size < 0: end = len(self._data) else: end = min(self._cursor + size, len(self._data)) result = self._data[beginning:end] if self.read_as_bytes and not isinstance(result, bytes): result = bytes(result) if not peek: self._cursor = end return result
def peek(self, size=-1)
-
Expand source code Browse git
def peek(self, size: int = -1) -> memoryview: cursor = self._cursor mv = memoryview(self._data) if size is None or size < 0: return mv[cursor:] return mv[cursor:cursor + size]
def read1(self, size=-1, peek=False)
-
Expand source code Browse git
def read1(self, size: int = -1, peek: bool = False) -> T: return self.read(size, peek)
def readline(self, size=-1)
-
Expand source code Browse git
def readline(self, size: int = -1) -> T: beginning, end = self._cursor, len(self._data) if size is not None and size >= 0: end = beginning + size p = self._find_linebreak(beginning, end) self._cursor = end if p < 0 else p + 1 result = self._data[beginning:self._cursor] if self.read_as_bytes and not isinstance(result, bytes): result = bytes(result) return result
def readlines(self, hint=-1)
-
Expand source code Browse git
def readlines(self, hint: int = -1) -> Iterable[T]: if hint is None or hint < 0: yield from self else: total = 0 while total < hint: line = next(self) total += len(line) yield line
def readinto1(self, b)
-
Expand source code Browse git
def readinto1(self, b) -> int: data = self.read(len(b)) size = len(data) b[:size] = data return size
def readinto(self, b)
-
Expand source code Browse git
def readinto(self, b) -> int: return self.readinto1(b)
def tell(self)
-
Expand source code Browse git
def tell(self) -> int: return self._cursor
def seekrel(self, offset)
-
Expand source code Browse git
def seekrel(self, offset: int) -> int: return self.seek(offset, io.SEEK_CUR)
def seekset(self, offset)
-
Expand source code Browse git
def seekset(self, offset: int) -> int:
    """
    Seek to an absolute position. A negative `offset` is interpreted relative to the end of
    the stream, any other value relative to its start. Returns the result of `seek`.
    """
    whence = io.SEEK_END if offset < 0 else io.SEEK_SET
    return self.seek(offset, whence)
def getbuffer(self)
-
Expand source code Browse git
def getbuffer(self) -> T:
    """
    Return the underlying data object itself; no copy is made.
    """
    return self._data
def getvalue(self)
-
Expand source code Browse git
def getvalue(self) -> T:
    """
    Return the underlying data object itself; no copy is made.
    """
    return self._data
def seek(self, offset, whence=0)
-
Expand source code Browse git
def seek(self, offset: int, whence=io.SEEK_SET) -> int:
    """
    Move the cursor and return its new position. The target is computed from `offset`
    relative to the start (`io.SEEK_SET`), the current position (`io.SEEK_CUR`), or the end
    of the data (`io.SEEK_END`) and then clamped to the range from zero to the data length.
    A negative `offset` together with `io.SEEK_SET` raises a `ValueError`. Any other value
    of `whence` leaves the target at the current position.
    """
    size = len(self._data)
    target = self._cursor
    if whence == io.SEEK_SET:
        if offset < 0:
            raise ValueError('no negative offsets allowed for SEEK_SET.')
        target = offset
    elif whence == io.SEEK_CUR:
        target += offset
    elif whence == io.SEEK_END:
        target = size + offset
    self._cursor = min(max(target, 0), size)
    return self._cursor
def writelines(self, lines)
-
Expand source code Browse git
def writelines(self, lines: Iterable[ByteString]) -> None:
    """
    Pass each item of `lines` to `write`, in order. No separators are inserted.
    """
    for line in lines:
        self.write(line)
def truncate(self, size=None)
-
Expand source code Browse git
def truncate(self, size=None) -> None:
    """
    Delete all data following the cursor. When `size` is given, the cursor is first moved to
    that position; a `size` outside the range from zero to the current data length raises a
    `ValueError`.
    """
    if size is not None:
        if size < 0 or size > len(self._data):
            raise ValueError('invalid size value')
        self._cursor = size
    cut = self._cursor
    del self._data[cut:]
def write_byte(self, byte)
-
Expand source code Browse git
def write_byte(self, byte: int) -> None:
    """
    Write a single byte value at the cursor and advance the cursor by one. A byte inside the
    existing data is overwritten, a byte at the end is appended. When a size limit is set
    and the write would exceed it, an `EOF` carrying the byte is raised. Any failure of the
    underlying buffer operation is re-raised as an `OSError`; the cursor is then unchanged.
    """
    limit = self._size_limit
    position = self._cursor
    advanced = position + 1
    if limit and advanced > limit:
        raise EOF(bytes((byte,)))
    try:
        buffer = self._data
        if position >= len(buffer):
            buffer.append(byte)
        else:
            buffer[position] = byte
    except Exception as error:
        raise OSError(str(error)) from error
    self._cursor = advanced
def write(self, data)
-
Expand source code Browse git
def write(self, data: Iterable[int]) -> int:
    """
    Write `data` at the cursor, overwriting existing content and extending the buffer as
    needed, and return the number of bytes written. `data` may be a sized bytes-like object
    or an arbitrary iterable of byte values. When `_size_limit` is set and the data does not
    fit, an `EOF` is raised which carries the data that could not be written.
    """
    out = self._data
    end = len(out)
    beginning = self._cursor
    limit = self._size_limit
    # Fast path: no limit and the cursor is at the end, so this is a plain append.
    if limit is None and beginning == end:
        out[end:] = data
        self._cursor = end = len(out)
        return end - beginning
    try:
        size = len(data)
    except Exception:
        # Unsized input: consume it item by item.
        it = iter(data)
        # First overwrite existing bytes, stopping once the last existing position is hit.
        for cursor, b in enumerate(it, beginning):
            out[cursor] = b
            if cursor >= end - 1:
                break
        else:
            # The input was exhausted before the end of the existing data was reached.
            # NOTE(review): for an empty iterable, `cursor` is never bound and the next
            # line would raise UnboundLocalError - confirm whether this can occur.
            cursor += 1
            self._cursor = cursor
            return cursor - beginning
        # Everything left in the iterator goes beyond the previous end of the data.
        if limit is None:
            out[end:] = it
        else:
            out[end:limit] = itertools.islice(it, 0, limit - end)
            try:
                # Probe for one more item to find out whether the input exceeds the limit.
                b = next(it)
            except StopIteration:
                # NOTE(review): the cursor is set to `limit` even if fewer items were
                # appended than required to reach it - verify against callers.
                self._cursor = limit
                return limit - beginning
            else:
                # Collect the overflow so that it can be recovered from the exception.
                rest = bytearray((b,))
                rest[1:] = it
                raise EOF(rest)
    else:
        # Sized input: check the limit up front, then write with one slice assignment.
        if limit and size + beginning > limit:
            raise EOF(data)
        self._cursor += size
        try:
            self._data[beginning:self._cursor] = data
        except Exception as T:
            # Undo the cursor update and report the failure as an I/O error.
            self._cursor = beginning
            raise OSError(str(T)) from T
        return size
    # Only reached from the unsized, unlimited branch after the buffer was extended.
    self._cursor = end = len(out)
    return end - beginning
def replay(self, offset, length)
-
Expand source code Browse git
def replay(self, offset: int, length: int):
    """
    Copy `length` bytes to the cursor position, taking them from the data that starts
    `offset` bytes before the cursor. When `length` exceeds `offset`, the `offset` bytes
    before the cursor are repeated as often as required. The result is passed to `write`.

    Raises a `ValueError` when `offset` is not an integer between 1 and the current cursor
    position. An offset of zero is rejected explicitly because it describes no source data
    and would otherwise fail with a `ZeroDivisionError` below.
    """
    if offset not in range(1, self._cursor + 1):
        raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{self._cursor}].')
    rep, r = divmod(length, offset)
    # Turn the distance from the cursor into a negative index relative to the data end.
    offset = -offset - len(self) + self._cursor
    replay = self._data[offset:offset + r]
    if rep > 0:
        replay = bytes(self._data[offset:self._cursor]) * rep + replay
    self.write(replay)
class MemoryFile (data=None, read_as_bytes=False, fileno=None, size_limit=None)
-
A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object.
Expand source code Browse git
class MemoryFile(MemoryFileMethods[T], io.BytesIO):
    """
    A thin wrapper around (potentially mutable) byte sequences which gives it the features of
    a file-like object. The class combines `MemoryFileMethods` with `io.BytesIO` and defines
    no members of its own.
    """
    pass
Ancestors
- MemoryFileMethods
- typing.Generic
- _io.BytesIO
- _io._BufferedIOBase
- _io._IOBase
Subclasses
Class variables
var read_as_bytes
Instance variables
var closed
-
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    The value of the internal `_closed` flag.
    """
    return self._closed
Inherited members
class order (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class order(str, enum.Enum):
    """
    Byte order markers. Each member is a string whose value is the character `>` for big
    and `<` for little.
    """
    big = '>'
    little = '<'
Ancestors
- builtins.str
- enum.Enum
Class variables
var big
var little
class StructReader (data, bigendian=False)
-
An extension of a
MemoryFile
which provides methods to read structured data.Expand source code Browse git
class StructReader(MemoryFile[T]):
    """
    An extension of a `refinery.lib.structures.MemoryFile` which provides methods to read
    structured data. Apart from whole bytes, the reader can consume individual bits; bits
    that were fetched from the stream but not yet consumed are kept in an internal bit buffer.
    """

    class Unaligned(RuntimeError):
        pass

    def __init__(self, data: T, bigendian: bool = False):
        super().__init__(data)
        # contents and size of the bit buffer
        self._bbits = 0
        self._nbits = 0
        self.bigendian = bigendian

    def __enter__(self) -> StructReader:
        return super().__enter__()

    @property
    @contextlib.contextmanager
    def be(self):
        """
        A context manager which switches the reader to big endian mode. The byte order that
        was active before is restored when the context ends.
        """
        previous = self.bigendian
        self.bigendian = True
        try:
            yield self
        finally:
            # restore rather than reset, so that big endian readers stay big endian
            self.bigendian = previous

    @property
    def byteorder_format(self) -> str:
        """
        The `struct` byte order character for the current mode: `>` or `<`.
        """
        return '>' if self.bigendian else '<'

    @property
    def byteorder_name(self) -> str:
        """
        The byte order name for the current mode as used by `int.from_bytes`.
        """
        return 'big' if self.bigendian else 'little'

    def seek(self, offset, whence=io.SEEK_SET) -> int:
        """
        Discards the bit buffer and then seeks like the parent class does.
        """
        self._bbits = 0
        self._nbits = 0
        return super().seek(offset, whence)

    def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
        """
        Read bytes from the underlying stream. Raises a `RuntimeError` when the stream is not
        currently byte-aligned, i.e. when `refinery.lib.structures.StructReader.byte_aligned`
        is `False`. Raises an exception of type `refinery.lib.structures.EOF` when fewer data
        is available in the stream than requested via the `size` parameter. The remaining
        data can be extracted from the exception. Use
        `refinery.lib.structures.StructReader.read_bytes` to read bytes from the stream when
        it is not byte-aligned.
        """
        if not self.byte_aligned:
            raise StructReader.Unaligned('buffer is not byte-aligned')
        data = self.read1(size, peek)
        if size and len(data) < size:
            raise EOF(data)
        return data

    @property
    def byte_aligned(self) -> bool:
        """
        This property is `True` if and only if there are currently no bits still waiting in
        the internal bit buffer.
        """
        return not self._nbits

    def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
        """
        This method clears the internal bit buffer and, if the cursor is not a multiple of
        `blocksize`, moves it forward to the next multiple. It returns a tuple containing the
        size and contents of the bit buffer before it was cleared.
        """
        nbits = self._nbits
        bbits = self._bbits
        self._nbits = 0
        self._bbits = 0
        mod = self._cursor % blocksize
        if mod:
            self.seekrel(blocksize - mod)
        return nbits, bbits

    @property
    def remaining_bits(self) -> int:
        """
        The number of unread bits: eight for every remaining byte plus the buffered bits.
        """
        return 8 * self.remaining_bytes + self._nbits

    def read_integer(self, length: Optional[int] = None, peek: bool = False) -> int:
        """
        Read `length` many bits from the underlying stream as an integer. If `length` is
        `None`, all remaining bits are read. Buffered bits are consumed first. With `peek`,
        neither the cursor nor the bit buffer is modified. Raises a `ValueError` for a
        negative length and an `EOFError` when the stream holds too few bytes.
        """
        if length is None:
            length = self.remaining_bits
        if length < 0:
            raise ValueError
        if length < self._nbits:
            # The bit buffer alone can serve the request.
            new_count = self._nbits - length
            if self.bigendian:
                result = self._bbits >> new_count
                if not peek:
                    self._bbits ^= result << new_count
            else:
                result = self._bbits & 2 ** length - 1
                if not peek:
                    self._bbits >>= length
            if not peek:
                self._nbits = new_count
            return result
        # Keep local copies: the seek below resets the bit buffer attributes.
        nbits, bbits = self._nbits, self._bbits
        number_of_missing_bits = length - nbits
        bytecount, rest = divmod(number_of_missing_bits, 8)
        if rest:
            # A partial byte is needed; `rest` becomes the count of its unused bits.
            bytecount += 1
            rest = 8 - rest
        bb = self.read1(bytecount, True)
        if len(bb) != bytecount:
            raise EOFError
        if not peek:
            self.seekrel(bytecount)
        if bytecount == 1:
            result, = bb
        else:
            result = int.from_bytes(bb, self.byteorder_name)
        if not nbits and not rest:
            return result
        if self.bigendian:
            rbmask = 2 ** rest - 1  # noqa
            excess = result & rbmask  # noqa
            result >>= rest  # noqa
            result ^= bbits << number_of_missing_bits  # noqa
        else:
            excess = result >> number_of_missing_bits  # noqa
            result ^= excess << number_of_missing_bits  # noqa
            result <<= nbits  # noqa
            result |= bbits  # noqa
        assert excess.bit_length() <= rest
        if not peek:
            self._nbits = rest
            self._bbits = excess
        return result

    def read_bytes(self, size: int, peek: bool = False) -> bytes:
        """
        The method reads `size` many bytes from the underlying stream starting at the current
        bit.
        """
        if self.byte_aligned:
            data = self.read_exactly(size, peek)
            if not isinstance(data, bytes):
                data = bytes(data)
            return data
        else:
            return self.read_integer(size * 8, peek).to_bytes(size, self.byteorder_name)

    def read_bit(self) -> int:
        """
        This function is a shortcut for calling
        `refinery.lib.structures.StructReader.read_integer` with an argument of `1`, i.e.
        this reads the next bit from the stream.
        """
        return self.read_integer(1)

    def read_bits(self, nbits: int) -> Iterable[int]:
        """
        This method returns the bits of `refinery.lib.structures.StructReader.read_integer`
        as an iterable from most to least significant.
        """
        chunk = self.read_integer(nbits)
        for k in range(nbits - 1, -1, -1):
            yield chunk >> k & 1

    def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
        """
        Identical to `refinery.lib.structures.StructReader.read_bits` with every bit value
        cast to a boolean. With `reverse`, the flags are produced in the opposite order.
        """
        bits = list(self.read_bits(nbits))
        if reverse:
            bits.reverse()
        for bit in bits:
            yield bool(bit)

    def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
        """
        Read structured data from the stream in any format supported by the `struct` module.
        A leading byte order character in `spec` overrides the current byte ordering. In
        addition to the `struct` characters, the following ones are supported, each with an
        optional count prefix:

        - `a`: a null-terminated string, as bytes
        - `u`: a null-terminated string of 2-byte characters, as bytes
        - `w`: the same as `u`, decoded as UTF-16LE
        - `g`: a GUID, as a string
        - `E`: a 7-bit encoded integer

        If the `unwrap` parameter is `True`, a single unpacked value will be returned as a
        scalar, not as a list with one element. If `peek` is `True`, the cursor and the bit
        buffer are restored to their previous state after reading.
        """
        if not spec:
            raise ValueError('no format specified')
        byteorder = spec[:1]
        if byteorder in '<!=@>':
            spec = spec[1:]
        else:
            byteorder = self.byteorder_format
        data = []
        current_cursor = self.tell()
        saved_bits = self._nbits, self._bbits
        # reserved struct characters: xcbB?hHiIlLqQnNefdspP
        for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
            if k % 2 == 1:
                # odd parts are the captured custom format characters
                count = 1 if len(part) == 1 else int(part[:~0])
                part = part[~0]
                for _ in range(count):
                    if part == 'a':
                        data.append(self.read_c_string())
                    elif part == 'g':
                        data.append(self.read_guid())
                    elif part == 'u':
                        data.append(self.read_w_string())
                    elif part == 'w':
                        data.append(self.read_w_string().decode('utf-16le'))
                    elif part == 'E':
                        data.append(self.read_7bit_encoded_int())
                continue
            else:
                part = F'{byteorder}{part}'
                data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
        if peek:
            # This has to happen before any return statement. Seeking clears the bit
            # buffer, which is why it is restored separately.
            self.seekset(current_cursor)
            self._nbits, self._bbits = saved_bits
        if unwrap and len(data) == 1:
            return data[0]
        return data

    def read_nibble(self, peek: bool = False) -> int:
        """
        Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
        """
        return self.read_integer(4, peek)

    def u8(self, peek: bool = False) -> int:
        return self.read_integer(8, peek)

    def i8(self, peek: bool = False) -> int:
        return signed(self.read_integer(8, peek), 8)

    def u16(self, peek: bool = False) -> int:
        return self.read_integer(16, peek)

    def u32(self, peek: bool = False) -> int:
        return self.read_integer(32, peek)

    def u64(self, peek: bool = False) -> int:
        return self.read_integer(64, peek)

    def i16(self, peek: bool = False) -> int:
        return signed(self.read_integer(16, peek), 16)

    def i32(self, peek: bool = False) -> int:
        return signed(self.read_integer(32, peek), 32)

    def i64(self, peek: bool = False) -> int:
        return signed(self.read_integer(64, peek), 64)

    def f32(self, peek: bool = False) -> float:
        return self.read_struct('f', unwrap=True, peek=peek)

    def f64(self, peek: bool = False) -> float:
        return self.read_struct('d', unwrap=True, peek=peek)

    def read_byte(self, peek: bool = False) -> int:
        return self.read_integer(8, peek)

    def read_char(self, peek: bool = False) -> int:
        return signed(self.read_integer(8, peek), 8)

    def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
        """
        Read data up to the next occurrence of `terminator` whose distance from the cursor is
        a multiple of `alignment`. The terminator is consumed, but not returned.
        """
        pos = self.tell()
        buf = self.getbuffer()
        try:
            end = pos - 1
            while True:
                end = buf.find(terminator, end + 1)
                if end < 0 or not (end - pos) % alignment:
                    break
        except AttributeError:
            # The buffer has no `find` method: scan block by block instead.
            result = bytearray()
            while not self.eof:
                result.extend(self.read_bytes(alignment))
                if result.endswith(terminator):
                    return result[:-len(terminator)]
            self.seek(pos)
            raise EOF
        else:
            # NOTE(review): when no terminator was found, `end` is negative here and a
            # negative size is passed to read_exactly - confirm that this is intended.
            data = self.read_exactly(end - pos)
            self.seekrel(len(terminator))
            return bytearray(data)

    def read_guid(self) -> str:
        """
        Read 16 bytes and format them as an upper case GUID string. The first three fields
        are always read as little endian integers; the configured byte order is restored
        afterwards, also when reading fails.
        """
        _mode = self.bigendian
        self.bigendian = False
        try:
            a = self.u32()
            b = self.u16()
            c = self.u16()
            d = self.read(2).hex().upper()
            e = self.read(6).hex().upper()
            return F'{a:08X}-{b:04X}-{c:04X}-{d}-{e}'
        finally:
            self.bigendian = _mode

    def read_c_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string terminated by a single null byte; decode it if `encoding` is given.
        """
        data = self.read_terminated_array(B'\0')
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_w_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string of 2-byte units terminated by two null bytes; decode it if `encoding`
        is given.
        """
        data = self.read_terminated_array(B'\0\0', 2)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_length_prefixed_ascii(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'latin1')

    def read_length_prefixed_utf8(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'utf8')

    def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
        block_size = 1 if bytecount else 2
        return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

    def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
        """
        Read an integer of `prefix_size` bits, multiply it by `block_size`, and read that many
        bytes. The data is decoded if `encoding` is given.
        """
        prefix = self.read_integer(prefix_size) * block_size
        data = self.read(prefix)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
        """
        Read an integer that is stored in groups of 7 bits, least significant group first.
        The high bit of each byte indicates whether another byte follows. If `max_bits` is
        positive, a `RuntimeError` is raised once the shift exceeds it while more bytes are
        still announced.
        """
        value = 0
        for shift in itertools.count(0, step=7):
            b = self.read_byte()
            value |= (b & 0x7F) << shift
            if not b & 0x80:
                return value
            if shift > max_bits > 0:
                raise RuntimeError('Maximum bits were exceeded by encoded integer.')
Ancestors
- MemoryFile
- MemoryFileMethods
- typing.Generic
- _io.BytesIO
- _io._BufferedIOBase
- _io._IOBase
Subclasses
Class variables
var read_as_bytes
var Unaligned
-
Unspecified run-time error.
Instance variables
var closed
-
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    The value of the internal `_closed` flag.
    """
    return self._closed
var be
-
Expand source code Browse git
@property
@contextlib.contextmanager
def be(self):
    """
    A context manager which switches the reader to big endian mode. The byte order that was
    active before is restored when the context ends, so a reader that was already big endian
    remains big endian afterwards.
    """
    previous = self.bigendian
    self.bigendian = True
    try:
        yield self
    finally:
        self.bigendian = previous
var byteorder_format
-
Expand source code Browse git
@property
def byteorder_format(self) -> str:
    """
    The byte order character for the current mode: `>` if `bigendian` is set, else `<`.
    """
    return '>' if self.bigendian else '<'
var byteorder_name
-
Expand source code Browse git
@property
def byteorder_name(self) -> str:
    """
    The byte order name for the current mode: `big` if `bigendian` is set, else `little`.
    """
    return 'big' if self.bigendian else 'little'
var byte_aligned
-
This property is
True
if and only if there are currently no bits still waiting in the internal bit buffer.Expand source code Browse git
@property
def byte_aligned(self) -> bool:
    """
    Indicates whether the internal bit buffer is empty, i.e. whether the count of bits that
    are still waiting to be consumed is zero.
    """
    return self._nbits == 0
var remaining_bits
-
Expand source code Browse git
@property
def remaining_bits(self) -> int:
    """
    Eight times `remaining_bytes`, plus the number of bits held in the bit buffer.
    """
    return 8 * self.remaining_bytes + self._nbits
Methods
def seek(self, offset, whence=0)
-
Change stream position.
Seek to byte offset pos relative to position indicated by whence: 0 Start of stream (the default). pos should be >= 0; 1 Current position - pos may be negative; 2 End of stream - pos usually negative. Returns the new absolute position.
Expand source code Browse git
def seek(self, offset, whence=io.SEEK_SET) -> int:
    """
    Resets the bit buffer and then delegates to the `seek` method of the parent class,
    returning its result.
    """
    self._bbits = 0
    self._nbits = 0
    return super().seek(offset, whence)
def read_exactly(self, size=None, peek=False)
-
Read bytes from the underlying stream. Raises a
RuntimeError
when the stream is not currently byte-aligned, i.e. whenStructReader.byte_aligned
isFalse
. Raises an exception of typeEOF
when fewer data is available in the stream than requested via thesize
parameter. The remaining data can be extracted from the exception. UseStructReader.read_bytes()
to read bytes from the stream when it is not byte-aligned.Expand source code Browse git
def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
    """
    Read exactly `size` bytes from the stream via `read1`. An exception of type
    `StructReader.Unaligned`, which is a `RuntimeError`, is raised when bits are still
    waiting in the bit buffer; `refinery.lib.structures.StructReader.read_bytes` can be used
    in that situation. When the stream holds fewer bytes than the requested `size`, an
    `refinery.lib.structures.EOF` is raised which carries the data that was available.
    """
    if not self.byte_aligned:
        raise StructReader.Unaligned('buffer is not byte-aligned')
    chunk = self.read1(size, peek)
    if not size or len(chunk) >= size:
        return chunk
    raise EOF(chunk)
def byte_align(self, blocksize=1)
-
This method clears the internal bit buffer and moves the cursor to the next byte. It returns a tuple containing the size and contents of the bit buffer.
Expand source code Browse git
def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
    """
    Empty the internal bit buffer and, when the cursor is not a multiple of `blocksize`,
    advance it to the next multiple. The return value is a tuple with the size and the
    contents that the bit buffer had before it was emptied.
    """
    buffered = (self._nbits, self._bbits)
    self._nbits = 0
    self._bbits = 0
    misalignment = self._cursor % blocksize
    if misalignment:
        self.seekrel(blocksize - misalignment)
    return buffered
def read_integer(self, length=None, peek=False)
-
Read
length
many bits from the underlying stream as an integer.Expand source code Browse git
def read_integer(self, length: Optional[int] = None, peek: bool = False) -> int:
    """
    Read `length` many bits from the underlying stream as an integer. If `length` is `None`,
    all remaining bits are read. Bits from the internal bit buffer are consumed first. With
    `peek`, neither the cursor nor the bit buffer is modified. Raises a `ValueError` for a
    negative length and an `EOFError` when the stream holds too few bytes.
    """
    if length is None:
        length = self.remaining_bits
    if length < 0:
        raise ValueError
    if length < self._nbits:
        # The bit buffer alone can serve the request.
        new_count = self._nbits - length
        if self.bigendian:
            # take the upper bits of the buffer and clear them
            result = self._bbits >> new_count
            if not peek:
                self._bbits ^= result << new_count
        else:
            # take the lower bits of the buffer and shift them out
            result = self._bbits & 2 ** length - 1
            if not peek:
                self._bbits >>= length
        if not peek:
            self._nbits = new_count
        return result
    # Keep local copies of the bit buffer before seekrel is called below.
    nbits, bbits = self._nbits, self._bbits
    number_of_missing_bits = length - nbits
    bytecount, rest = divmod(number_of_missing_bits, 8)
    if rest:
        # A partial byte is required; `rest` becomes the number of its unused bits.
        bytecount += 1
        rest = 8 - rest
    # Always peek first so that nothing is consumed when the data is insufficient.
    bb = self.read1(bytecount, True)
    if len(bb) != bytecount:
        raise EOFError
    if not peek:
        self.seekrel(bytecount)
    if bytecount == 1:
        result, = bb
    else:
        result = int.from_bytes(bb, self.byteorder_name)
    if not nbits and not rest:
        # byte-aligned read of whole bytes: nothing to merge or to buffer
        return result
    if self.bigendian:
        # the low `rest` bits are surplus; buffered bits form the high part of the result
        rbmask = 2 ** rest - 1  # noqa
        excess = result & rbmask  # noqa
        result >>= rest  # noqa
        result ^= bbits << number_of_missing_bits  # noqa
    else:
        # the high bits are surplus; buffered bits form the low part of the result
        excess = result >> number_of_missing_bits  # noqa
        result ^= excess << number_of_missing_bits  # noqa
        result <<= nbits  # noqa
        result |= bbits  # noqa
    assert excess.bit_length() <= rest
    if not peek:
        # the surplus bits become the new bit buffer
        self._nbits = rest
        self._bbits = excess
    return result
def read_bytes(self, size, peek=False)
-
The method reads
size
many bytes from the underlying stream starting at the current bit.Expand source code Browse git
def read_bytes(self, size: int, peek: bool = False) -> bytes:
    """
    Read `size` bytes beginning at the current bit position and return them as `bytes`.
    A byte-aligned reader uses `read_exactly`; otherwise, `8 * size` bits are read as one
    integer which is then serialized in the current byte order.
    """
    if not self.byte_aligned:
        number = self.read_integer(size * 8, peek)
        return number.to_bytes(size, self.byteorder_name)
    chunk = self.read_exactly(size, peek)
    return chunk if isinstance(chunk, bytes) else bytes(chunk)
def read_bit(self)
-
This function is a shortcut for calling
StructReader.read_integer()
with an argument of1
, i.e. this reads the next bit from the stream. The bits of any byte in the stream are read from least significant to most significant.Expand source code Browse git
def read_bit(self) -> int:
    """
    This function is a shortcut for calling
    `refinery.lib.structures.StructReader.read_integer` with an argument of `1`, i.e. this
    reads the next bit from the stream.
    NOTE(review): the original text claimed that bits of a byte are always read from least
    to most significant; this presumably only holds in little endian mode - confirm against
    `read_integer`.
    """
    return self.read_integer(1)
def read_bits(self, nbits)
-
This method returns the bits of
StructReader.read_integer()
as an iterable from most to least significant. Expand source code Browse git
def read_bits(self, nbits: int) -> Iterable[int]:
    """
    Read an integer of `nbits` bits via `refinery.lib.structures.StructReader.read_integer`
    and produce its individual bits, starting with the most significant one and ending with
    the least significant one.
    """
    number = self.read_integer(nbits)
    for position in reversed(range(nbits)):
        yield (number >> position) & 1
def read_flags(self, nbits, reverse=False)
-
Identical to
StructReader.read_bits()
with every bit value cast to a boolean.Expand source code Browse git
def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
    """
    Produce the values from `refinery.lib.structures.StructReader.read_bits` as booleans.
    When `reverse` is set, the flags are produced in the opposite order.
    """
    flags = [bool(bit) for bit in self.read_bits(nbits)]
    yield from (reversed(flags) if reverse else flags)
def read_struct(self, spec, unwrap=False, peek=False)
-
Read structured data from the stream in any format supported by the
struct
module. The spec
argument may start with a byte order character to override the current byte ordering. If the unwrap
parameter isTrue
, a single unpacked value will be returned as a scalar, not as a tuple with one element.Expand source code Browse git
def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
    """
    Read structured data from the stream in any format supported by the `struct` module.
    A leading byte order character in `spec` overrides the current byte ordering. In addition
    to the `struct` characters, the following ones are supported, each with an optional count
    prefix:

    - `a`: a null-terminated string, as bytes
    - `u`: a null-terminated string of 2-byte characters, as bytes
    - `w`: the same as `u`, decoded as UTF-16LE
    - `g`: a GUID, as a string
    - `E`: a 7-bit encoded integer

    If the `unwrap` parameter is `True`, a single unpacked value will be returned as a scalar,
    not as a list with one element. If `peek` is `True`, the cursor and the bit buffer are
    restored to their previous state after reading. Raises a `ValueError` for an empty `spec`.
    """
    if not spec:
        raise ValueError('no format specified')
    byteorder = spec[:1]
    if byteorder in '<!=@>':
        spec = spec[1:]
    else:
        byteorder = self.byteorder_format
    data = []
    current_cursor = self.tell()
    saved_bits = self._nbits, self._bbits
    # reserved struct characters: xcbB?hHiIlLqQnNefdspP
    for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
        if k % 2 == 1:
            # odd parts are the captured custom format characters
            count = 1 if len(part) == 1 else int(part[:~0])
            part = part[~0]
            for _ in range(count):
                if part == 'a':
                    data.append(self.read_c_string())
                elif part == 'g':
                    data.append(self.read_guid())
                elif part == 'u':
                    data.append(self.read_w_string())
                elif part == 'w':
                    data.append(self.read_w_string().decode('utf-16le'))
                elif part == 'E':
                    data.append(self.read_7bit_encoded_int())
            continue
        else:
            part = F'{byteorder}{part}'
            data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
    if peek:
        # This has to happen before any return statement: previously, an unwrapped peek
        # returned early and left the cursor advanced. Seeking may clear the bit buffer,
        # which is why it is restored separately.
        self.seekset(current_cursor)
        self._nbits, self._bbits = saved_bits
    if unwrap and len(data) == 1:
        return data[0]
    return data
def read_nibble(self, peek=False)
-
Calls
StructReader.read_integer()
with an argument of4
.Expand source code Browse git
def read_nibble(self, peek: bool = False) -> int:
    """
    Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`, i.e.
    this reads four bits as an integer. The `peek` flag is passed through.
    """
    return self.read_integer(4, peek)
def u8(self, peek=False)
-
Expand source code Browse git
def u8(self, peek: bool = False) -> int:
    """
    Read 8 bits via `read_integer` and return the unsigned result.
    """
    return self.read_integer(8, peek)
def i8(self, peek=False)
-
Expand source code Browse git
def i8(self, peek: bool = False) -> int:
    """
    Read 8 bits via `read_integer` and convert the result with `signed` for 8 bits.
    """
    return signed(self.read_integer(8, peek), 8)
def u16(self, peek=False)
-
Expand source code Browse git
def u16(self, peek: bool = False) -> int:
    """
    Read 16 bits via `read_integer` and return the unsigned result.
    """
    return self.read_integer(16, peek)
def u32(self, peek=False)
-
Expand source code Browse git
def u32(self, peek: bool = False) -> int:
    """
    Read 32 bits via `read_integer` and return the unsigned result.
    """
    return self.read_integer(32, peek)
def u64(self, peek=False)
-
Expand source code Browse git
def u64(self, peek: bool = False) -> int:
    """
    Read 64 bits via `read_integer` and return the unsigned result.
    """
    return self.read_integer(64, peek)
def i16(self, peek=False)
-
Expand source code Browse git
def i16(self, peek: bool = False) -> int:
    """
    Read 16 bits via `read_integer` and convert the result with `signed` for 16 bits.
    """
    return signed(self.read_integer(16, peek), 16)
def i32(self, peek=False)
-
Expand source code Browse git
def i32(self, peek: bool = False) -> int:
    """
    Read 32 bits via `read_integer` and convert the result with `signed` for 32 bits.
    """
    return signed(self.read_integer(32, peek), 32)
def i64(self, peek=False)
-
Expand source code Browse git
def i64(self, peek: bool = False) -> int:
    """
    Read 64 bits via `read_integer` and convert the result with `signed` for 64 bits.
    """
    return signed(self.read_integer(64, peek), 64)
def f32(self, peek=False)
-
Expand source code Browse git
def f32(self, peek: bool = False) -> float:
    """
    Read a single value with the `struct` format `f` via `read_struct` and return it as a
    scalar. The `peek` flag is passed through.
    """
    return self.read_struct('f', unwrap=True, peek=peek)
def f64(self, peek=False)
-
Expand source code Browse git
def f64(self, peek: bool = False) -> float:
    """
    Read a single value with the `struct` format `d` via `read_struct` and return it as a
    scalar. The `peek` flag is passed through.
    """
    return self.read_struct('d', unwrap=True, peek=peek)
def read_byte(self, peek=False)
-
Expand source code Browse git
def read_byte(self, peek: bool = False) -> int:
    """
    Read 8 bits via `read_integer` and return the unsigned result.
    """
    return self.read_integer(8, peek)
def read_char(self, peek=False)
-
Expand source code Browse git
def read_char(self, peek: bool = False) -> int:
    """
    Read 8 bits via `read_integer` and convert the result with `signed` for 8 bits.
    """
    return signed(self.read_integer(8, peek), 8)
def read_terminated_array(self, terminator, alignment=1)
-
Expand source code Browse git
def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
    """
    Read data up to the next occurrence of `terminator` whose distance from the cursor is a
    multiple of `alignment`. The terminator is consumed, but it is not part of the result.
    """
    pos = self.tell()
    buf = self.getbuffer()
    try:
        # Fast path: search the underlying buffer directly for an aligned terminator.
        end = pos - 1
        while True:
            end = buf.find(terminator, end + 1)
            if end < 0 or not (end - pos) % alignment:
                break
    except AttributeError:
        # The buffer object has no `find` method: read block by block instead.
        result = bytearray()
        while not self.eof:
            result.extend(self.read_bytes(alignment))
            if result.endswith(terminator):
                return result[:-len(terminator)]
        # No terminator before the end of the stream: rewind and report this.
        self.seek(pos)
        raise EOF
    else:
        # NOTE(review): when no terminator was found, `end` is negative here and a
        # negative size is passed to read_exactly, unlike the EOF raised in the
        # fallback branch above - confirm that this is intended.
        data = self.read_exactly(end - pos)
        self.seekrel(len(terminator))
        return bytearray(data)
def read_guid(self)
-
Expand source code Browse git
def read_guid(self) -> str:
    """
    Read 16 bytes and format them as an upper case GUID string. The first three fields are
    read as integers with big endian mode switched off, the remaining 8 bytes are rendered
    as hex in stream order. The byte order setting of the reader is restored afterwards,
    regardless of whether reading succeeded.
    """
    previous = self.bigendian
    self.bigendian = False
    try:
        head = self.u32()
        mid1 = self.u16()
        mid2 = self.u16()
        tail1 = self.read(2).hex().upper()
        tail2 = self.read(6).hex().upper()
        return F'{head:08X}-{mid1:04X}-{mid2:04X}-{tail1}-{tail2}'
    finally:
        self.bigendian = previous
def read_c_string(self, encoding=None)
-
Expand source code Browse git
def read_c_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read data up to a single null byte via `read_terminated_array`. The raw data is returned
    unless an `encoding` is given, in which case it is decoded to a string.
    """
    raw = self.read_terminated_array(B'\0')
    return raw if encoding is None else raw.decode(encoding)
def read_w_string(self, encoding=None)
-
Expand source code Browse git
def read_w_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read data up to a 2-byte null terminator at an even distance from the cursor via
    `read_terminated_array`. The raw data is returned unless an `encoding` is given, in
    which case it is decoded to a string.
    """
    raw = self.read_terminated_array(B'\0\0', 2)
    return raw if encoding is None else raw.decode(encoding)
def read_length_prefixed_ascii(self, prefix_size=32)
-
Expand source code Browse git
def read_length_prefixed_ascii(self, prefix_size: int = 32):
    """
    Calls `read_length_prefixed` with the given `prefix_size` and the encoding `latin1`.
    Note that despite the name, the data is decoded as latin1 and not as strict ASCII.
    """
    return self.read_length_prefixed(prefix_size, 'latin1')
def read_length_prefixed_utf8(self, prefix_size=32)
-
Expand source code Browse git
def read_length_prefixed_utf8(self, prefix_size: int = 32):
    """
    Calls `read_length_prefixed` with the given `prefix_size` and the encoding `utf8`.
    """
    return self.read_length_prefixed(prefix_size, 'utf8')
def read_length_prefixed_utf16(self, prefix_size=32, bytecount=False)
-
Expand source code Browse git
def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
    """
    Calls `read_length_prefixed` with the encoding `utf-16le`. The length prefix is
    multiplied by 2 unless `bytecount` is set, in which case it is used as is.
    """
    block_size = 1 if bytecount else 2
    return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)
def read_length_prefixed(self, prefix_size=32, encoding=None, block_size=1)
-
Expand source code Browse git
def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
    """
    Read a length prefix of `prefix_size` bits via `read_integer`, multiply it by
    `block_size`, and read that many bytes via `read`. The raw data is returned unless an
    `encoding` is given, in which case it is decoded to a string.
    """
    byte_count = self.read_integer(prefix_size) * block_size
    payload = self.read(byte_count)
    return payload if encoding is None else payload.decode(encoding)
def read_7bit_encoded_int(self, max_bits=0)
-
Expand source code Browse git
def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
    """
    Read an integer that is stored in groups of 7 bits, least significant group first. The
    high bit of each byte signals that another byte follows. If `max_bits` is positive, a
    `RuntimeError` is raised as soon as the current shift exceeds it while another byte is
    still announced.
    """
    value = 0
    shift = 0
    while True:
        b = self.read_byte()
        value |= (b & 0x7F) << shift
        if not (b & 0x80):
            return value
        if shift > max_bits > 0:
            raise RuntimeError('Maximum bits were exceeded by encoded integer.')
        shift += 7
Inherited members
class StructMeta (name, bases, nmspc, parser=refinery.lib.structures.StructReader)
-
A metaclass to facilitate the behavior outlined for
Struct
.Expand source code Browse git
class StructMeta(type):
    """
    A metaclass to facilitate the behavior outlined for `refinery.lib.structures.Struct`.
    The optional class keyword `parser` selects the reader type that the initializer of the
    created class expects; it defaults to `refinery.lib.structures.StructReader`.
    """
    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        # `parser` is accepted here only so that it is not forwarded to `type.__new__`.
        return type.__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super(StructMeta, cls).__init__(name, bases, nmspc)
        original__init__ = cls.__init__

        @functools.wraps(original__init__)
        def wrapped__init__(self: Struct, reader, *args, **kwargs):
            if not isinstance(reader, parser):
                # An instance of a base class of the parser is rejected, not wrapped.
                if issubclass(parser, reader.__class__):
                    raise ValueError(
                        F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                        F'but a {parser.__name__} is required.')
                # Anything else is treated as raw data and wrapped in the parser type.
                reader = parser(reader)
            start = reader.tell()
            view = memoryview(reader.getbuffer())
            original__init__(self, reader, *args, **kwargs)
            # Remember the part of the buffer that the initializer moved the cursor over.
            self._data = view[start:reader.tell()]

        cls.__init__ = wrapped__init__
Ancestors
- builtins.type
class Struct (reader, *args, **kwargs)
-
A class to parse structured data. A
Struct
class can be instantiated as follows:foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument
reader
. If the objectdata
is already aStructReader
, then it will be passed asreader
. Otherwise, the argument will be wrapped in aStructReader
. Additional arguments to the struct are passed through.Expand source code Browse git
class Struct(metaclass=StructMeta):
    """
    A class to parse structured data. A `refinery.lib.structures.Struct` class can be
    instantiated as follows:

        foo = Struct(data, bar=29)

    The initialization routine of the structure will be called with a single argument
    `reader`. If the object `data` is already a `refinery.lib.structures.StructReader`, then
    it will be passed as `reader`. Otherwise, the argument will be wrapped in a
    `refinery.lib.structures.StructReader`. Additional arguments to the struct are passed
    through.
    """
    # the raw data that belongs to this structure
    _data: Union[memoryview, bytearray]

    def __len__(self):
        # the number of bytes in the raw data of this structure
        return len(self._data)

    def __bytes__(self):
        # an immutable copy of the raw data of this structure
        return bytes(self._data)

    def get_data(self, decouple=False):
        """
        Return the raw data of this structure. With `decouple`, a `memoryview` is first
        replaced by a `bytearray` copy, which is then stored and returned.
        """
        if decouple and isinstance(self._data, memoryview):
            self._data = bytearray(self._data)
        return self._data

    def __init__(self, reader: StructReader, *args, **kwargs):
        # The base class parses nothing; subclasses read their fields from `reader` here.
        pass
Subclasses
- DexFile
- JvAccessFlags
- JvClassFile
- JvCode
- JvException
- JvOpCode
- refinery.lib.java._HasPoolAndTag
- BCRYPT_RSAKEY_BLOB
- BLOBHEADER
- CRYPTOKEY
- DHPUBKEY
- PLAINTEXTKEYBLOB
- PRIVATEKEYBLOB
- RSAPUBKEY
- SIMPLEBLOB
- LZFHeader
- LZGStream
- RangeDecoder
- LZO
- LZOChunk
- A3xRecord
- A3xScript
- AsarHeader
- CPIOEntry
- GzipHeader
- FatArch
- NSArchive
- NSBlockHeaderOffset
- NSHeader
- NSScriptExtendedInstruction
- NSScriptInstruction
- PYZ
- PiTOCEntry
- PyInstallerArchiveEpilogue
- IFPSFile
- GRPICONDIR
- GRPICONDIRENTRY
- ZipCentralDirectory
- ZipEndOfCentralDirectory
Methods
def get_data(self, decouple=False)
-
Expand source code Browse git
def get_data(self, decouple=False):
    """
    Return the raw data of this structure. With `decouple`, a `memoryview` is first replaced
    by a `bytearray` copy, which is then stored and returned.
    """
    if decouple and isinstance(self._data, memoryview):
        self._data = bytearray(self._data)
    return self._data
class PerInstanceAttribute
-
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def getitem(self, key: KT) -> VT: … # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Expand source code Browse git
class PerInstanceAttribute(Generic[AttrType]):
    """
    A descriptor which stores one value per owner instance, keyed by the `id` of the owner.
    The value that is assigned first is kept as a seed; later assignments for the same owner
    are ignored. On first read access, the seed is passed through `resolve` and the result is
    cached. Both entries are removed by a finalizer when the owner is garbage collected.
    """

    def resolve(self, parent, value: Any) -> AttrType:
        return value

    def __init__(self):
        self.__set: Dict[int, Any] = {}
        self.__get: Dict[int, AttrType] = {}

    def __set__(self, parent: Any, value: Any) -> None:
        key = id(parent)
        if key in self.__set:
            # only the first assignment for each owner is recorded
            return

        def cleanup(self, pid):
            self.__set.pop(pid, None)
            self.__get.pop(pid, None)

        self.__set[key] = value
        weakref.finalize(parent, cleanup, self, key)

    def __get__(self, parent, tp=None) -> AttrType:
        key = id(parent)
        cache = self.__get
        if key in cache:
            return cache[key]
        try:
            seed = self.__set[key]
        except KeyError as K:
            raise AttributeError from K
        cache[key] = self.resolve(parent, seed)
        return cache[key]
Ancestors
- typing.Generic
Subclasses
- refinery.lib.java.Index
Methods
def resolve(self, parent, value)
-
Expand source code Browse git
def resolve(self, parent, value: Any) -> AttrType:
    """
    Turn the assigned `value` into the value that attribute access returns for `parent`.
    This implementation returns `value` unchanged.
    """
    return value