Module refinery.lib.un7z.headers

Low-level 7z format header parsing using StructReader.

Expand source code Browse git
"""
Low-level 7z format header parsing using StructReader.
"""
from __future__ import annotations

import dataclasses
import enum
import zlib

from datetime import datetime, timedelta, timezone

from refinery.lib.structures import EOF, StructReader

SIGNATURE = B'7z\xBC\xAF\x27\x1C'
SIGNATURE_HEADER_SIZE = 32


class SzException(Exception):
    pass


class SzCannotUnpack(SzException):
    pass


class SzPasswordRequired(SzCannotUnpack):
    pass


class SzInvalidPassword(SzCannotUnpack):
    pass


class SzCorruptArchive(SzCannotUnpack):
    pass


class SzUnsupportedMethod(SzException):
    pass


class PropertyID(int, enum.Enum):
    END = 0x00
    HEADER = 0x01
    ARCHIVE_PROPS = 0x02
    ADDITIONAL_STREAM = 0x03
    MAIN_STREAM = 0x04
    FILES_INFO = 0x05
    PACK_INFO = 0x06
    UNPACK_INFO = 0x07
    SUBSTREAMS_INFO = 0x08
    SIZE = 0x09
    CRC = 0x0A
    FOLDER = 0x0B
    CODER_UNPACK_SIZE = 0x0C
    NUM_UNPACK_STREAM = 0x0D
    EMPTY_STREAM = 0x0E
    EMPTY_FILE = 0x0F
    ANTI = 0x10
    NAME = 0x11
    CTIME = 0x12
    ATIME = 0x13
    MTIME = 0x14
    WIN_ATTRIBUTES = 0x15
    COMMENT = 0x16
    ENCODED_HEADER = 0x17
    START_POS = 0x18
    DUMMY = 0x19


@dataclasses.dataclass
class SignatureHeader:
    major_version: int
    minor_version: int
    start_header_crc: int
    next_header_offset: int
    next_header_size: int
    next_header_crc: int

    @property
    def archive_size(self) -> int:
        return SIGNATURE_HEADER_SIZE + self.next_header_offset + self.next_header_size


@dataclasses.dataclass
class Coder:
    codec_id: bytes
    num_in_streams: int
    num_out_streams: int
    properties: bytes


@dataclasses.dataclass
class BindPair:
    in_index: int
    out_index: int


@dataclasses.dataclass
class Folder:
    coders: list[Coder]
    bind_pairs: list[BindPair]
    packed_indices: list[int]
    unpack_sizes: list[int]
    crc: int | None = None

    @property
    def total_in_streams(self) -> int:
        return sum(c.num_in_streams for c in self.coders)

    @property
    def total_out_streams(self) -> int:
        return sum(c.num_out_streams for c in self.coders)

    @property
    def main_unpack_size(self) -> int:
        main_index = self._find_main_out_stream()
        return self.unpack_sizes[main_index]

    def _find_main_out_stream(self) -> int:
        bound_out = {bp.out_index for bp in self.bind_pairs}
        for i in range(self.total_out_streams):
            if i not in bound_out:
                return i
        return 0


@dataclasses.dataclass
class PackInfo:
    pack_pos: int
    num_pack_streams: int
    sizes: list[int]
    crcs: list[int | None]


@dataclasses.dataclass
class SubstreamsInfo:
    num_unpack_streams: list[int]
    unpack_sizes: list[int]
    crcs: list[int | None]


@dataclasses.dataclass
class FileEntry:
    name: str = ''
    size: int = 0
    crc: int | None = None
    has_stream: bool = True
    is_dir: bool = False
    is_anti: bool = False
    ctime: datetime | None = None
    atime: datetime | None = None
    mtime: datetime | None = None
    attributes: int = 0


@dataclasses.dataclass
class ArchiveHeader:
    pack_info: PackInfo | None = None
    folders: list[Folder] = dataclasses.field(default_factory=list)
    substreams: SubstreamsInfo | None = None
    files: list[FileEntry] = dataclasses.field(default_factory=list)


def read_7z_uint64(reader: StructReader) -> int:
    first = reader.u8()
    mask = 0x80
    value = 0
    for i in range(8):
        if first & mask == 0:
            value |= ((first & (mask - 1)) << (8 * i))
            return value
        value |= reader.u8() << (8 * i)
        mask >>= 1
    return value


