Module refinery.lib.structures

Interfaces and classes to read structured data.

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interfaces and classes to read structured data.
"""
from __future__ import annotations

import contextlib
import itertools
import enum
import functools
import io
import re
import struct
import weakref

from typing import (
    Any,
    ByteString,
    Dict,
    Generic,
    Iterable,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)


# Type variable for the kind of byte buffer that backs a memory file (see `MemoryFileMethods`).
T = TypeVar('T', bound=Union[bytearray, bytes, memoryview])
# Type variable for the target type of a cast, as used by `MemoryFileMethods.read_as`.
C = TypeVar('C', bound=Union[bytearray, bytes, memoryview])
# Value types used in the return annotation of `StructReader.read_struct`.
UnpackType = Union[int, bool, float, bytes]


def signed(k: int, bitsize: int):
    """
    Interpret the lowest `bitsize` bits of `k` as a two's complement number and return the
    corresponding signed integer. Bits of `k` above `bitsize` are discarded.
    """
    modulus = 1 << bitsize
    value = k & (modulus - 1)
    # The topmost remaining bit is the sign bit.
    if value >> (bitsize - 1):
        return value - modulus
    return value


class EOF(EOFError):
    """
    Raised when fewer bytes were available in a `refinery.lib.structures.MemoryFile` than were
    requested. The data of the incomplete read is stored in the attribute `rest` and can also
    be obtained by converting the exception to `bytes`.
    """
    def __init__(self, rest: ByteString = B''):
        EOFError.__init__(self, 'Unexpected end of buffer.')
        self.rest = rest

    def __bytes__(self):
        leftover = self.rest
        return bytes(leftover)


class StreamDetour:
    """
    Context manager that remembers the current position of a stream when the context is entered,
    optionally seeks to a different location, and seeks back to the remembered position when the
    context ends. If no offset is given, the stream is not moved on entry.
    """
    def __init__(self, stream: io.IOBase, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        self.stream = stream
        self.offset = offset
        self.whence = whence

    def __enter__(self):
        stream = self.stream
        # Remember where to return to before moving anywhere.
        self.cursor = stream.tell()
        target = self.offset
        if target is not None:
            stream.seek(target, self.whence)
        return self

    def __exit__(self, *args):
        origin = self.cursor
        self.stream.seek(origin, io.SEEK_SET)


class MemoryFileMethods(Generic[T]):
    """
    A thin wrapper around (potentially mutable) byte sequences which gives it the features of a
    file-like object. The wrapped buffer is stored as given; reads return slices of it and writes
    modify it in place. The optional `size_limit` bounds the cursor position that writes may
    reach; a write that would exceed it raises `refinery.lib.structures.EOF`.
    """
    closed: bool
    read_as_bytes: bool

    _data: T
    _cursor: int
    _closed: bool

    class SEEK(int, enum.Enum):
        CUR = io.SEEK_CUR
        END = io.SEEK_END
        SET = io.SEEK_SET

    def __init__(
        self,
        data: Optional[T] = None,
        read_as_bytes=False,
        fileno: Optional[int] = None,
        size_limit: Optional[int] = None,
    ) -> None:
        if data is None:
            data = bytearray()
        elif size_limit is not None and len(data) > size_limit:
            raise ValueError('Initial data exceeds size limit')
        self._data = data
        self._cursor = 0
        self._closed = False
        self._fileno = fileno
        self.read_as_bytes = read_as_bytes
        self._size_limit = size_limit

    def close(self) -> None:
        self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> MemoryFile:
        return self

    def __exit__(self, ex_type, ex_value, trace) -> bool:
        # Exceptions are never suppressed and the file is not closed on exit.
        return False

    def flush(self) -> None:
        pass

    def isatty(self) -> bool:
        return False

    def __iter__(self):
        return self

    def __len__(self):
        return len(self._data)

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def fileno(self) -> int:
        if self._fileno is None:
            raise OSError
        return self._fileno

    def readable(self) -> bool:
        return not self._closed

    def seekable(self) -> bool:
        return not self._closed

    @property
    def eof(self) -> bool:
        """
        Indicates whether the file is closed or the cursor has reached the end of the buffer.
        """
        return self._closed or self._cursor >= len(self._data)

    @property
    def remaining_bytes(self) -> int:
        """
        The number of bytes between the cursor and the end of the buffer.
        """
        return len(self._data) - self.tell()

    def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        """
        Returns a `refinery.lib.structures.StreamDetour` for this file.
        """
        return StreamDetour(self, offset, whence=whence)

    def writable(self) -> bool:
        if self._closed:
            return False
        if isinstance(self._data, memoryview):
            return not self._data.readonly
        return isinstance(self._data, bytearray)

    def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
        """
        Same as `read`, but the result is converted to the type `cast` if it is not already an
        instance of it.
        """
        out = self.read(size, peek)
        if not isinstance(out, cast):
            out = cast(out)
        return out

    def read(self, size: int = -1, peek: bool = False) -> T:
        """
        Read up to `size` bytes starting at the cursor; a negative size or `None` reads everything
        up to the end of the buffer. If `peek` is set, the cursor is not advanced.
        """
        beginning = self._cursor
        if size is None or size < 0:
            end = len(self._data)
        else:
            end = min(self._cursor + size, len(self._data))
        result = self._data[beginning:end]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        if not peek:
            self._cursor = end
        return result

    def peek(self, size: int = -1) -> memoryview:
        """
        Return a memoryview of up to `size` bytes starting at the cursor without advancing it.
        """
        cursor = self._cursor
        mv = memoryview(self._data)
        if size is None or size < 0:
            return mv[cursor:]
        return mv[cursor:cursor + size]

    def read1(self, size: int = -1, peek: bool = False) -> T:
        return self.read(size, peek)

    def _find_linebreak(self, beginning: int, end: int) -> int:
        # Return the index of the first line feed in [beginning, end), or -1 if there is none.
        if not isinstance(self._data, memoryview):
            return self._data.find(B'\n', beginning, end)
        # A memoryview has no find method, so it is scanned manually.
        for k in range(beginning, end):
            if self._data[k] == 0xA:
                return k
        return -1

    def readline(self, size: int = -1) -> T:
        """
        Read up to and including the next line feed, but no more than `size` bytes when `size` is
        a non-negative integer. Returns an empty buffer at the end of the data.
        """
        beginning, end = self._cursor, len(self._data)
        if size is not None and size >= 0:
            # Never search or move the cursor past the end of the buffer.
            end = min(beginning + size, end)
        p = self._find_linebreak(beginning, end)
        self._cursor = end if p < 0 else p + 1
        result = self._data[beginning:self._cursor]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def readlines(self, hint: int = -1) -> Iterable[T]:
        """
        Iterate over the remaining lines. If `hint` is a non-negative integer, iteration stops
        once the total size of all lines produced so far has reached `hint`.
        """
        if hint is None or hint < 0:
            yield from self
            return
        total = 0
        while total < hint:
            line = self.readline()
            if not line:
                # Leaking a StopIteration from a generator would raise a RuntimeError instead.
                break
            total += len(line)
            yield line

    def readinto1(self, b) -> int:
        data = self.read(len(b))
        size = len(data)
        b[:size] = data
        return size

    def readinto(self, b) -> int:
        return self.readinto1(b)

    def tell(self) -> int:
        return self._cursor

    def seekrel(self, offset: int) -> int:
        """
        Move the cursor by `offset` relative to its current position.
        """
        return self.seek(offset, io.SEEK_CUR)

    def seekset(self, offset: int) -> int:
        """
        Move the cursor to `offset`; negative values are interpreted relative to the end.
        """
        if offset < 0:
            return self.seek(offset, io.SEEK_END)
        else:
            return self.seek(offset, io.SEEK_SET)

    def getbuffer(self) -> T:
        return self._data

    def getvalue(self) -> T:
        return self._data

    def seek(self, offset: int, whence=io.SEEK_SET) -> int:
        """
        Move the cursor and return its new position. The resulting position is always clamped
        to the range from zero to the length of the buffer.
        """
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError('no negative offsets allowed for SEEK_SET.')
            self._cursor = offset
        elif whence == io.SEEK_CUR:
            self._cursor += offset
        elif whence == io.SEEK_END:
            self._cursor = len(self._data) + offset
        self._cursor = max(self._cursor, 0)
        self._cursor = min(self._cursor, len(self._data))
        return self._cursor

    def writelines(self, lines: Iterable[ByteString]) -> None:
        for line in lines:
            self.write(line)

    def truncate(self, size=None) -> None:
        """
        Remove all data after position `size`, or after the cursor if no size is given. When a
        size is given, the cursor is moved to it.
        """
        if size is not None:
            if not (0 <= size <= len(self._data)):
                raise ValueError('invalid size value')
            self._cursor = size
        del self._data[self._cursor:]

    def write_byte(self, byte: int) -> None:
        """
        Write a single byte at the cursor. Raises `refinery.lib.structures.EOF` if this would
        exceed the size limit and `OSError` if the buffer cannot be modified.
        """
        limit = self._size_limit
        cc = self._cursor
        nc = cc + 1
        # A limit of zero is a valid limit, hence the explicit comparison with None.
        if limit is not None and nc > limit:
            raise EOF(bytes((byte,)))
        try:
            if cc < len(self._data):
                self._data[cc] = byte
            else:
                self._data.append(byte)
        except Exception as error:
            raise OSError(str(error)) from error
        else:
            self._cursor = nc

    def write(self, data: Iterable[int]) -> int:
        """
        Write `data` at the cursor, overwriting existing content and extending the buffer as
        required. Returns the number of bytes written. Raises `refinery.lib.structures.EOF` when
        the size limit would be exceeded; the exception contains the data that was not written.
        """
        out = self._data
        end = len(out)
        beginning = self._cursor
        limit = self._size_limit

        if limit is None and beginning == end:
            # Fast path: unlimited append at the end of the buffer.
            out[end:] = data
            self._cursor = end = len(out)
            return end - beginning
        try:
            size = len(data)
        except TypeError:
            # The input has no length, e.g. a generator; it has to be consumed step by step.
            return self._write_unsized(iter(data))
        if limit is not None and size + beginning > limit:
            raise EOF(data)
        self._cursor += size
        try:
            self._data[beginning:self._cursor] = data
        except Exception as error:
            self._cursor = beginning
            raise OSError(str(error)) from error
        return size

    def _write_unsized(self, it) -> int:
        # Write the items of an iterator of unknown length at the cursor. On EOF, everything up
        # to the size limit has been written, but the cursor is left where it was.
        out = self._data
        end = len(out)
        beginning = self._cursor
        limit = self._size_limit
        written = 0
        # First overwrite existing data; zip stops without consuming from the iterator once
        # the end of the buffer is reached.
        for position, b in zip(range(beginning, end), it):
            out[position] = b
            written += 1
        if beginning + written < end:
            # The input was exhausted before the end of the buffer was reached.
            self._cursor = beginning + written
            return written
        if limit is None:
            out[end:] = it
        else:
            out[end:] = itertools.islice(it, max(limit - end, 0))
            rest = bytearray(it)
            if rest:
                raise EOF(rest)
        self._cursor = end = len(out)
        return end - beginning

    def __getitem__(self, slice):
        result = self._data[slice]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def replay(self, offset: int, length: int):
        """
        Write `length` bytes at the cursor which are copied from the data that starts `offset`
        bytes before the cursor. If `length` exceeds `offset`, the available data is repeated.
        Raises a `ValueError` if `offset` exceeds the cursor position, or if it is zero while
        `length` is not.
        """
        if offset not in range(self._cursor + 1):
            raise ValueError(F'The supplied delta {offset} is not in the valid range [0,{self._cursor}].')
        if offset == 0:
            # There is no data to repeat; this is only meaningful when nothing is requested.
            if length:
                raise ValueError('A zero offset cannot be replayed for a nonzero length.')
            return
        rep, r = divmod(length, offset)
        start = self._cursor - offset
        replay = self._data[start:start + r]
        if rep > 0:
            replay = bytes(self._data[start:self._cursor]) * rep + replay
        self.write(replay)


class MemoryFile(MemoryFileMethods[T], io.BytesIO):
    """
    Combines `refinery.lib.structures.MemoryFileMethods` with `io.BytesIO` as a base class. The
    methods of `refinery.lib.structures.MemoryFileMethods` take precedence in the method
    resolution order.
    """


class order(str, enum.Enum):
    """
    The two byte orders, represented by the prefix characters that the `struct` module uses for
    them in format strings.
    """
    # most significant byte first
    big = '>'
    # least significant byte first
    little = '<'


class StructReader(MemoryFile[T]):
    """
    An extension of a `refinery.lib.structures.MemoryFile` which provides methods to read
    structured data. In addition to the byte cursor, the reader maintains a small bit buffer
    so that integers of arbitrary bit length can be read from the stream.
    """

    class Unaligned(RuntimeError):
        pass

    def __init__(self, data: T, bigendian: bool = False):
        super().__init__(data)
        # _nbits is the number of buffered bits and _bbits contains their value.
        self._bbits = 0
        self._nbits = 0
        self.bigendian = bigendian

    def __enter__(self) -> StructReader:
        return super().__enter__()

    @property
    @contextlib.contextmanager
    def be(self):
        """
        A context manager that switches the reader to big endian mode. The byte order that was
        in effect before entering the context is restored when the context ends.
        """
        previous = self.bigendian
        self.bigendian = True
        try:
            yield self
        finally:
            self.bigendian = previous

    @property
    def byteorder_format(self) -> str:
        return '>' if self.bigendian else '<'

    @property
    def byteorder_name(self) -> str:
        return 'big' if self.bigendian else 'little'

    def seek(self, offset, whence=io.SEEK_SET) -> int:
        # Seeking always discards the contents of the bit buffer.
        self._bbits = 0
        self._nbits = 0
        return super().seek(offset, whence)

    def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
        """
        Read bytes from the underlying stream. Raises a `RuntimeError` when the stream is not currently
        byte-aligned, i.e. when `refinery.lib.structures.StructReader.byte_aligned` is `False`. Raises
        an exception of type `refinery.lib.structures.EOF` when fewer data is available in the stream than
        requested via the `size` parameter. The remaining data can be extracted from the exception.
        Use `refinery.lib.structures.StructReader.read_bytes` to read bytes from the stream when it is
        not byte-aligned.
        """
        if not self.byte_aligned:
            raise StructReader.Unaligned('buffer is not byte-aligned')
        data = self.read1(size, peek)
        if size and len(data) < size:
            raise EOF(data)
        return data

    @property
    def byte_aligned(self) -> bool:
        """
        This property is `True` if and only if there are currently no bits still waiting in the internal
        bit buffer.
        """
        return not self._nbits

    def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
        """
        This method clears the internal bit buffer and moves the cursor to the next multiple of
        `blocksize` unless it is already at one. It returns a tuple containing the size and contents
        of the bit buffer.
        """
        nbits = self._nbits
        bbits = self._bbits
        self._nbits = 0
        self._bbits = 0
        mod = self._cursor % blocksize
        if mod:
            self.seekrel(blocksize - mod)
        return nbits, bbits

    @property
    def remaining_bits(self) -> int:
        return 8 * self.remaining_bytes + self._nbits

    def read_integer(self, length: Optional[int] = None, peek: bool = False) -> int:
        """
        Read `length` many bits from the underlying stream as an integer. If no length is given, all
        remaining bits are read. Raises `refinery.lib.structures.EOF`, which is an `EOFError`, when
        the stream does not contain enough data.
        """
        if length is None:
            length = self.remaining_bits
        if length < 0:
            raise ValueError('cannot read a negative number of bits')
        if length < self._nbits:
            # The request can be served entirely from the bit buffer.
            new_count = self._nbits - length
            if self.bigendian:
                result = self._bbits >> new_count
                if not peek:
                    self._bbits ^= result << new_count
            else:
                result = self._bbits & ((1 << length) - 1)
                if not peek:
                    self._bbits >>= length
            if not peek:
                self._nbits = new_count
            return result

        # Remember the bit buffer now; the relative seek below resets it.
        nbits, bbits = self._nbits, self._bbits
        number_of_missing_bits = length - nbits
        bytecount, rest = divmod(number_of_missing_bits, 8)
        if rest:
            # A partial byte is required: rest becomes the number of bits that are left over.
            bytecount += 1
            rest = 8 - rest
        bb = self.read1(bytecount, True)
        if len(bb) != bytecount:
            raise EOF(bb)
        if not peek:
            self.seekrel(bytecount)
        if bytecount == 1:
            result, = bb
        else:
            result = int.from_bytes(bb, self.byteorder_name)
        if not nbits and not rest:
            return result
        if self.bigendian:
            rbmask   = (1 << rest) - 1     # noqa
            excess   = result & rbmask     # noqa
            result >>= rest                # noqa
            result  ^= bbits << number_of_missing_bits   # noqa
        else:
            excess   = result >> number_of_missing_bits  # noqa
            result  ^= excess << number_of_missing_bits  # noqa
            result <<= nbits               # noqa
            result  |= bbits               # noqa
        assert excess.bit_length() <= rest
        if not peek:
            self._nbits = rest
            self._bbits = excess
        return result

    def read_bytes(self, size: int, peek: bool = False) -> bytes:
        """
        The method reads `size` many bytes from the underlying stream starting at the current bit.
        """
        if self.byte_aligned:
            data = self.read_exactly(size, peek)
            if not isinstance(data, bytes):
                data = bytes(data)
            return data
        else:
            return self.read_integer(size * 8, peek).to_bytes(size, self.byteorder_name)

    def read_bit(self) -> int:
        """
        This function is a shortcut for calling `refinery.lib.structures.StructReader.read_integer` with
        an argument of `1`, i.e. this reads the next bit from the stream.
        """
        return self.read_integer(1)

    def read_bits(self, nbits: int) -> Iterable[int]:
        """
        This method reads an integer of `nbits` bits via `refinery.lib.structures.StructReader.read_integer`
        and returns its bits as an iterable, starting with the most significant bit and ending with the
        least significant one.
        """
        chunk = self.read_integer(nbits)
        for k in range(nbits - 1, -1, -1):
            yield chunk >> k & 1

    def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
        """
        Identical to `refinery.lib.structures.StructReader.read_bits` with every bit value cast to a boolean.
        If `reverse` is set, the order of the bits is reversed.
        """
        bits = list(self.read_bits(nbits))
        if reverse:
            bits.reverse()
        for bit in bits:
            yield bool(bit)

    def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
        """
        Read structured data from the stream in any format supported by the `struct` module. A byte order
        character at the start of `spec` can be used to override the current byte ordering. In addition to
        the format characters of the `struct` module, the following characters, optionally preceded by a
        repeat count, are supported:

        - `a`: a null-terminated string, see `refinery.lib.structures.StructReader.read_c_string`
        - `u`: a string terminated by two aligned null bytes, see `refinery.lib.structures.StructReader.read_w_string`
        - `w`: the same as `u`, but decoded as UTF-16LE
        - `g`: a GUID, see `refinery.lib.structures.StructReader.read_guid`
        - `E`: a 7-bit encoded integer, see `refinery.lib.structures.StructReader.read_7bit_encoded_int`

        If the `unwrap` parameter is `True`, a single unpacked value will be returned as a scalar, not as
        a list with one element. If `peek` is `True`, the cursor and the bit buffer are restored after
        reading.
        """
        if not spec:
            raise ValueError('no format specified')
        byteorder = spec[:1]
        if byteorder in '<!=@>':
            spec = spec[1:]
        else:
            byteorder = self.byteorder_format
        data = []
        current_cursor = self.tell()
        current_nbits, current_bbits = self._nbits, self._bbits

        # reserved struct characters: xcbB?hHiIlLqQnNefdspP
        for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
            if k % 2 == 1:
                # Odd indices are the matches of the capture group, i.e. the custom format characters.
                count = 1 if len(part) == 1 else int(part[:~0])
                part = part[~0]
                for _ in range(count):
                    if part == 'a':
                        data.append(self.read_c_string())
                    elif part == 'g':
                        data.append(self.read_guid())
                    elif part == 'u':
                        data.append(self.read_w_string())
                    elif part == 'w':
                        data.append(self.read_w_string().decode('utf-16le'))
                    elif part == 'E':
                        data.append(self.read_7bit_encoded_int())
                continue
            else:
                part = F'{byteorder}{part}'
                data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
        if peek:
            # This has to happen before any return; seeking clears the bit buffer, so it is
            # restored separately.
            self.seek(current_cursor)
            self._nbits, self._bbits = current_nbits, current_bbits
        if unwrap and len(data) == 1:
            return data[0]
        return data

    def read_nibble(self, peek: bool = False) -> int:
        """
        Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
        """
        return self.read_integer(4, peek)

    def u8(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def i8(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def u16(self, peek: bool = False) -> int: return self.read_integer(16, peek)
    def u32(self, peek: bool = False) -> int: return self.read_integer(32, peek)
    def u64(self, peek: bool = False) -> int: return self.read_integer(64, peek)
    def i16(self, peek: bool = False) -> int: return signed(self.read_integer(16, peek), 16)
    def i32(self, peek: bool = False) -> int: return signed(self.read_integer(32, peek), 32)
    def i64(self, peek: bool = False) -> int: return signed(self.read_integer(64, peek), 64)

    def f32(self, peek: bool = False) -> float: return self.read_struct('f', unwrap=True, peek=peek)
    def f64(self, peek: bool = False) -> float: return self.read_struct('d', unwrap=True, peek=peek)

    def read_byte(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def read_char(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
        """
        Read data up to the next occurrence of `terminator` whose distance from the cursor is a multiple
        of `alignment`. The terminator is skipped, but not included in the result.
        """
        pos = self.tell()
        buf = self.getbuffer()
        try:
            end = pos - 1
            while True:
                end = buf.find(terminator, end + 1)
                if end < 0 or not (end - pos) % alignment:
                    break
        except AttributeError:
            # The buffer has no find method (e.g. a memoryview): read block by block instead.
            result = bytearray()
            while not self.eof:
                result.extend(self.read_bytes(alignment))
                if result.endswith(terminator):
                    return result[:-len(terminator)]
            self.seek(pos)
            raise EOF
        else:
            # NOTE: If no terminator was found, the size passed here is negative and all remaining
            # data is returned without raising an exception.
            data = self.read_exactly(end - pos)
            self.seekrel(len(terminator))
            return bytearray(data)

    def read_guid(self) -> str:
        """
        Read 16 bytes and return them formatted as an uppercase GUID string. The first three fields are
        always read as little endian integers, regardless of the current byte order.
        """
        _mode = self.bigendian
        self.bigendian = False
        try:
            a = self.u32()
            b = self.u16()
            c = self.u16()
            # read_bytes fails on truncated input rather than producing a malformed GUID.
            d = self.read_bytes(2).hex().upper()
            e = self.read_bytes(6).hex().upper()
        finally:
            self.bigendian = _mode
        return F'{a:08X}-{b:04X}-{c:04X}-{d}-{e}'

    def read_c_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string that is terminated by a null byte and decode it if an encoding is given.
        """
        data = self.read_terminated_array(B'\0')
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_w_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string that is terminated by two null bytes at an even offset from the cursor and decode
        it if an encoding is given.
        """
        data = self.read_terminated_array(B'\0\0', 2)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_length_prefixed_ascii(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'latin1')

    def read_length_prefixed_utf8(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'utf8')

    def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
        # Unless the prefix already counts bytes, it counts 2-byte code units.
        block_size = 1 if bytecount else 2
        return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

    def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
        """
        Read an integer of `prefix_size` bits, multiply it by `block_size`, and read that many bytes.
        The data is decoded if an encoding is given.
        """
        prefix = self.read_integer(prefix_size) * block_size
        data = self.read(prefix)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
        """
        Read an integer that is stored in groups of 7 bits, least significant group first, where the
        high bit of each byte indicates that another byte follows. If `max_bits` is positive, a
        `RuntimeError` is raised when a continuation byte is read at a shift beyond `max_bits`.
        """
        value = 0
        for shift in itertools.count(0, step=7):
            b = self.read_byte()
            value |= (b & 0x7F) << shift
            if not b & 0x80:
                return value
            if shift > max_bits > 0:
                raise RuntimeError('Maximum bits were exceeded by encoded integer.')


class StructMeta(type):
    """
    A metaclass to facilitate the behavior outlined for `refinery.lib.structures.Struct`. It wraps
    the initialization routine of each class it creates so that the first argument is converted to
    the reader type given by `parser`, and so that the data consumed from the reader during
    initialization is stored in the `_data` attribute of the new object.
    """
    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        # The parser keyword is consumed here and must not be passed on to type.
        return type.__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super(StructMeta, cls).__init__(name, bases, nmspc)
        inner_init = cls.__init__

        @functools.wraps(inner_init)
        def init_with_reader(self: Struct, reader, *args, **kwargs):
            if not isinstance(reader, parser):
                given = reader.__class__
                if issubclass(parser, given):
                    # The given object is of a base type of the parser; it is not wrapped.
                    raise ValueError(
                        F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                        F'but a {parser.__name__} is required.')
                reader = parser(reader)
            start = reader.tell()
            view = memoryview(reader.getbuffer())
            inner_init(self, reader, *args, **kwargs)
            # Everything between the start and the final cursor position belongs to the struct.
            stop = reader.tell()
            self._data = view[start:stop]

        cls.__init__ = init_with_reader


class Struct(metaclass=StructMeta):
    """
    A class to parse structured data. A `refinery.lib.structures.Struct` class can be instantiated
    as follows:

        foo = Struct(data, bar=29)

    The initialization routine of the structure will be called with a single argument `reader`. If
    the object `data` is already a `refinery.lib.structures.StructReader`, then it will be passed
    as `reader`. Otherwise, the argument will be wrapped in a `refinery.lib.structures.StructReader`.
    Additional arguments to the struct are passed through. The length of a struct and its conversion
    to `bytes` are based on the data that was read during initialization.
    """
    _data: Union[memoryview, bytearray]

    def __init__(self, reader: StructReader, *args, **kwargs):
        pass

    def __bytes__(self):
        return bytes(self._data)

    def __len__(self):
        return len(self._data)

    def get_data(self, decouple=False):
        """
        Return the data of this struct. If `decouple` is set and the data is a memoryview, it is
        first replaced by a `bytearray` copy.
        """
        data = self._data
        if decouple and isinstance(data, memoryview):
            data = self._data = bytearray(data)
        return data


# Type variable for the value type that a `PerInstanceAttribute` resolves to.
AttrType = TypeVar('AttrType')


class PerInstanceAttribute(Generic[AttrType]):
    """
    A descriptor that stores one value per owner instance, keyed by the identity of the instance.
    Only the first assignment for an instance is recorded; later assignments are ignored. On first
    read, the recorded value is passed through `resolve` and the result is cached. Reading before
    any assignment raises an `AttributeError`. Both records are removed when the instance is
    finalized.
    """
    def __init__(self):
        self.__set: Dict[int, Any] = {}
        self.__get: Dict[int, AttrType] = {}

    def resolve(self, parent, value: Any) -> AttrType:
        """
        Convert an assigned value to the value returned on read; the default is the identity.
        """
        return value

    def __discard(self, key):
        # Forget everything that was stored for the instance with the given identity.
        self.__set.pop(key, None)
        self.__get.pop(key, None)

    def __set__(self, parent: Any, value: Any) -> None:
        key = id(parent)
        if key in self.__set:
            return
        self.__set[key] = value
        weakref.finalize(parent, self.__discard, key)

    def __get__(self, parent, tp=None) -> AttrType:
        key = id(parent)
        try:
            return self.__get[key]
        except KeyError:
            pass
        try:
            seed = self.__set[key]
        except KeyError as K:
            raise AttributeError from K
        self.__get[key] = self.resolve(parent, seed)
        return self.__get[key]

Functions

def signed(k, bitsize)

If k is an integer of the given bit size, cast it to a signed one.

Expand source code Browse git
def signed(k: int, bitsize: int):
    """
    Interpret the lowest `bitsize` bits of `k` as a two's complement number and return the
    corresponding signed integer. Bits of `k` above `bitsize` are discarded.
    """
    modulus = 1 << bitsize
    value = k & (modulus - 1)
    # The topmost remaining bit is the sign bit.
    if value >> (bitsize - 1):
        return value - modulus
    return value

Classes

class EOF (rest=b'')

While reading from a MemoryFile, fewer bytes were available than requested. The exception contains the data from the incomplete read.

Expand source code Browse git
class EOF(EOFError):
    """
    Raised when fewer bytes were available in a `refinery.lib.structures.MemoryFile` than were
    requested. The data of the incomplete read is stored in the attribute `rest` and can also
    be obtained by converting the exception to `bytes`.
    """
    def __init__(self, rest: ByteString = B''):
        EOFError.__init__(self, 'Unexpected end of buffer.')
        self.rest = rest

    def __bytes__(self):
        leftover = self.rest
        return bytes(leftover)

Ancestors

  • builtins.EOFError
  • builtins.Exception
  • builtins.BaseException
class StreamDetour (stream, offset=None, whence=0)

A stream detour is used as a context manager to temporarily read from a different location in the stream and then return to the original offset when the context ends.

Expand source code Browse git
class StreamDetour:
    """
    Context manager that remembers the current position of a stream when the context is entered,
    optionally seeks to a different location, and seeks back to the remembered position when the
    context ends. If no offset is given, the stream is not moved on entry.
    """
    def __init__(self, stream: io.IOBase, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        self.stream = stream
        self.offset = offset
        self.whence = whence

    def __enter__(self):
        stream = self.stream
        # Remember where to return to before moving anywhere.
        self.cursor = stream.tell()
        target = self.offset
        if target is not None:
            stream.seek(target, self.whence)
        return self

    def __exit__(self, *args):
        origin = self.cursor
        self.stream.seek(origin, io.SEEK_SET)
class MemoryFileMethods (data=None, read_as_bytes=False, fileno=None, size_limit=None)

A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object.

Expand source code Browse git
class MemoryFileMethods(Generic[T]):
    """
    A thin wrapper around (potentially mutable) byte sequences which gives it the features of a
    file-like object. The wrapped buffer is stored in `_data`, the current position in `_cursor`.
    When `read_as_bytes` is set, all data that is read is converted to `bytes`. When a size limit
    is given, writes that would extend beyond this many bytes raise `refinery.lib.structures.EOF`.
    """
    closed: bool
    read_as_bytes: bool

    _data: T
    _cursor: int
    _closed: bool

    class SEEK(int, enum.Enum):
        CUR = io.SEEK_CUR
        END = io.SEEK_END
        SET = io.SEEK_SET

    def __init__(
        self,
        data: Optional[T] = None,
        read_as_bytes=False,
        fileno: Optional[int] = None,
        size_limit: Optional[int] = None,
    ) -> None:
        if data is None:
            data = bytearray()
        elif size_limit is not None and len(data) > size_limit:
            raise ValueError('Initial data exceeds size limit')
        self._data = data
        self._cursor = 0
        self._closed = False
        self._fileno = fileno
        self.read_as_bytes = read_as_bytes
        self._size_limit = size_limit

    def close(self) -> None:
        """
        Mark the file as closed; the underlying buffer is not modified.
        """
        self._closed = True

    @property
    def closed(self) -> bool:
        return self._closed

    def __enter__(self) -> MemoryFile:
        return self

    def __exit__(self, ex_type, ex_value, trace) -> bool:
        # Exceptions are never suppressed; the file is not closed on exit either.
        return False

    def flush(self) -> None:
        pass

    def isatty(self) -> bool:
        return False

    def __iter__(self):
        return self

    def __len__(self):
        return len(self._data)

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def fileno(self) -> int:
        """
        Return the file descriptor number given to the constructor; raises `OSError` if none
        was given.
        """
        if self._fileno is None:
            raise OSError
        return self._fileno

    def readable(self) -> bool:
        return not self._closed

    def seekable(self) -> bool:
        return not self._closed

    @property
    def eof(self) -> bool:
        """
        This is `True` when the file is closed or the cursor is at the end of the buffer.
        """
        return self._closed or self._cursor >= len(self._data)

    @property
    def remaining_bytes(self) -> int:
        """
        The number of bytes between the cursor and the end of the buffer.
        """
        return len(self._data) - self.tell()

    def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
        """
        Return a `refinery.lib.structures.StreamDetour` context manager for this stream.
        """
        return StreamDetour(self, offset, whence=whence)

    def writable(self) -> bool:
        """
        The file is writable when it is not closed and wraps either a `bytearray` or a
        `memoryview` that is not read-only.
        """
        if self._closed:
            return False
        if isinstance(self._data, memoryview):
            return not self._data.readonly
        return isinstance(self._data, bytearray)

    def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
        """
        Like `read`, but the result is converted to the type `cast` unless it already is an
        instance of that type.
        """
        out = self.read(size, peek)
        if not isinstance(out, cast):
            out = cast(out)
        return out

    def read(self, size: int = -1, peek: bool = False) -> T:
        """
        Read up to `size` bytes starting at the cursor; a negative size or `None` reads until
        the end of the buffer. The cursor is only advanced when `peek` is not set.
        """
        beginning = self._cursor
        if size is None or size < 0:
            end = len(self._data)
        else:
            end = min(self._cursor + size, len(self._data))
        result = self._data[beginning:end]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        if not peek:
            self._cursor = end
        return result

    def peek(self, size: int = -1) -> memoryview:
        """
        Return a `memoryview` of up to `size` bytes at the cursor without advancing it.
        """
        cursor = self._cursor
        mv = memoryview(self._data)
        if size is None or size < 0:
            return mv[cursor:]
        return mv[cursor:cursor + size]

    def read1(self, size: int = -1, peek: bool = False) -> T:
        return self.read(size, peek)

    def _find_linebreak(self, beginning: int, end: int) -> int:
        """
        Return the offset of the first line feed in `[beginning, end)`, or `-1` if there is none.
        """
        if not isinstance(self._data, memoryview):
            return self._data.find(B'\n', beginning, end)
        # A memoryview has no find method, so scan it byte by byte.
        for k in range(beginning, end):
            if self._data[k] == 0xA:
                return k
        return -1

    def readline(self, size: int = -1) -> T:
        """
        Read until and including the next line feed, but never more than `size` bytes when a
        non-negative size is given, and never beyond the end of the buffer.
        """
        beginning, end = self._cursor, len(self._data)
        if size is not None and size >= 0:
            # Clamp to the buffer size; otherwise the cursor would move past the end.
            end = min(end, beginning + size)
        p = self._find_linebreak(beginning, end)
        self._cursor = end if p < 0 else p + 1
        result = self._data[beginning:self._cursor]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def readlines(self, hint: int = -1) -> Iterable[T]:
        """
        Generate the remaining lines. With a non-negative `hint`, generation stops once the
        total size of the generated lines has reached `hint` bytes.
        """
        if hint is None or hint < 0:
            yield from self
            return
        total = 0
        while total < hint:
            # Do not call next(self) here: a StopIteration escaping from a generator body is
            # converted into a RuntimeError (PEP 479).
            line = self.readline()
            if not line:
                break
            total += len(line)
            yield line

    def readinto1(self, b) -> int:
        """
        Read up to `len(b)` bytes into the start of `b` and return the number of bytes read.
        """
        data = self.read(len(b))
        size = len(data)
        b[:size] = data
        return size

    def readinto(self, b) -> int:
        return self.readinto1(b)

    def tell(self) -> int:
        return self._cursor

    def seekrel(self, offset: int) -> int:
        """
        Seek relative to the current cursor position.
        """
        return self.seek(offset, io.SEEK_CUR)

    def seekset(self, offset: int) -> int:
        """
        Seek to an absolute offset; negative offsets are interpreted relative to the end.
        """
        if offset < 0:
            return self.seek(offset, io.SEEK_END)
        else:
            return self.seek(offset, io.SEEK_SET)

    def getbuffer(self) -> T:
        return self._data

    def getvalue(self) -> T:
        return self._data

    def seek(self, offset: int, whence=io.SEEK_SET) -> int:
        """
        Move the cursor and return its new position. The resulting position is clamped to the
        range from zero to the size of the buffer. Raises `ValueError` for a negative offset
        with `SEEK_SET` and for an unknown `whence` value.
        """
        if whence == io.SEEK_SET:
            if offset < 0:
                raise ValueError('no negative offsets allowed for SEEK_SET.')
            cursor = offset
        elif whence == io.SEEK_CUR:
            cursor = self._cursor + offset
        elif whence == io.SEEK_END:
            cursor = len(self._data) + offset
        else:
            raise ValueError(F'invalid whence value: {whence!r}')
        self._cursor = max(0, min(cursor, len(self._data)))
        return self._cursor

    def writelines(self, lines: Iterable[ByteString]) -> None:
        for line in lines:
            self.write(line)

    def truncate(self, size=None) -> None:
        """
        Discard all data after `size`, or after the cursor when no size is given. When a size
        is given, the cursor is moved there. Raises `ValueError` if the size is negative or
        exceeds the size of the buffer.
        """
        if size is not None:
            if not (0 <= size <= len(self._data)):
                raise ValueError('invalid size value')
            self._cursor = size
        del self._data[self._cursor:]

    def write_byte(self, byte: int) -> None:
        """
        Write a single byte at the cursor, overwriting or appending as required. Raises
        `refinery.lib.structures.EOF` if this would exceed the size limit and `OSError` if the
        buffer cannot be modified.
        """
        limit = self._size_limit
        cc = self._cursor
        nc = cc + 1
        # Compare against None: a limit of zero is a real limit, as it is in the constructor.
        if limit is not None and nc > limit:
            raise EOF(bytes((byte,)))
        try:
            if cc < len(self._data):
                self._data[cc] = byte
            else:
                self._data.append(byte)
        except Exception as error:
            raise OSError(str(error)) from error
        else:
            self._cursor = nc

    def write(self, data: Iterable[int]) -> int:
        """
        Write `data` at the cursor, overwriting existing bytes and extending the buffer where
        necessary, and return the number of bytes written. The input may be any sized buffer or
        an arbitrary iterable of byte values. Raises `refinery.lib.structures.EOF` if the size
        limit would be exceeded; the exception contains the data that was not written.
        """
        out = self._data
        end = len(out)
        beginning = self._cursor
        limit = self._size_limit

        if limit is None and beginning == end:
            # Fast path: unrestricted append at the end of the buffer.
            out[end:] = data
            self._cursor = end = len(out)
            return end - beginning

        try:
            size = len(data)
        except Exception:
            pass
        else:
            if limit is not None and size + beginning > limit:
                raise EOF(data)
            self._cursor += size
            try:
                out[beginning:self._cursor] = data
            except Exception as error:
                self._cursor = beginning
                raise OSError(str(error)) from error
            return size

        # The input has no length: first overwrite what already exists past the cursor.
        it = iter(data)
        head = bytes(itertools.islice(it, end - beginning))
        stop = beginning + len(head)
        out[beginning:stop] = head
        if stop < end:
            # The input was exhausted before the end of the buffer was reached.
            self._cursor = stop
            return stop - beginning
        # Append the remainder; with a size limit, only as much as the limit permits.
        if limit is None:
            out[end:] = it
        else:
            out[end:] = itertools.islice(it, max(limit - end, 0))
            overflow = bytearray(it)
            if overflow:
                raise EOF(overflow)
        self._cursor = end = len(out)
        return end - beginning

    def __getitem__(self, slice):
        result = self._data[slice]
        if self.read_as_bytes and not isinstance(result, bytes):
            result = bytes(result)
        return result

    def replay(self, offset: int, length: int):
        """
        Write `length` bytes at the cursor by repeating the `offset` bytes that precede the
        cursor as often as necessary. Raises `ValueError` unless `offset` is between 1 and the
        current cursor position.
        """
        cursor = self._cursor
        if not (1 <= offset <= cursor):
            # An offset of zero has no source data to replay and would divide by zero below.
            raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{cursor}].')
        rep, r = divmod(length, offset)
        chunk = bytes(self._data[cursor - offset:cursor])
        self.write(chunk * rep + chunk[:r])

Ancestors

  • typing.Generic

Subclasses

Class variables

var read_as_bytes
var SEEK

An enumeration.

Instance variables

var closed
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    Whether the file has been closed; this reports the value of the `_closed` flag.
    """
    return self._closed
var eof
Expand source code Browse git
@property
def eof(self) -> bool:
    """
    This is `True` when the file is closed or when the cursor has reached (or passed) the end
    of the underlying buffer.
    """
    return self._closed or self._cursor >= len(self._data)
var remaining_bytes
Expand source code Browse git
@property
def remaining_bytes(self) -> int:
    """
    The number of bytes between the current position, as reported by `tell`, and the end of
    the underlying buffer.
    """
    return len(self._data) - self.tell()

Methods

def close(self)
Expand source code Browse git
def close(self) -> None:
    """
    Mark the file as closed by setting the `_closed` flag. Nothing else is changed here.
    """
    self._closed = True
def flush(self)
Expand source code Browse git
def flush(self) -> None:
    """
    This method does nothing; it is implemented as a no-op.
    """
    pass
def isatty(self)
Expand source code Browse git
def isatty(self) -> bool:
    """
    Always returns `False`.
    """
    return False
def fileno(self)
Expand source code Browse git
def fileno(self) -> int:
    """
    Return the file descriptor number stored in `_fileno`. An `OSError` is raised when no
    descriptor number is available, i.e. when `_fileno` is `None`.
    """
    descriptor = self._fileno
    if descriptor is not None:
        return descriptor
    raise OSError
def readable(self)
Expand source code Browse git
def readable(self) -> bool:
    """
    The file is reported as readable for as long as it has not been closed.
    """
    return not self._closed
def seekable(self)
Expand source code Browse git
def seekable(self) -> bool:
    """
    The file is reported as seekable for as long as it has not been closed.
    """
    return not self._closed
def detour(self, offset=None, whence=0)
Expand source code Browse git
def detour(self, offset: Optional[int] = None, whence: int = io.SEEK_SET):
    """
    Return a `refinery.lib.structures.StreamDetour` for this stream. It is a context manager
    that seeks to the given offset on entry (if one is given) and returns to the previous
    position on exit.
    """
    return StreamDetour(self, offset, whence=whence)
def writable(self)
Expand source code Browse git
def writable(self) -> bool:
    """
    Report whether the file can be written to: it must not be closed, and the wrapped buffer
    has to be either a `bytearray` or a `memoryview` that is not read-only.
    """
    if self._closed:
        return False
    buffer = self._data
    if not isinstance(buffer, memoryview):
        return isinstance(buffer, bytearray)
    return not buffer.readonly
def read_as(self, cast, size=-1, peek=False)
Expand source code Browse git
def read_as(self, cast: Type[C], size: int = -1, peek: bool = False) -> C:
    """
    Forward `size` and `peek` to `read` and return the result as an instance of `cast`. The
    data is only converted when it is not already an instance of that type.
    """
    chunk = self.read(size, peek)
    return chunk if isinstance(chunk, cast) else cast(chunk)
def read(self, size=-1, peek=False)
Expand source code Browse git
def read(self, size: int = -1, peek: bool = False) -> T:
    """
    Return up to `size` bytes starting at the cursor. If `size` is negative or `None`, all
    data up to the end of the buffer is returned. The result is converted to `bytes` when
    `read_as_bytes` is set. The cursor is advanced unless `peek` is set.
    """
    start = self._cursor
    total = len(self._data)
    if size is not None and size >= 0:
        stop = min(start + size, total)
    else:
        stop = total
    chunk = self._data[start:stop]
    if self.read_as_bytes and not isinstance(chunk, bytes):
        chunk = bytes(chunk)
    if not peek:
        self._cursor = stop
    return chunk
def peek(self, size=-1)
Expand source code Browse git
def peek(self, size: int = -1) -> memoryview:
    """
    Return a `memoryview` into the underlying buffer that starts at the cursor and spans up to
    `size` bytes, or everything up to the end if `size` is negative or `None`. The cursor is
    not moved.
    """
    start = self._cursor
    view = memoryview(self._data)
    stop = None if size is None or size < 0 else start + size
    return view[start:stop]
def read1(self, size=-1, peek=False)
Expand source code Browse git
def read1(self, size: int = -1, peek: bool = False) -> T:
    """
    Alias for `read`; both arguments are forwarded unchanged.
    """
    return self.read(size, peek)
def readline(self, size=-1)
Expand source code Browse git
def readline(self, size: int = -1) -> T:
    """
    Read from the cursor until and including the next line feed. When a non-negative `size`
    is given, at most that many bytes are read. The read never extends beyond the end of the
    buffer. The result is converted to `bytes` when `read_as_bytes` is set.
    """
    beginning, end = self._cursor, len(self._data)
    if size is not None and size >= 0:
        # Clamp to the buffer size: without this, the cursor would be moved past the end of
        # the data whenever fewer than `size` bytes remain and no line break is found.
        end = min(end, beginning + size)
    p = self._find_linebreak(beginning, end)
    self._cursor = end if p < 0 else p + 1
    result = self._data[beginning:self._cursor]
    if self.read_as_bytes and not isinstance(result, bytes):
        result = bytes(result)
    return result
def readlines(self, hint=-1)
Expand source code Browse git
def readlines(self, hint: int = -1) -> Iterable[T]:
    """
    Generate the remaining lines of the file. If `hint` is negative or `None`, all lines are
    generated. Otherwise, generation stops as soon as the total size of the lines generated so
    far has reached `hint` bytes, or when no more data is available.
    """
    if hint is None or hint < 0:
        yield from self
        return
    total = 0
    while total < hint:
        # Calling next(self) here would leak a StopIteration out of the generator body at the
        # end of the data, which Python converts into a RuntimeError (PEP 479).
        line = self.readline()
        if not line:
            break
        total += len(line)
        yield line
def readinto1(self, b)
Expand source code Browse git
def readinto1(self, b) -> int:
    """
    Read up to `len(b)` bytes via `read`, store them at the beginning of the writable buffer
    `b`, and return how many bytes were read.
    """
    chunk = self.read(len(b))
    count = len(chunk)
    b[:count] = chunk
    return count
def readinto(self, b)
Expand source code Browse git
def readinto(self, b) -> int:
    """
    Alias for `readinto1`.
    """
    return self.readinto1(b)
def tell(self)
Expand source code Browse git
def tell(self) -> int:
    """
    Return the current cursor position.
    """
    return self._cursor
def seekrel(self, offset)
Expand source code Browse git
def seekrel(self, offset: int) -> int:
    """
    Seek relative to the current position; shorthand for `seek(offset, io.SEEK_CUR)`.
    """
    return self.seek(offset, io.SEEK_CUR)
def seekset(self, offset)
Expand source code Browse git
def seekset(self, offset: int) -> int:
    """
    Seek to an absolute position. Non-negative offsets are counted from the start of the
    stream, negative offsets are counted from its end.
    """
    whence = io.SEEK_END if offset < 0 else io.SEEK_SET
    return self.seek(offset, whence)
def getbuffer(self)
Expand source code Browse git
def getbuffer(self) -> T:
    """
    Return the underlying buffer object itself; no copy is made.
    """
    return self._data
def getvalue(self)
Expand source code Browse git
def getvalue(self) -> T:
    """
    Return the underlying buffer object itself; no copy is made.
    """
    return self._data
def seek(self, offset, whence=0)
Expand source code Browse git
def seek(self, offset: int, whence=io.SEEK_SET) -> int:
    """
    Move the cursor and return its new position. `whence` selects whether `offset` is relative
    to the start (`io.SEEK_SET`), the current position (`io.SEEK_CUR`), or the end of the
    buffer (`io.SEEK_END`). The resulting position is clamped to the range from zero to the
    size of the buffer. Raises `ValueError` for a negative offset with `io.SEEK_SET` and for
    an unknown `whence` value, which was previously ignored silently.
    """
    if whence == io.SEEK_SET:
        if offset < 0:
            raise ValueError('no negative offsets allowed for SEEK_SET.')
        cursor = offset
    elif whence == io.SEEK_CUR:
        cursor = self._cursor + offset
    elif whence == io.SEEK_END:
        cursor = len(self._data) + offset
    else:
        raise ValueError(F'invalid whence value: {whence!r}')
    self._cursor = max(0, min(cursor, len(self._data)))
    return self._cursor
def writelines(self, lines)
Expand source code Browse git
def writelines(self, lines: Iterable[ByteString]) -> None:
    """
    Pass every item of `lines` to `write`, in order. No line separators are added.
    """
    for line in lines:
        self.write(line)
def truncate(self, size=None)
Expand source code Browse git
def truncate(self, size=None) -> None:
    """
    Remove all data after position `size` from the buffer and move the cursor there. When no
    size is given, the data is cut off at the current cursor position instead. A `ValueError`
    is raised if `size` is negative or larger than the buffer.
    """
    buffer = self._data
    if size is None:
        size = self._cursor
    elif not (0 <= size <= len(buffer)):
        raise ValueError('invalid size value')
    else:
        self._cursor = size
    del buffer[size:]
def write_byte(self, byte)
Expand source code Browse git
def write_byte(self, byte: int) -> None:
    """
    Write a single byte value at the cursor and advance the cursor by one. An existing byte at
    that position is overwritten; at the end of the buffer, the byte is appended. Raises
    `refinery.lib.structures.EOF` (carrying the byte) if the write would exceed the size limit
    and `OSError` if the buffer could not be modified.
    """
    limit = self._size_limit
    cc = self._cursor
    nc = cc + 1
    # Compare against None rather than testing truthiness: a limit of zero is a real limit,
    # not the absence of one.
    if limit is not None and nc > limit:
        raise EOF(bytes((byte,)))
    try:
        if cc < len(self._data):
            self._data[cc] = byte
        else:
            self._data.append(byte)
    except Exception as error:
        raise OSError(str(error)) from error
    else:
        self._cursor = nc
def write(self, data)
Expand source code Browse git
def write(self, data: Iterable[int]) -> int:
    """
    Write `data` at the cursor and return the number of bytes written. Existing bytes after
    the cursor are overwritten and the buffer is extended where necessary. The input may be a
    sized buffer or an arbitrary iterable of byte values. Raises `refinery.lib.structures.EOF`
    if the size limit would be exceeded; the exception contains the data that was not written.
    For sized input, failures to modify the buffer are raised as `OSError`.
    """
    out = self._data
    end = len(out)
    beginning = self._cursor
    limit = self._size_limit

    if limit is None and beginning == end:
        # Fast path: unrestricted append at the end of the buffer.
        out[end:] = data
        self._cursor = end = len(out)
        return end - beginning

    try:
        size = len(data)
    except Exception:
        pass
    else:
        # Compare against None: a limit of zero is a real limit.
        if limit is not None and size + beginning > limit:
            raise EOF(data)
        self._cursor += size
        try:
            out[beginning:self._cursor] = data
        except Exception as error:
            self._cursor = beginning
            raise OSError(str(error)) from error
        return size

    # The input has no length: first overwrite what already exists past the cursor. This also
    # covers an empty iterator and a cursor that is already at the end of the buffer.
    it = iter(data)
    head = bytes(itertools.islice(it, end - beginning))
    stop = beginning + len(head)
    out[beginning:stop] = head
    if stop < end:
        # The input was exhausted before the end of the buffer was reached.
        self._cursor = stop
        return stop - beginning
    # Append the remainder; with a size limit, only as much as the limit permits.
    if limit is None:
        out[end:] = it
    else:
        out[end:] = itertools.islice(it, max(limit - end, 0))
        overflow = bytearray(it)
        if overflow:
            raise EOF(overflow)
    # The cursor ends up at the actual end of the data, which may be short of the limit.
    self._cursor = end = len(out)
    return end - beginning
def replay(self, offset, length)
Expand source code Browse git
def replay(self, offset: int, length: int):
    """
    Write `length` bytes at the cursor by repeating the `offset` bytes that immediately
    precede the cursor as often as necessary. Raises `ValueError` unless `offset` is between 1
    and the current cursor position: an offset of zero provides no data to repeat and would
    otherwise cause a division by zero.
    """
    cursor = self._cursor
    if not (1 <= offset <= cursor):
        raise ValueError(F'The supplied delta {offset} is not in the valid range [1,{cursor}].')
    rep, r = divmod(length, offset)
    # The block of bytes that is repeated; the final repetition may be partial.
    chunk = bytes(self._data[cursor - offset:cursor])
    self.write(chunk * rep + chunk[:r])
class MemoryFile (data=None, read_as_bytes=False, fileno=None, size_limit=None)

A thin wrapper around (potentially mutable) byte sequences which gives it the features of a file-like object.

Expand source code Browse git
class MemoryFile(MemoryFileMethods[T], io.BytesIO):
    """
    A file-like wrapper around a byte sequence. The class body is empty: all behavior comes
    from the base classes. `MemoryFileMethods` is listed first, so its methods take precedence
    over those of `io.BytesIO` in the method resolution order.
    """
    pass

Ancestors

Subclasses

Class variables

var read_as_bytes

Instance variables

var closed
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    Whether the file has been closed; this reports the value of the `_closed` flag.
    """
    return self._closed

Inherited members

class order (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class order(str, enum.Enum):
    """
    Byte order selection. The value of each member is the single-character prefix that the
    `struct` module uses to select the corresponding byte order in a format string.
    """
    # big endian
    big = '>'
    # little endian
    little = '<'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var big
var little
class StructReader (data, bigendian=False)

An extension of a MemoryFile which provides methods to read structured data.

Expand source code Browse git
class StructReader(MemoryFile[T]):
    """
    An extension of a `refinery.lib.structures.MemoryFile` which provides methods to read
    structured data. Apart from reading whole bytes, the reader can read individual bits: bits
    that were taken from the stream but not yet consumed are kept in an internal bit buffer,
    whose contents are stored in `_bbits` and whose size in bits is stored in `_nbits`.
    """

    class Unaligned(RuntimeError):
        pass

    def __init__(self, data: T, bigendian: bool = False):
        super().__init__(data)
        self._bbits = 0
        self._nbits = 0
        self.bigendian = bigendian

    def __enter__(self) -> StructReader:
        return super().__enter__()

    @property
    @contextlib.contextmanager
    def be(self):
        """
        A context manager which switches the reader to big endian mode for the duration of the
        context. On exit, the byte order that was active before is restored.
        """
        previous = self.bigendian
        self.bigendian = True
        try:
            yield self
        finally:
            # Restore rather than reset: the reader may have been big endian to begin with.
            self.bigendian = previous

    @property
    def byteorder_format(self) -> str:
        """
        The `struct` format prefix for the current byte order.
        """
        return '>' if self.bigendian else '<'

    @property
    def byteorder_name(self) -> str:
        """
        The name of the current byte order as understood by `int.from_bytes`.
        """
        return 'big' if self.bigendian else 'little'

    def seek(self, offset, whence=io.SEEK_SET) -> int:
        """
        Seek like the parent class does, but discard the contents of the bit buffer first.
        """
        self._bbits = 0
        self._nbits = 0
        return super().seek(offset, whence)

    def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
        """
        Read bytes from the underlying stream. Raises `refinery.lib.structures.StructReader.Unaligned`,
        which is a `RuntimeError`, when the stream is not currently byte-aligned, i.e. when
        `refinery.lib.structures.StructReader.byte_aligned` is `False`. Raises an exception of type
        `refinery.lib.structures.EOF` when less data is available in the stream than requested via
        the `size` parameter. The remaining data can be extracted from the exception. Use
        `refinery.lib.structures.StructReader.read_bytes` to read bytes from the stream when it is
        not byte-aligned.
        """
        if not self.byte_aligned:
            raise StructReader.Unaligned('buffer is not byte-aligned')
        data = self.read1(size, peek)
        if size and len(data) < size:
            raise EOF(data)
        return data

    @property
    def byte_aligned(self) -> bool:
        """
        This property is `True` if and only if there are currently no bits still waiting in the internal
        bit buffer.
        """
        return not self._nbits

    def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
        """
        This method clears the internal bit buffer and, if the cursor is not at a multiple of
        `blocksize`, moves it forward to the next such multiple. It returns a tuple containing
        the size and contents of the bit buffer before it was cleared.
        """
        nbits = self._nbits
        bbits = self._bbits
        self._nbits = 0
        self._bbits = 0
        mod = self._cursor % blocksize
        if mod:
            self.seekrel(blocksize - mod)
        return nbits, bbits

    @property
    def remaining_bits(self) -> int:
        """
        The number of bits that can still be read, including those in the bit buffer.
        """
        return 8 * self.remaining_bytes + self._nbits

    def read_integer(self, length: Optional[int] = None, peek: bool = False) -> int:
        """
        Read `length` many bits from the underlying stream as an integer; all remaining bits are
        read if no length is given. Raises `ValueError` for a negative length and an exception of
        type `refinery.lib.structures.EOF` when the stream does not contain enough data.
        """
        if length is None:
            length = self.remaining_bits
        if length < 0:
            raise ValueError
        if length < self._nbits:
            # The request can be served from the bit buffer alone.
            new_count = self._nbits - length
            if self.bigendian:
                result = self._bbits >> new_count
                if not peek:
                    self._bbits ^= result << new_count
            else:
                result = self._bbits & ((1 << length) - 1)
                if not peek:
                    self._bbits >>= length
            if not peek:
                self._nbits = new_count
            return result

        nbits, bbits = self._nbits, self._bbits
        number_of_missing_bits = length - nbits
        bytecount, rest = divmod(number_of_missing_bits, 8)
        if rest:
            # A partial byte is required; `rest` becomes the number of bits left over from it.
            bytecount += 1
            rest = 8 - rest
        bb = self.read1(bytecount, True)
        if len(bb) != bytecount:
            # EOF derives from EOFError, so handlers for the latter continue to work.
            raise EOF(bb)
        if not peek:
            self.seekrel(bytecount)
        if bytecount == 1:
            result, = bb
        else:
            result = int.from_bytes(bb, self.byteorder_name)
        if not nbits and not rest:
            return result
        if self.bigendian:
            rbmask = 2 ** rest - 1
            excess = result & rbmask
            result >>= rest
            result ^= bbits << number_of_missing_bits
        else:
            excess = result >> number_of_missing_bits
            result ^= excess << number_of_missing_bits
            result <<= nbits
            result |= bbits
        assert excess.bit_length() <= rest
        if not peek:
            self._nbits = rest
            self._bbits = excess
        return result

    def read_bytes(self, size: int, peek: bool = False) -> bytes:
        """
        The method reads `size` many bytes from the underlying stream starting at the current bit.
        """
        if self.byte_aligned:
            data = self.read_exactly(size, peek)
            if not isinstance(data, bytes):
                data = bytes(data)
            return data
        else:
            return self.read_integer(size * 8, peek).to_bytes(size, self.byteorder_name)

    def read_bit(self) -> int:
        """
        This function is a shortcut for calling `refinery.lib.structures.StructReader.read_integer` with
        an argument of `1`, i.e. this reads the next bit from the stream. In little endian mode, the bits
        of any byte in the stream are read from least significant to most significant; in big endian
        mode, they are read from most significant to least significant.
        """
        return self.read_integer(1)

    def read_bits(self, nbits: int) -> Iterable[int]:
        """
        This method returns the bits of `refinery.lib.structures.StructReader.read_integer` as an iterable
        from most to least significant.
        """
        chunk = self.read_integer(nbits)
        for k in range(nbits - 1, -1, -1):
            yield chunk >> k & 1

    def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
        """
        Identical to `refinery.lib.structures.StructReader.read_bits` with every bit value cast to a boolean.
        If `reverse` is set, the order of the flags is reversed.
        """
        bits = list(self.read_bits(nbits))
        if reverse:
            bits.reverse()
        for bit in bits:
            yield bool(bit)

    def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
        """
        Read structured data from the stream in any format supported by the `struct` module. The format
        can begin with a byte order character to override the current byte ordering. In addition to the
        `struct` format characters, the following ones are supported, optionally prefixed by a count:

        - `a`: a null-terminated string, as returned by `refinery.lib.structures.StructReader.read_c_string`
        - `u`: a wide string, as returned by `refinery.lib.structures.StructReader.read_w_string`
        - `w`: like `u`, but decoded as UTF-16LE
        - `g`: a GUID, as returned by `refinery.lib.structures.StructReader.read_guid`
        - `E`: a 7-bit encoded integer

        If the `unwrap` parameter is `True`, a single unpacked value will be returned as a scalar, not as
        a list with one element. If `peek` is set, the position of the reader is restored afterwards.
        """
        if not spec:
            raise ValueError('no format specified')
        byteorder = spec[:1]
        if byteorder in '<!=@>':
            spec = spec[1:]
        else:
            byteorder = self.byteorder_format
        data = []
        current_cursor = self.tell()
        nbits, bbits = self._nbits, self._bbits

        try:
            # reserved struct characters: xcbB?hHiIlLqQnNefdspP
            for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
                if k % 2 == 1:
                    count = 1 if len(part) == 1 else int(part[:-1])
                    code = part[-1]
                    for _ in range(count):
                        if code == 'a':
                            data.append(self.read_c_string())
                        elif code == 'g':
                            data.append(self.read_guid())
                        elif code == 'u':
                            data.append(self.read_w_string())
                        elif code == 'w':
                            data.append(self.read_w_string().decode('utf-16le'))
                        elif code == 'E':
                            data.append(self.read_7bit_encoded_int())
                elif part:
                    part = F'{byteorder}{part}'
                    data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
        finally:
            if peek:
                # This has to happen before any return statement; it previously did not happen
                # for unwrapped values. Seeking clears the bit buffer, so that is restored too.
                self.seekset(current_cursor)
                self._nbits, self._bbits = nbits, bbits
        if unwrap and len(data) == 1:
            return data[0]
        return data

    def read_nibble(self, peek: bool = False) -> int:
        """
        Calls `refinery.lib.structures.StructReader.read_integer` with an argument of `4`.
        """
        return self.read_integer(4, peek)

    def u8(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def i8(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def u16(self, peek: bool = False) -> int: return self.read_integer(16, peek)
    def u32(self, peek: bool = False) -> int: return self.read_integer(32, peek)
    def u64(self, peek: bool = False) -> int: return self.read_integer(64, peek)
    def i16(self, peek: bool = False) -> int: return signed(self.read_integer(16, peek), 16)
    def i32(self, peek: bool = False) -> int: return signed(self.read_integer(32, peek), 32)
    def i64(self, peek: bool = False) -> int: return signed(self.read_integer(64, peek), 64)

    def f32(self, peek: bool = False) -> float: return self.read_struct('f', unwrap=True, peek=peek)
    def f64(self, peek: bool = False) -> float: return self.read_struct('d', unwrap=True, peek=peek)

    def read_byte(self, peek: bool = False) -> int: return self.read_integer(8, peek)
    def read_char(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)

    def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
        """
        Read data up to the first occurrence of `terminator` whose distance from the current
        position is a multiple of `alignment`. The terminator is consumed, but not returned.
        """
        pos = self.tell()
        buf = self.getbuffer()
        try:
            end = pos - 1
            while True:
                end = buf.find(terminator, end + 1)
                if end < 0 or not (end - pos) % alignment:
                    break
        except AttributeError:
            # The buffer has no find method (e.g. a memoryview): read block by block instead.
            result = bytearray()
            while not self.eof:
                result.extend(self.read_bytes(alignment))
                if result.endswith(terminator):
                    return result[:-len(terminator)]
            self.seek(pos)
            raise EOF
        else:
            if end < 0:
                # NOTE: When the terminator is missing, this code path has always returned all
                # of the remaining data instead of raising an exception; this is preserved.
                data = self.read_exactly()
            else:
                data = self.read_exactly(end - pos)
                self.seekrel(len(terminator))
            return bytearray(data)

    def read_guid(self) -> str:
        """
        Read 16 bytes and return them formatted as an uppercase GUID string. The first three
        fields are always read as little endian integers, regardless of the current byte order.
        """
        _mode = self.bigendian
        self.bigendian = False
        try:
            a = self.u32()
            b = self.u16()
            c = self.u16()
            d = self.read(2).hex().upper()
            e = self.read(6).hex().upper()
        finally:
            self.bigendian = _mode
        return F'{a:08X}-{b:04X}-{c:04X}-{d}-{e}'

    def read_c_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string that is terminated by a null byte; it is decoded if an encoding is given.
        """
        data = self.read_terminated_array(B'\0')
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_w_string(self, encoding=None) -> Union[str, bytearray]:
        """
        Read a string of two-byte units that is terminated by two null bytes; it is decoded if
        an encoding is given.
        """
        data = self.read_terminated_array(B'\0\0', 2)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_length_prefixed_ascii(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'latin1')

    def read_length_prefixed_utf8(self, prefix_size: int = 32):
        return self.read_length_prefixed(prefix_size, 'utf8')

    def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
        # The length prefix counts two-byte units unless `bytecount` is set.
        block_size = 1 if bytecount else 2
        return self.read_length_prefixed(prefix_size, 'utf-16le', block_size)

    def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
        """
        Read an integer of `prefix_size` bits and then as many blocks of `block_size` bytes as
        this integer specifies. The data is decoded if an encoding is given.
        """
        prefix = self.read_integer(prefix_size) * block_size
        data = self.read(prefix)
        if encoding is not None:
            data = data.decode(encoding)
        return data

    def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
        """
        Read an integer that is stored in groups of 7 bits, least significant group first, where
        the high bit of each byte indicates whether another byte follows. If `max_bits` is
        positive, a `RuntimeError` is raised once the encoded integer exceeds that size.
        """
        value = 0
        for shift in itertools.count(0, step=7):
            b = self.read_byte()
            value |= (b & 0x7F) << shift
            if not b & 0x80:
                return value
            if shift > max_bits > 0:
                raise RuntimeError('Maximum bits were exceeded by encoded integer.')

Ancestors

Subclasses

Class variables

var read_as_bytes
var Unaligned

Unspecified run-time error.

Instance variables

var closed
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    Whether the file has been closed; this reports the value of the `_closed` flag.
    """
    return self._closed
var be
Expand source code Browse git
@property
@contextlib.contextmanager
def be(self):
    """
    A context manager which switches the reader to big endian mode for the duration of the
    context. On exit, the byte order that was active before is restored.
    """
    previous = self.bigendian
    self.bigendian = True
    try:
        yield self
    finally:
        # Restore rather than reset to little endian: the reader may have been in big endian
        # mode already when the context was entered.
        self.bigendian = previous
var byteorder_format
Expand source code Browse git
@property
def byteorder_format(self) -> str:
    """
    The byte order as a one-character string: `>` if `bigendian` is set and `<` otherwise.
    """
    return '>' if self.bigendian else '<'
var byteorder_name
Expand source code Browse git
@property
def byteorder_name(self) -> str:
    """
    The byte order as a word: `big` if `bigendian` is set and `little` otherwise.
    """
    return 'big' if self.bigendian else 'little'
var byte_aligned

This property is True if and only if there are currently no bits still waiting in the internal bit buffer.

Expand source code Browse git
@property
def byte_aligned(self) -> bool:
    """
    This property is `True` if and only if there are currently no bits still waiting in the internal
    bit buffer, i.e. when the bit count stored in `_nbits` is zero.
    """
    return not self._nbits
var remaining_bits
Expand source code Browse git
@property
def remaining_bits(self) -> int:
    """
    The number of bits that are left to read: eight for each remaining byte, plus the number
    of bits that are stored in `_nbits`.
    """
    return 8 * self.remaining_bytes + self._nbits

Methods

def seek(self, offset, whence=0)

Change stream position.

Seek to byte offset pos relative to position indicated by whence: 0 Start of stream (the default). pos should be >= 0; 1 Current position - pos may be negative; 2 End of stream - pos usually negative. Returns the new absolute position.

Expand source code Browse git
def seek(self, offset, whence=io.SEEK_SET) -> int:
    """
    Reset the bit buffer (`_bbits` and `_nbits`) to zero and then delegate to the `seek`
    method of the parent class, whose return value is passed through.
    """
    self._bbits = 0
    self._nbits = 0
    return super().seek(offset, whence)
def read_exactly(self, size=None, peek=False)

Read bytes from the underlying stream. Raises a RuntimeError when the stream is not currently byte-aligned, i.e. when StructReader.byte_aligned is False. Raises an exception of type EOF when less data is available in the stream than requested via the size parameter. The remaining data can be extracted from the exception. Use StructReader.read_bytes() to read bytes from the stream when it is not byte-aligned.

Expand source code Browse git
def read_exactly(self, size: Optional[int] = None, peek: bool = False) -> T:
    """
    Read `size` bytes from the underlying stream and return them. The read is delegated to
    `read1`, with `peek` passed through. The stream has to be byte-aligned for this, see
    `refinery.lib.structures.StructReader.byte_aligned`; otherwise, `StructReader.Unaligned` is
    raised. If a nonzero `size` was requested and less data was returned, an exception of type
    `refinery.lib.structures.EOF` is raised which carries the incomplete data. To read bytes
    from a stream that is not byte-aligned, use `refinery.lib.structures.StructReader.read_bytes`.
    """
    if self.byte_aligned:
        chunk = self.read1(size, peek)
        if not size or len(chunk) >= size:
            return chunk
        raise EOF(chunk)
    raise StructReader.Unaligned('buffer is not byte-aligned')
def byte_align(self, blocksize=1)

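This method clears the internal bit buffer and, if the cursor is not a multiple of blocksize, advances it to the next multiple of blocksize (with the default blocksize of 1 the cursor does not move). It returns a tuple containing the size and contents of the bit buffer as they were before it was cleared.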

Expand source code Browse git
def byte_align(self, blocksize: int = 1) -> Tuple[int, int]:
    """
    Discard the internal bit buffer and align the cursor to a multiple of `blocksize`: if the
    cursor is not already aligned, it is advanced to the next multiple. The return value is the
    tuple `(nbits, bbits)` holding the size and contents of the bit buffer before it was cleared.
    """
    pending = (self._nbits, self._bbits)
    self._nbits = self._bbits = 0
    misalignment = self._cursor % blocksize
    if misalignment != 0:
        self.seekrel(blocksize - misalignment)
    return pending
def read_integer(self, length=None, peek=False)

Read length many bits from the underlying stream as an integer.

Expand source code Browse git
def read_integer(self, length: Optional[int] = None, peek: bool = False) -> int:
    """
    Read `length` many bits from the underlying stream as an integer. If `length` is `None`, all
    remaining bits are read. Bits left over from a previous call are served from the internal bit
    buffer first; further bits are taken from whole bytes of the stream, and the unused bits of
    the last byte are kept in the bit buffer for the next call. If `peek` is `True`, neither the
    cursor nor the bit buffer are changed by this method. A `ValueError` is raised for a negative
    `length` and an `EOFError` is raised when the stream has fewer bytes left than required.
    """
    if length is None:
        length = self.remaining_bits
    if length < 0:
        raise ValueError
    if length < self._nbits:
        # The bit buffer alone can satisfy the request; the stream is not touched.
        new_count = self._nbits - length
        if self.bigendian:
            # Big endian: hand out the most significant buffered bits first.
            result = self._bbits >> new_count
            if not peek:
                self._bbits ^= result << new_count
        else:
            # Little endian: hand out the least significant buffered bits first.
            result = self._bbits & 2 ** length - 1
            if not peek:
                self._bbits >>= length
        if not peek:
            self._nbits = new_count
        return result

    nbits, bbits = self._nbits, self._bbits
    number_of_missing_bits = length - nbits
    # Round the missing bits up to whole bytes. Afterwards, `rest` is the number of bits in the
    # last byte that are fetched from the stream but do not belong to the result.
    bytecount, rest = divmod(number_of_missing_bits, 8)
    if rest:
        bytecount += 1
        rest = 8 - rest
    # The bytes are peeked first and the cursor is only advanced once enough data was found.
    bb = self.read1(bytecount, True)
    if len(bb) != bytecount:
        raise EOFError
    if not peek:
        self.seekrel(bytecount)
    if bytecount == 1:
        result, = bb
    else:
        result = int.from_bytes(bb, self.byteorder_name)
    if not nbits and not rest:
        # Byte-aligned before and after: the bytes are the result, nothing needs buffering.
        return result
    if self.bigendian:
        # The surplus bits are the low bits of the last byte; the previously buffered bits
        # become the most significant bits of the result.
        rbmask   = 2 ** rest - 1       # noqa
        excess   = result & rbmask     # noqa
        result >>= rest                # noqa
        result  ^= bbits << number_of_missing_bits   # noqa
    else:
        # The surplus bits are the high bits above the missing ones; the previously buffered
        # bits become the least significant bits of the result.
        excess   = result >> number_of_missing_bits  # noqa
        result  ^= excess << number_of_missing_bits  # noqa
        result <<= nbits               # noqa
        result  |= bbits               # noqa
    # Internal sanity check: the surplus has to fit into the bits reserved for it.
    assert excess.bit_length() <= rest
    if not peek:
        self._nbits = rest
        self._bbits = excess
    return result
def read_bytes(self, size, peek=False)

The method reads size many bytes from the underlying stream starting at the current bit.

Expand source code Browse git
def read_bytes(self, size: int, peek: bool = False) -> bytes:
    """
    Read `size` many bytes from the underlying stream, starting at the current bit, and return
    them as a `bytes` object. A byte-aligned reader uses
    `refinery.lib.structures.StructReader.read_exactly`. Otherwise, `8 * size` bits are read as
    one integer which is then converted to bytes using the current byte order.
    """
    if not self.byte_aligned:
        number = self.read_integer(size * 8, peek)
        return number.to_bytes(size, self.byteorder_name)
    chunk = self.read_exactly(size, peek)
    return chunk if isinstance(chunk, bytes) else bytes(chunk)
def read_bit(self)

This function is a shortcut for calling StructReader.read_integer() with an argument of 1, i.e. this reads the next bit from the stream. In little endian mode, the bits of any byte in the stream are read from least significant to most significant; in big endian mode, they are read from most significant to least significant.

Expand source code Browse git
def read_bit(self) -> int:
    """
    This function is a shortcut for calling `refinery.lib.structures.StructReader.read_integer` with
    an argument of `1`, i.e. this reads the next bit from the stream. In little endian mode, the bits
    of any byte in the stream are read from least significant to most significant; in big endian mode
    they are read from most significant to least significant.
    """
    return self.read_integer(1)
def read_bits(self, nbits)

This method reads an integer of nbits bits via StructReader.read_integer() and returns its bits as an iterable, ordered from most significant to least significant.

Expand source code Browse git
def read_bits(self, nbits: int) -> Iterable[int]:
    """
    Read an integer of `nbits` bits via `refinery.lib.structures.StructReader.read_integer` and
    yield its individual bits, starting with the most significant bit and ending with the least
    significant one. This is a generator: nothing is read until iteration starts.
    """
    chunk = self.read_integer(nbits)
    for shift in reversed(range(nbits)):
        yield (chunk >> shift) & 1
def read_flags(self, nbits, reverse=False)

Identical to StructReader.read_bits() with every bit value cast to a boolean.

Expand source code Browse git
def read_flags(self, nbits: int, reverse=False) -> Iterable[bool]:
    """
    Yield the values produced by `refinery.lib.structures.StructReader.read_bits` for `nbits`,
    each cast to a boolean. If `reverse` is set, the flags are yielded in the opposite order.
    """
    flags = [bool(bit) for bit in self.read_bits(nbits)]
    if reverse:
        flags.reverse()
    yield from flags
def read_struct(self, spec, unwrap=False, peek=False)

Read structured data from the stream in any format supported by the struct module. A byte order character at the start of the spec argument can be used to override the current byte ordering. The unpacked values are returned as a list; if the unwrap parameter is True, a single unpacked value will be returned as a scalar, not as a list with one element.

Expand source code Browse git
def read_struct(self, spec: str, unwrap=False, peek=False) -> Union[List[UnpackType], UnpackType]:
    """
    Read structured data from the stream in any format supported by the `struct` module. If `spec`
    starts with one of the byte order characters `<`, `>`, `!`, `=`, or `@`, it overrides the
    current byte ordering of the reader. In addition to the `struct` format characters, the
    following codes are supported, each optionally prefixed by a decimal repeat count:

    - `a`: a null-terminated string, see `refinery.lib.structures.StructReader.read_c_string`
    - `u`: a wide string terminated by two null bytes, returned as raw bytes
    - `w`: like `u`, but decoded as UTF-16LE
    - `g`: a GUID, see `refinery.lib.structures.StructReader.read_guid`
    - `E`: a 7-bit encoded integer

    The unpacked values are returned as a list. If the `unwrap` parameter is `True` and exactly one
    value was unpacked, that value is returned as a scalar instead. If `peek` is `True`, the cursor
    and the internal bit buffer are restored to their state before the call. A `ValueError` is
    raised when `spec` is empty.
    """
    if not spec:
        raise ValueError('no format specified')
    byteorder = spec[:1]
    if byteorder in '<!=@>':
        spec = spec[1:]
    else:
        byteorder = self.byteorder_format
    data = []
    current_cursor = self.tell()
    # Seeking discards the bit buffer, so it has to be remembered separately for peeking.
    pending_bits = self._nbits, self._bbits

    # reserved struct characters: xcbB?hHiIlLqQnNefdspP
    # Splitting on a capturing group puts the custom codes at the odd indices and the plain
    # struct format strings, which may be empty, at the even indices.
    for k, part in enumerate(re.split('(\\d*[auwgE])', spec)):
        if k % 2 == 1:
            count = 1 if len(part) == 1 else int(part[:-1])
            code = part[-1]
            for _ in range(count):
                if code == 'a':
                    data.append(self.read_c_string())
                elif code == 'g':
                    data.append(self.read_guid())
                elif code == 'u':
                    data.append(self.read_w_string())
                elif code == 'w':
                    data.append(self.read_w_string().decode('utf-16le'))
                elif code == 'E':
                    data.append(self.read_7bit_encoded_int())
        else:
            part = F'{byteorder}{part}'
            data.extend(struct.unpack(part, self.read_bytes(struct.calcsize(part))))
    if peek:
        # This has to happen before any return statement: otherwise, a peek with unwrap=True
        # would consume the data that was supposed to be only looked at.
        self.seekset(current_cursor)
        self._nbits, self._bbits = pending_bits
    if unwrap and len(data) == 1:
        return data[0]
    return data
def read_nibble(self, peek=False)

Calls StructReader.read_integer() with an argument of 4.

Expand source code Browse git
def read_nibble(self, peek: bool = False) -> int:
    """
    Read the next 4 bits as an integer. This calls `refinery.lib.structures.StructReader.read_integer`
    with an argument of `4`; the `peek` argument is passed through to that call.
    """
    return self.read_integer(4, peek)
def u8(self, peek=False)
Expand source code Browse git
# Read the next 8 bits as an unsigned integer; `peek` is forwarded to `read_integer`.
def u8(self, peek: bool = False) -> int: return self.read_integer(8, peek)
def i8(self, peek=False)
Expand source code Browse git
# Read the next 8 bits and reinterpret them as a signed integer via `signed`.
def i8(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)
def u16(self, peek=False)
Expand source code Browse git
# Read the next 16 bits as an unsigned integer; `peek` is forwarded to `read_integer`.
def u16(self, peek: bool = False) -> int: return self.read_integer(16, peek)
def u32(self, peek=False)
Expand source code Browse git
# Read the next 32 bits as an unsigned integer; `peek` is forwarded to `read_integer`.
def u32(self, peek: bool = False) -> int: return self.read_integer(32, peek)
def u64(self, peek=False)
Expand source code Browse git
# Read the next 64 bits as an unsigned integer; `peek` is forwarded to `read_integer`.
def u64(self, peek: bool = False) -> int: return self.read_integer(64, peek)
def i16(self, peek=False)
Expand source code Browse git
# Read the next 16 bits and reinterpret them as a signed integer via `signed`.
def i16(self, peek: bool = False) -> int: return signed(self.read_integer(16, peek), 16)
def i32(self, peek=False)
Expand source code Browse git
# Read the next 32 bits and reinterpret them as a signed integer via `signed`.
def i32(self, peek: bool = False) -> int: return signed(self.read_integer(32, peek), 32)
def i64(self, peek=False)
Expand source code Browse git
# Read the next 64 bits and reinterpret them as a signed integer via `signed`.
def i64(self, peek: bool = False) -> int: return signed(self.read_integer(64, peek), 64)
def f32(self, peek=False)
Expand source code Browse git
# Read a `struct` format `f` float via `read_struct`, unwrapped to a scalar; `peek` is forwarded.
def f32(self, peek: bool = False) -> float: return self.read_struct('f', unwrap=True, peek=peek)
def f64(self, peek=False)
Expand source code Browse git
# Read a `struct` format `d` float via `read_struct`, unwrapped to a scalar; `peek` is forwarded.
def f64(self, peek: bool = False) -> float: return self.read_struct('d', unwrap=True, peek=peek)
def read_byte(self, peek=False)
Expand source code Browse git
# Read the next 8 bits as an unsigned integer; this performs the same call as `u8`.
def read_byte(self, peek: bool = False) -> int: return self.read_integer(8, peek)
def read_char(self, peek=False)
Expand source code Browse git
# Read the next 8 bits as a signed integer; this performs the same call as `i8`.
def read_char(self, peek: bool = False) -> int: return signed(self.read_integer(8, peek), 8)
def read_terminated_array(self, terminator, alignment=1)
Expand source code Browse git
def read_terminated_array(self, terminator: bytes, alignment: int = 1) -> bytearray:
    """
    Read data from the current position up to the next occurrence of `terminator` whose distance
    from the starting position is a multiple of `alignment`. The data before the terminator is
    returned as a `bytearray` and the cursor is placed after the terminator.
    """
    pos = self.tell()
    buf = self.getbuffer()
    try:
        # Fast path: search the underlying buffer directly, skipping every match that does not
        # sit on an alignment boundary relative to the starting position.
        end = pos - 1
        while True:
            end = buf.find(terminator, end + 1)
            if end < 0 or not (end - pos) % alignment:
                break
    except AttributeError:
        # The buffer object has no `find` method: read one aligned block at a time until the
        # collected data ends with the terminator.
        result = bytearray()
        while not self.eof:
            result.extend(self.read_bytes(alignment))
            if result.endswith(terminator):
                return result[:-len(terminator)]
        # No terminator before the end of the stream: rewind and report the failure.
        self.seek(pos)
        raise EOF
    else:
        # NOTE(review): when no terminator was found, `end` is negative here and a negative size
        # is passed to `read_exactly`; unlike the fallback above, this path does not raise `EOF`
        # by itself. The outcome depends on how `read1` treats negative sizes - confirm.
        data = self.read_exactly(end - pos)
        self.seekrel(len(terminator))
        return bytearray(data)
def read_guid(self)
Expand source code Browse git
def read_guid(self) -> str:
    """
    Read 16 bytes from the stream and format them as an upper case GUID string. The first three
    groups are read as little endian integers of 32, 16, and 16 bits, regardless of the byte order
    that the reader is currently set to; the last two groups are the hexadecimal representation of
    the next 2 and 6 bytes in stream order. The byte order of the reader is restored before this
    method returns or raises.
    """
    saved_byte_order = self.bigendian
    self.bigendian = False
    try:
        a = self.u32()
        b = self.u16()
        c = self.u16()
        d = self.read(2).hex().upper()
        e = self.read(6).hex().upper()
        return F'{a:08X}-{b:04X}-{c:04X}-{d}-{e}'
    finally:
        self.bigendian = saved_byte_order
def read_c_string(self, encoding=None)
Expand source code Browse git
def read_c_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read a string that is terminated by a single null byte. The terminator is not part of the
    result. Without an `encoding`, the raw data is returned; otherwise it is decoded with the
    given codec and returned as a string.
    """
    raw = self.read_terminated_array(B'\0')
    return raw if encoding is None else raw.decode(encoding)
def read_w_string(self, encoding=None)
Expand source code Browse git
def read_w_string(self, encoding=None) -> Union[str, bytearray]:
    """
    Read a wide string that is terminated by two null bytes at an even offset from the current
    position. The terminator is not part of the result. Without an `encoding`, the raw data is
    returned; otherwise it is decoded with the given codec and returned as a string.
    """
    raw = self.read_terminated_array(B'\0\0', 2)
    return raw if encoding is None else raw.decode(encoding)
def read_length_prefixed_ascii(self, prefix_size=32)
Expand source code Browse git
def read_length_prefixed_ascii(self, prefix_size: int = 32):
    """
    Read a length-prefixed string via `refinery.lib.structures.StructReader.read_length_prefixed`
    and decode it using the `latin1` codec, which accepts every byte value. The `prefix_size` is
    the size of the length prefix in bits.
    """
    return self.read_length_prefixed(prefix_size, 'latin1')
def read_length_prefixed_utf8(self, prefix_size=32)
Expand source code Browse git
def read_length_prefixed_utf8(self, prefix_size: int = 32):
    """
    Read a length-prefixed string via `refinery.lib.structures.StructReader.read_length_prefixed`
    and decode it using the `utf8` codec. The `prefix_size` is the size of the length prefix in
    bits.
    """
    return self.read_length_prefixed(prefix_size, 'utf8')
def read_length_prefixed_utf16(self, prefix_size=32, bytecount=False)
Expand source code Browse git
def read_length_prefixed_utf16(self, prefix_size: int = 32, bytecount: bool = False):
    """
    Read a length-prefixed string via `refinery.lib.structures.StructReader.read_length_prefixed`
    and decode it as UTF-16LE. The `prefix_size` is the size of the length prefix in bits. If
    `bytecount` is set, the prefix is taken to be the size of the string in bytes; otherwise it
    is taken to be the number of two-byte units.
    """
    if bytecount:
        unit = 1
    else:
        unit = 2
    return self.read_length_prefixed(prefix_size, 'utf-16le', unit)
def read_length_prefixed(self, prefix_size=32, encoding=None, block_size=1)
Expand source code Browse git
def read_length_prefixed(self, prefix_size: int = 32, encoding: Optional[str] = None, block_size: int = 1) -> Union[T, str]:
    """
    Read an integer of `prefix_size` bits, multiply it by `block_size` to obtain a byte count,
    and read that many bytes from the stream. Without an `encoding`, the data is returned as it
    was read; otherwise it is decoded with the given codec and returned as a string.
    """
    length = self.read_integer(prefix_size) * block_size
    payload = self.read(length)
    return payload if encoding is None else payload.decode(encoding)
def read_7bit_encoded_int(self, max_bits=0)
Expand source code Browse git
def read_7bit_encoded_int(self, max_bits: int = 0) -> int:
    """
    Read an integer that is stored in groups of 7 bits, least significant group first: the lower
    7 bits of each byte carry data and a set high bit indicates that another byte follows. If
    `max_bits` is positive, a `RuntimeError` is raised as soon as a byte announces a continuation
    although the current bit offset already exceeds `max_bits`.
    """
    value = 0
    shift = 0
    while True:
        octet = self.read_byte()
        value |= (octet & 0x7F) << shift
        if octet & 0x80 == 0:
            return value
        if max_bits > 0 and shift > max_bits:
            raise RuntimeError('Maximum bits were exceeded by encoded integer.')
        shift += 7

Inherited members

class StructMeta (name, bases, nmspc, parser=refinery.lib.structures.StructReader)

A metaclass to facilitate the behavior outlined for Struct.

Expand source code Browse git
class StructMeta(type):
    """
    A metaclass to facilitate the behavior outlined for `refinery.lib.structures.Struct`. It wraps
    the `__init__` method of every class it creates so that the first argument is converted to the
    reader type given by the `parser` class keyword, and so that the data consumed by the
    initializer is stored in the `_data` attribute of the new instance.
    """
    def __new__(mcls, name, bases, nmspc, parser=StructReader):
        # The parser keyword is accepted here so that it can be given in a class statement; it
        # is not forwarded to the creation of the type.
        return type.__new__(mcls, name, bases, nmspc)

    def __init__(cls, name, bases, nmspc, parser=StructReader):
        super(StructMeta, cls).__init__(name, bases, nmspc)
        original__init__ = cls.__init__

        @functools.wraps(original__init__)
        def wrapped__init__(self: Struct, reader, *args, **kwargs):
            if not isinstance(reader, parser):
                # A reader whose class is a base class of the required parser is rejected
                # rather than wrapped again.
                if issubclass(parser, reader.__class__):
                    raise ValueError(
                        F'A reader of type {reader.__class__.__name__} was passed to {cls.__name__}, '
                        F'but a {parser.__name__} is required.')
                reader = parser(reader)
            # Remember where parsing starts; the slice between this offset and the cursor after
            # initialization is the data that belongs to the structure.
            start = reader.tell()
            view = memoryview(reader.getbuffer())
            original__init__(self, reader, *args, **kwargs)
            self._data = view[start:reader.tell()]

        cls.__init__ = wrapped__init__

Ancestors

  • builtins.type
class Struct (reader, *args, **kwargs)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class Struct(metaclass=StructMeta):
    """
    Base class for parsers of structured data. Instances of a `refinery.lib.structures.Struct`
    are created as follows:

        foo = Struct(data, bar=29)

    The initialization routine of the structure receives a single positional argument `reader`,
    followed by any additional arguments that were given. If `data` already is a
    `refinery.lib.structures.StructReader`, it is passed on as `reader` unchanged; any other
    object is wrapped in a `refinery.lib.structures.StructReader` first.
    """
    _data: Union[memoryview, bytearray]

    def __init__(self, reader: StructReader, *args, **kwargs):
        """
        The default initializer reads nothing from `reader`.
        """

    def __len__(self):
        """
        The number of bytes in the data of this structure.
        """
        return len(self._data)

    def __bytes__(self):
        """
        A copy of the data of this structure as a `bytes` object.
        """
        return bytes(self._data)

    def get_data(self, decouple=False):
        """
        Return the data of this structure. If `decouple` is set and the data is still held as a
        `memoryview`, it is first replaced by a `bytearray` copy, which is then returned.
        """
        raw = self._data
        if decouple and isinstance(raw, memoryview):
            raw = self._data = bytearray(raw)
        return raw

Subclasses

Methods

def get_data(self, decouple=False)
Expand source code Browse git
def get_data(self, decouple=False):
    """
    Return the data of this structure. If `decouple` is set and the data is still held as a
    `memoryview`, it is first replaced by a `bytearray` copy, which is then returned.
    """
    raw = self._data
    if decouple and isinstance(raw, memoryview):
        raw = self._data = bytearray(raw)
    return raw
class PerInstanceAttribute

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]):
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Expand source code Browse git
class PerInstanceAttribute(Generic[AttrType]):
    """
    A descriptor that stores one value per owner instance and resolves it lazily. Assigning to
    the attribute stores a seed value. The first read after an assignment passes that seed to
    `refinery.lib.structures.PerInstanceAttribute.resolve` and caches the result for later reads.
    Reading the attribute before it was assigned raises an `AttributeError`. The values are stored
    inside the descriptor, keyed by the identity of the owner, and they are removed again when the
    owner is garbage collected. Owner instances therefore have to support weak references.
    """
    def resolve(self, parent, value: Any) -> AttrType:
        """
        Convert the seed `value` that was assigned on `parent` to the value that is returned when
        the attribute is read. The default implementation returns the seed unchanged.
        """
        return value

    def __init__(self):
        self.__set: Dict[int, Any] = {}
        self.__get: Dict[int, AttrType] = {}

    def __set__(self, parent: Any, value: Any) -> None:
        pid = id(parent)
        if pid not in self.__set:
            def cleanup(self, pid):
                self.__set.pop(pid, None)
                self.__get.pop(pid, None)
            # Register the cleanup only once per owner. This is done before the seed is stored
            # so that nothing is left behind when the owner cannot be weakly referenced.
            weakref.finalize(parent, cleanup, self, pid)
        # Every assignment takes effect, not only the first one: store the new seed and drop the
        # resolved value that was cached for the previous seed.
        self.__set[pid] = value
        self.__get.pop(pid, None)

    def __get__(self, parent, tp=None) -> AttrType:
        pid = id(parent)
        if pid not in self.__get:
            try:
                seed = self.__set[pid]
            except KeyError as K:
                raise AttributeError from K
            self.__get[pid] = self.resolve(parent, seed)
        return self.__get[pid]

Ancestors

  • typing.Generic

Subclasses

  • refinery.lib.java.Index

Methods

def resolve(self, parent, value)
Expand source code Browse git
def resolve(self, parent, value: Any) -> AttrType:
    """
    Convert the seed `value` that was assigned on `parent` to the value that is returned when the
    attribute is read. This default implementation returns the seed unchanged.
    """
    return value