def read_booleans(reader: StructReader, count: int, check_all_defined: bool = True) -> list[bool]:
    if check_all_defined:
        all_defined = reader.u8()
        if all_defined:
            return [True] * count
    result = []
    b = 0
    mask = 0
    for _ in range(count):
        if mask == 0:
            b = reader.u8()
            mask = 0x80
        result.append(bool(b & mask))
        mask >>= 1
    return result


def read_crcs(reader: StructReader, count: int) -> list[int | None]:
    defined = read_booleans(reader, count)
    crcs: list[int | None] = []
    for d in defined:
        if d:
            crcs.append(reader.u32())
        else:
            crcs.append(None)
    return crcs


_FILETIME_EPOCH = datetime(1601, 1, 1, tzinfo=timezone.utc)


def _filetime_to_datetime(ft: int) -> datetime | None:
    if ft == 0:
        return None
    try:
        return _FILETIME_EPOCH + timedelta(microseconds=ft // 10)
    except (OverflowError, OSError):
        return None


def parse_signature_header(data: bytes | bytearray | memoryview) -> SignatureHeader:
    view = memoryview(data)
    if view[:6] != SIGNATURE:
        raise SzCorruptArchive('Invalid 7z signature.')
    reader = StructReader(view)
    reader.seekset(6)
    major = reader.u8()
    minor = reader.u8()
    start_crc = reader.u32()
    computed_crc = zlib.crc32(view[12:32]) & 0xFFFFFFFF
    if computed_crc != start_crc:
        raise SzCorruptArchive(
            F'Start header CRC mismatch: expected {start_crc:#010x},'
            F' computed {computed_crc:#010x}.')
    next_header_offset = reader.u64()
    next_header_size = reader.u64()
    next_header_crc = reader.u32()
    return SignatureHeader(
        major_version=major,
        minor_version=minor,
        start_header_crc=start_crc,
        next_header_offset=next_header_offset,
        next_header_size=next_header_size,
        next_header_crc=next_header_crc,
    )


def _parse_pack_info(reader: StructReader) -> PackInfo:
    pack_pos = read_7z_uint64(reader)
    num_pack_streams = read_7z_uint64(reader)
    sizes: list[int] = []
    crcs: list[int | None] = [None] * num_pack_streams
    while True:
        prop_id = reader.u8()
        if prop_id == PropertyID.END:
            break
        if prop_id == PropertyID.SIZE:
            for _ in range(num_pack_streams):
                sizes.append(read_7z_uint64(reader))
        elif prop_id == PropertyID.CRC:
            crcs = read_crcs(reader, num_pack_streams)
        else:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
    if not sizes:
        sizes = [0] * num_pack_streams
    return PackInfo(
        pack_pos=pack_pos,
        num_pack_streams=num_pack_streams,
        sizes=sizes,
        crcs=crcs,
    )


def _parse_folder(reader: StructReader) -> Folder:
    num_coders = read_7z_uint64(reader)
    coders: list[Coder] = []
    total_in = 0
    total_out = 0
    for _ in range(num_coders):
        flags = reader.u8()
        codec_size = flags & 0x0F
        is_complex = bool(flags & 0x10)
        has_attrs = bool(flags & 0x20)
        codec_id = bytes(reader.read_exactly(codec_size))
        if is_complex:
            num_in = read_7z_uint64(reader)
            num_out = read_7z_uint64(reader)
        else:
            num_in = 1
            num_out = 1
        props = b''
        if has_attrs:
            props_size = read_7z_uint64(reader)
            props = reader.read_bytes(props_size)
        coders.append(Coder(
            codec_id=codec_id,
            num_in_streams=num_in,
            num_out_streams=num_out,
            properties=props,
        ))
        total_in += num_in
        total_out += num_out
    bind_pairs: list[BindPair] = []
    for _ in range(total_out - 1):
        bind_pairs.append(BindPair(
            in_index=read_7z_uint64(reader),
            out_index=read_7z_uint64(reader),
        ))
    num_packed = total_in - (total_out - 1)
    packed_indices: list[int] = []
    if num_packed == 1:
        bound_in = {bp.in_index for bp in bind_pairs}
        for i in range(total_in):
            if i not in bound_in:
                packed_indices.append(i)
                break
    else:
        for _ in range(num_packed):
            packed_indices.append(read_7z_uint64(reader))
    return Folder(
        coders=coders,
        bind_pairs=bind_pairs,
        packed_indices=packed_indices,
        unpack_sizes=[],
    )


def _parse_unpack_info(reader: StructReader) -> list[Folder]:
    prop_id = reader.u8()
    if prop_id != PropertyID.FOLDER:
        raise SzCorruptArchive(F'Expected FOLDER property, got {prop_id:#x}.')
    num_folders = read_7z_uint64(reader)
    external = reader.u8()
    if external:
        raise SzUnsupportedMethod('External folder references are not supported.')
    folders: list[Folder] = []
    for _ in range(num_folders):
        folders.append(_parse_folder(reader))
    prop_id = reader.u8()
    if prop_id != PropertyID.CODER_UNPACK_SIZE:
        raise SzCorruptArchive(F'Expected CODER_UNPACK_SIZE, got {prop_id:#x}.')
    for folder in folders:
        folder.unpack_sizes = []
        for _ in range(folder.total_out_streams):
            folder.unpack_sizes.append(read_7z_uint64(reader))
    while True:
        prop_id = reader.u8()
        if prop_id == PropertyID.END:
            break
        if prop_id == PropertyID.CRC:
            crcs = read_crcs(reader, num_folders)
            for i, folder in enumerate(folders):
                folder.crc = crcs[i]
        else:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
    return folders


def _parse_substreams_info(
    reader: StructReader,
    folders: list[Folder],
) -> SubstreamsInfo:
    num_unpack_streams = [1] * len(folders)
    unpack_sizes: list[int] = []
    crcs: list[int | None] = []
    while True:
        prop_id = reader.u8()
        if prop_id == PropertyID.END:
            break
        if prop_id == PropertyID.NUM_UNPACK_STREAM:
            for i in range(len(folders)):
                num_unpack_streams[i] = read_7z_uint64(reader)
        elif prop_id == PropertyID.SIZE:
            for i, folder in enumerate(folders):
                ns = num_unpack_streams[i]
                subtotal = 0
                for _ in range(ns - 1):
                    s = read_7z_uint64(reader)
                    unpack_sizes.append(s)
                    subtotal += s
                unpack_sizes.append(folder.main_unpack_size - subtotal)
        elif prop_id == PropertyID.CRC:
            num_crc_streams = 0
            for i, folder in enumerate(folders):
                ns = num_unpack_streams[i]
                if ns == 1 and folder.crc is not None:
                    continue
                num_crc_streams += ns
            crcs = read_crcs(reader, num_crc_streams)
        else:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
    if not unpack_sizes:
        for i, folder in enumerate(folders):
            ns = num_unpack_streams[i]
            if ns == 1:
                unpack_sizes.append(folder.main_unpack_size)
            else:
                for _ in range(ns):
                    unpack_sizes.append(0)
    return SubstreamsInfo(
        num_unpack_streams=num_unpack_streams,
        unpack_sizes=unpack_sizes,
        crcs=crcs,
    )


def _parse_files_info(reader: StructReader, num_files: int) -> list[FileEntry]:
    files = [FileEntry() for _ in range(num_files)]
    while True:
        try:
            prop_id = reader.u8()
        except EOF:
            break
        if prop_id == PropertyID.END:
            break
        size = read_7z_uint64(reader)
        pos_before = reader.tell()
        if prop_id == PropertyID.NAME:
            external = reader.u8()
            if external:
                reader.seekset(pos_before + size)
                continue
            for f in files:
                name_bytes = bytearray()
                while True:
                    c = reader.read_exactly(2)
                    if c[0] == 0 and c[1] == 0:
                        break
                    name_bytes.extend(c)
                f.name = name_bytes.decode('utf-16-le')
        elif prop_id == PropertyID.EMPTY_STREAM:
            empty_streams = read_booleans(reader, num_files, check_all_defined=False)
            for i, es in enumerate(empty_streams):
                if es:
                    files[i].has_stream = False
        elif prop_id == PropertyID.EMPTY_FILE:
            empty_count = sum(1 for f in files if not f.has_stream)
            empty_file_flags = read_booleans(reader, empty_count, check_all_defined=False)
            idx = 0
            for f in files:
                if not f.has_stream:
                    if idx < len(empty_file_flags) and empty_file_flags[idx]:
                        f.is_dir = False
                    else:
                        f.is_dir = True
                    idx += 1
        elif prop_id == PropertyID.ANTI:
            anti_count = sum(1 for f in files if not f.has_stream)
            anti_flags = read_booleans(reader, anti_count, check_all_defined=False)
            idx = 0
            for f in files:
                if not f.has_stream:
                    if idx < len(anti_flags):
                        f.is_anti = anti_flags[idx]
                    idx += 1
        elif prop_id in (PropertyID.CTIME, PropertyID.ATIME, PropertyID.MTIME):
            defined = read_booleans(reader, num_files)
            external = reader.u8()
            if external:
                reader.seekset(pos_before + size)
                continue
            for i, d in enumerate(defined):
                if d:
                    ft = reader.u64()
                    dt = _filetime_to_datetime(ft)
                    if prop_id == PropertyID.CTIME:
                        files[i].ctime = dt
                    elif prop_id == PropertyID.ATIME:
                        files[i].atime = dt
                    else:
                        files[i].mtime = dt
        elif prop_id == PropertyID.WIN_ATTRIBUTES:
            defined = read_booleans(reader, num_files)
            external = reader.u8()
            if external:
                reader.seekset(pos_before + size)
                continue
            for i, d in enumerate(defined):
                if d:
                    files[i].attributes = reader.u32()
                    if files[i].attributes & 0x10:
                        files[i].is_dir = True
        elif prop_id == PropertyID.DUMMY:
            reader.seekset(pos_before + size)
        else:
            reader.seekset(pos_before + size)
    empty_stream_files = [f for f in files if not f.has_stream]
    for f in empty_stream_files:
        if not f.is_dir and f.name and f.name.endswith('/'):
            f.is_dir = True
    return files


def parse_header(reader: StructReader) -> ArchiveHeader:
    header = ArchiveHeader()
    while True:
        try:
            prop_id = reader.u8()
        except EOF:
            break
        if prop_id == PropertyID.END:
            break
        if prop_id == PropertyID.ARCHIVE_PROPS:
            while True:
                p = reader.u8()
                if p == PropertyID.END:
                    break
                skip = read_7z_uint64(reader)
                reader.seekrel(skip)
        elif prop_id == PropertyID.ADDITIONAL_STREAM:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
        elif prop_id == PropertyID.MAIN_STREAM:
            inner = _parse_main_streams_info(reader)
            header.pack_info = inner.pack_info
            header.folders = inner.folders
            header.substreams = inner.substreams
        elif prop_id == PropertyID.FILES_INFO:
            num_files = read_7z_uint64(reader)
            header.files = _parse_files_info(reader, num_files)
        else:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
    return header


def _parse_main_streams_info(reader: StructReader) -> ArchiveHeader:
    header = ArchiveHeader()
    while True:
        prop_id = reader.u8()
        if prop_id == PropertyID.END:
            break
        if prop_id == PropertyID.PACK_INFO:
            header.pack_info = _parse_pack_info(reader)
        elif prop_id == PropertyID.UNPACK_INFO:
            header.folders = _parse_unpack_info(reader)
        elif prop_id == PropertyID.SUBSTREAMS_INFO:
            header.substreams = _parse_substreams_info(reader, header.folders)
        else:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
    return header


def parse_encoded_header(reader: StructReader) -> ArchiveHeader:
    return _parse_main_streams_info(reader)

Functions

def read_7z_uint64(reader)
Expand source code Browse git
def read_7z_uint64(reader: StructReader) -> int:
    first = reader.u8()
    mask = 0x80
    value = 0
    for i in range(8):
        if first & mask == 0:
            value |= ((first & (mask - 1)) << (8 * i))
            return value
        value |= reader.u8() << (8 * i)
        mask >>= 1
    return value
def read_booleans(reader, count, check_all_defined=True)
Expand source code Browse git
def read_booleans(reader: StructReader, count: int, check_all_defined: bool = True) -> list[bool]:
    if check_all_defined:
        all_defined = reader.u8()
        if all_defined:
            return [True] * count
    result = []
    b = 0
    mask = 0
    for _ in range(count):
        if mask == 0:
            b = reader.u8()
            mask = 0x80
        result.append(bool(b & mask))
        mask >>= 1
    return result
def read_crcs(reader, count)
Expand source code Browse git
def read_crcs(reader: StructReader, count: int) -> list[int | None]:
    defined = read_booleans(reader, count)
    crcs: list[int | None] = []
    for d in defined:
        if d:
            crcs.append(reader.u32())
        else:
            crcs.append(None)
    return crcs
def parse_signature_header(data)
Expand source code Browse git
def parse_signature_header(data: bytes | bytearray | memoryview) -> SignatureHeader:
    view = memoryview(data)
    if view[:6] != SIGNATURE:
        raise SzCorruptArchive('Invalid 7z signature.')
    reader = StructReader(view)
    reader.seekset(6)
    major = reader.u8()
    minor = reader.u8()
    start_crc = reader.u32()
    computed_crc = zlib.crc32(view[12:32]) & 0xFFFFFFFF
    if computed_crc != start_crc:
        raise SzCorruptArchive(
            F'Start header CRC mismatch: expected {start_crc:#010x},'
            F' computed {computed_crc:#010x}.')
    next_header_offset = reader.u64()
    next_header_size = reader.u64()
    next_header_crc = reader.u32()
    return SignatureHeader(
        major_version=major,
        minor_version=minor,
        start_header_crc=start_crc,
        next_header_offset=next_header_offset,
        next_header_size=next_header_size,
        next_header_crc=next_header_crc,
    )
def parse_header(reader)
Expand source code Browse git
def parse_header(reader: StructReader) -> ArchiveHeader:
    header = ArchiveHeader()
    while True:
        try:
            prop_id = reader.u8()
        except EOF:
            break
        if prop_id == PropertyID.END:
            break
        if prop_id == PropertyID.ARCHIVE_PROPS:
            while True:
                p = reader.u8()
                if p == PropertyID.END:
                    break
                skip = read_7z_uint64(reader)
                reader.seekrel(skip)
        elif prop_id == PropertyID.ADDITIONAL_STREAM:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
        elif prop_id == PropertyID.MAIN_STREAM:
            inner = _parse_main_streams_info(reader)
            header.pack_info = inner.pack_info
            header.folders = inner.folders
            header.substreams = inner.substreams
        elif prop_id == PropertyID.FILES_INFO:
            num_files = read_7z_uint64(reader)
            header.files = _parse_files_info(reader, num_files)
        else:
            skip = read_7z_uint64(reader)
            reader.seekrel(skip)
    return header
def parse_encoded_header(reader)
Expand source code Browse git
def parse_encoded_header(reader: StructReader) -> ArchiveHeader:
    return _parse_main_streams_info(reader)

Classes

class SzException (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzException(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class SzCannotUnpack (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzCannotUnpack(SzException):
    pass

Ancestors

  • SzException
  • builtins.Exception
  • builtins.BaseException

Subclasses

class SzPasswordRequired (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzPasswordRequired(SzCannotUnpack):
    pass

Ancestors

class SzInvalidPassword (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzInvalidPassword(SzCannotUnpack):
    pass

Ancestors

class SzCorruptArchive (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzCorruptArchive(SzCannotUnpack):
    pass

Ancestors

class SzUnsupportedMethod (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzUnsupportedMethod(SzException):
    pass

Ancestors

  • SzException
  • builtins.Exception
  • builtins.BaseException
class PropertyID (*args, **kwds)

int([x]) -> integer int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.int(). For floating-point numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

>>> int('0b100', base=0)
4
Expand source code Browse git
class PropertyID(int, enum.Enum):
    END = 0x00
    HEADER = 0x01
    ARCHIVE_PROPS = 0x02
    ADDITIONAL_STREAM = 0x03
    MAIN_STREAM = 0x04
    FILES_INFO = 0x05
    PACK_INFO = 0x06
    UNPACK_INFO = 0x07
    SUBSTREAMS_INFO = 0x08
    SIZE = 0x09
    CRC = 0x0A
    FOLDER = 0x0B
    CODER_UNPACK_SIZE = 0x0C
    NUM_UNPACK_STREAM = 0x0D
    EMPTY_STREAM = 0x0E
    EMPTY_FILE = 0x0F
    ANTI = 0x10
    NAME = 0x11
    CTIME = 0x12
    ATIME = 0x13
    MTIME = 0x14
    WIN_ATTRIBUTES = 0x15
    COMMENT = 0x16
    ENCODED_HEADER = 0x17
    START_POS = 0x18
    DUMMY = 0x19

Ancestors

  • builtins.int
  • enum.Enum

Class variables

var END

The type of the None singleton.

var HEADER

The type of the None singleton.

var ARCHIVE_PROPS

The type of the None singleton.

var ADDITIONAL_STREAM

The type of the None singleton.

var MAIN_STREAM

The type of the None singleton.

var FILES_INFO

The type of the None singleton.

var PACK_INFO

The type of the None singleton.

var UNPACK_INFO

The type of the None singleton.

var SUBSTREAMS_INFO

The type of the None singleton.

var SIZE

The type of the None singleton.

var CRC

The type of the None singleton.

var FOLDER

The type of the None singleton.

var CODER_UNPACK_SIZE

The type of the None singleton.

var NUM_UNPACK_STREAM

The type of the None singleton.

var EMPTY_STREAM

The type of the None singleton.

var EMPTY_FILE

The type of the None singleton.

var ANTI

The type of the None singleton.

var NAME

The type of the None singleton.

var CTIME

The type of the None singleton.

var ATIME

The type of the None singleton.

var MTIME

The type of the None singleton.

var WIN_ATTRIBUTES

The type of the None singleton.

var COMMENT

The type of the None singleton.

var ENCODED_HEADER

The type of the None singleton.

var START_POS

The type of the None singleton.

var DUMMY

The type of the None singleton.

class SignatureHeader (major_version, minor_version, start_header_crc, next_header_offset, next_header_size, next_header_crc)

SignatureHeader(major_version: 'int', minor_version: 'int', start_header_crc: 'int', next_header_offset: 'int', next_header_size: 'int', next_header_crc: 'int')

Expand source code Browse git
@dataclasses.dataclass
class SignatureHeader:
    major_version: int
    minor_version: int
    start_header_crc: int
    next_header_offset: int
    next_header_size: int
    next_header_crc: int

    @property
    def archive_size(self) -> int:
        return SIGNATURE_HEADER_SIZE + self.next_header_offset + self.next_header_size

Instance variables

var major_version

The type of the None singleton.

var minor_version

The type of the None singleton.

var start_header_crc

The type of the None singleton.

var next_header_offset

The type of the None singleton.

var next_header_size

The type of the None singleton.

var next_header_crc

The type of the None singleton.

var archive_size
Expand source code Browse git
@property
def archive_size(self) -> int:
    return SIGNATURE_HEADER_SIZE + self.next_header_offset + self.next_header_size
class Coder (codec_id, num_in_streams, num_out_streams, properties)

Coder(codec_id: 'bytes', num_in_streams: 'int', num_out_streams: 'int', properties: 'bytes')

Expand source code Browse git
@dataclasses.dataclass
class Coder:
    codec_id: bytes
    num_in_streams: int
    num_out_streams: int
    properties: bytes

Instance variables

var codec_id

The type of the None singleton.

var num_in_streams

The type of the None singleton.

var num_out_streams

The type of the None singleton.

var properties

The type of the None singleton.

class BindPair (in_index, out_index)

BindPair(in_index: 'int', out_index: 'int')

Expand source code Browse git
@dataclasses.dataclass
class BindPair:
    in_index: int
    out_index: int

Instance variables

var in_index

The type of the None singleton.

var out_index

The type of the None singleton.

class Folder (coders, bind_pairs, packed_indices, unpack_sizes, crc=None)

Folder(coders: 'list[Coder]', bind_pairs: 'list[BindPair]', packed_indices: 'list[int]', unpack_sizes: 'list[int]', crc: 'int | None' = None)

Expand source code Browse git
@dataclasses.dataclass
class Folder:
    coders: list[Coder]
    bind_pairs: list[BindPair]
    packed_indices: list[int]
    unpack_sizes: list[int]
    crc: int | None = None

    @property
    def total_in_streams(self) -> int:
        return sum(c.num_in_streams for c in self.coders)

    @property
    def total_out_streams(self) -> int:
        return sum(c.num_out_streams for c in self.coders)

    @property
    def main_unpack_size(self) -> int:
        main_index = self._find_main_out_stream()
        return self.unpack_sizes[main_index]

    def _find_main_out_stream(self) -> int:
        bound_out = {bp.out_index for bp in self.bind_pairs}
        for i in range(self.total_out_streams):
            if i not in bound_out:
                return i
        return 0

Instance variables

var coders

The type of the None singleton.

var bind_pairs

The type of the None singleton.

var packed_indices

The type of the None singleton.

var unpack_sizes

The type of the None singleton.

var crc

The type of the None singleton.

var total_in_streams
Expand source code Browse git
@property
def total_in_streams(self) -> int:
    return sum(c.num_in_streams for c in self.coders)
var total_out_streams
Expand source code Browse git
@property
def total_out_streams(self) -> int:
    return sum(c.num_out_streams for c in self.coders)
var main_unpack_size
Expand source code Browse git
@property
def main_unpack_size(self) -> int:
    main_index = self._find_main_out_stream()
    return self.unpack_sizes[main_index]
class PackInfo (pack_pos, num_pack_streams, sizes, crcs)

PackInfo(pack_pos: 'int', num_pack_streams: 'int', sizes: 'list[int]', crcs: 'list[int | None]')

Expand source code Browse git
@dataclasses.dataclass
class PackInfo:
    pack_pos: int
    num_pack_streams: int
    sizes: list[int]
    crcs: list[int | None]

Instance variables

var pack_pos

The type of the None singleton.

var num_pack_streams

The type of the None singleton.

var sizes

The type of the None singleton.

var crcs

The type of the None singleton.

class SubstreamsInfo (num_unpack_streams, unpack_sizes, crcs)

SubstreamsInfo(num_unpack_streams: 'list[int]', unpack_sizes: 'list[int]', crcs: 'list[int | None]')

Expand source code Browse git
@dataclasses.dataclass
class SubstreamsInfo:
    num_unpack_streams: list[int]
    unpack_sizes: list[int]
    crcs: list[int | None]

Instance variables

var num_unpack_streams

The type of the None singleton.

var unpack_sizes

The type of the None singleton.

var crcs

The type of the None singleton.

class FileEntry (name='', size=0, crc=None, has_stream=True, is_dir=False, is_anti=False, ctime=None, atime=None, mtime=None, attributes=0)

FileEntry(name: 'str' = '', size: 'int' = 0, crc: 'int | None' = None, has_stream: 'bool' = True, is_dir: 'bool' = False, is_anti: 'bool' = False, ctime: 'datetime | None' = None, atime: 'datetime | None' = None, mtime: 'datetime | None' = None, attributes: 'int' = 0)

Expand source code Browse git
@dataclasses.dataclass
class FileEntry:
    name: str = ''
    size: int = 0
    crc: int | None = None
    has_stream: bool = True
    is_dir: bool = False
    is_anti: bool = False
    ctime: datetime | None = None
    atime: datetime | None = None
    mtime: datetime | None = None
    attributes: int = 0

Instance variables

var name

The type of the None singleton.

var size

The type of the None singleton.

var crc

The type of the None singleton.

var has_stream

The type of the None singleton.

var is_dir

The type of the None singleton.

var is_anti

The type of the None singleton.

var ctime

The type of the None singleton.

var atime

The type of the None singleton.

var mtime

The type of the None singleton.

var attributes

The type of the None singleton.

class ArchiveHeader (pack_info=None, folders=<factory>, substreams=None, files=<factory>)

ArchiveHeader(pack_info: 'PackInfo | None' = None, folders: 'list[Folder]' = , substreams: 'SubstreamsInfo | None' = None, files: 'list[FileEntry]' = )

Expand source code Browse git
@dataclasses.dataclass
class ArchiveHeader:
    pack_info: PackInfo | None = None
    folders: list[Folder] = dataclasses.field(default_factory=list)
    substreams: SubstreamsInfo | None = None
    files: list[FileEntry] = dataclasses.field(default_factory=list)

Instance variables

var folders

The type of the None singleton.

var files

The type of the None singleton.

var pack_info

The type of the None singleton.

var substreams

The type of the None singleton.