Module refinery.lib.un7z

Pure-Python 7z archive parser and extractor.

Expand source code Browse git
"""
Pure-Python 7z archive parser and extractor.
"""
from __future__ import annotations

import zlib

from datetime import datetime

from refinery.lib.structures import StructReader
from refinery.lib.un7z.coders import (
    CODEC_AES256SHA256,
    decompress_folder,
)
from refinery.lib.un7z.headers import (
    SIGNATURE,
    SIGNATURE_HEADER_SIZE,
    ArchiveHeader,
    FileEntry,
    PropertyID,
    SignatureHeader,
    SzCannotUnpack,
    SzCorruptArchive,
    SzException,
    SzInvalidPassword,
    SzPasswordRequired,
    SzUnsupportedMethod,
    parse_encoded_header,
    parse_header,
    parse_signature_header,
)

__all__ = [
    'SIGNATURE',
    'SIGNATURE_HEADER_SIZE',
    'SzArchive',
    'SzCorruptArchive',
    'SzException',
    'SzFile',
    'SzInvalidPassword',
    'SzPasswordRequired',
    'SzUnsupportedMethod',
    'SzCannotUnpack',
]


class SzFile:
    """
    A single entry (file or directory) of an `SzArchive`.

    The object is a thin read-only facade over the parsed file entry. It also remembers
    which folder and which stream inside that folder hold the entry's data; both are
    `None` for entries that have no data stream.
    """

    def __init__(
        self,
        entry: FileEntry,
        archive: SzArchive,
        folder_index: int | None,
        stream_index: int | None,
    ):
        self._archive = archive
        self._entry = entry
        self._stream_index = stream_index
        self._folder_index = folder_index

    @property
    def name(self) -> str:
        """
        The name stored in the underlying file entry.
        """
        return self._entry.name

    @property
    def size(self) -> int:
        """
        The size stored in the underlying file entry.
        """
        return self._entry.size

    @property
    def crc(self) -> int | None:
        """
        The CRC stored in the underlying file entry, or `None` if there is none.
        """
        return self._entry.crc

    @property
    def is_dir(self) -> bool:
        """
        Whether the underlying file entry is flagged as a directory.
        """
        return self._entry.is_dir

    @property
    def mtime(self) -> datetime | None:
        """
        The `mtime` value of the underlying file entry, if any.
        """
        return self._entry.mtime

    @property
    def ctime(self) -> datetime | None:
        """
        The `ctime` value of the underlying file entry, if any.
        """
        return self._entry.ctime

    @property
    def atime(self) -> datetime | None:
        """
        The `atime` value of the underlying file entry, if any.
        """
        return self._entry.atime

    @property
    def attributes(self) -> int:
        """
        The attribute value of the underlying file entry.
        """
        return self._entry.attributes

    def decompress(self, password: str | bytes | None = None) -> bytes | bytearray | memoryview:
        """
        Return the unpacked contents of this entry.

        Directories and entries without an associated data stream yield an empty byte
        string. A falsy `password` (this includes `None` and the empty string) falls
        back to the password that the owning archive was created with.
        """
        folder, stream = self._folder_index, self._stream_index
        if self.is_dir or folder is None or stream is None:
            return b''
        secret = password if password else self._archive._password
        return self._archive._decompress_file(folder, stream, secret)


class SzArchive:
    """
    Parsed view of a 7z archive that is held entirely in memory.

    Constructing the object parses the signature header and the (possibly encoded)
    archive header and builds the list of `SzFile` entries. File contents are unpacked
    lazily, one folder at a time; the unpacked streams of each folder are cached.
    """

    def __init__(
        self,
        data: bytes | bytearray | memoryview,
        password: str | bytes | None = None,
    ):
        """
        Parse the archive contained in `data`. The optional `password` is passed to the
        decoder of an encoded header and serves as the default password for extraction.
        """
        self._view = memoryview(data)
        self._password = password
        self._sig_header: SignatureHeader | None = None
        self._header: ArchiveHeader | None = None
        self._files: list[SzFile] = []
        # Maps a folder index to the unpacked streams of that folder.
        self._folder_cache: dict[int, list[memoryview]] = {}
        self._parse()

    @property
    def signature_header(self) -> SignatureHeader:
        """
        The signature header that was parsed from the start of the archive.
        """
        assert self._sig_header is not None
        return self._sig_header

    @property
    def files(self) -> list[SzFile]:
        """
        A new list containing all entries of the archive in header order.
        """
        return list(self._files)

    def _parse(self):
        """
        Parse signature header and archive header, then build the file list. Raises
        `SzCorruptArchive` if the header checksum does not match or if the header does
        not start with a known top-level property.
        """
        self._sig_header = parse_signature_header(self._view)
        sh = self._sig_header
        header_offset = SIGNATURE_HEADER_SIZE + sh.next_header_offset
        header_end = header_offset + sh.next_header_size
        # Slicing a memoryview never raises; a header location beyond the end of the
        # data yields a truncated view, which then typically fails the CRC check.
        header_view = self._view[header_offset:header_end]
        crc = zlib.crc32(header_view) & 0xFFFFFFFF
        if crc != sh.next_header_crc:
            raise SzCorruptArchive(
                F'Next header CRC mismatch: expected {sh.next_header_crc:#010x},'
                F' computed {crc:#010x}.')
        reader = StructReader(header_view)
        prop_id = reader.u8()
        if prop_id == PropertyID.HEADER:
            self._header = parse_header(reader)
        elif prop_id == PropertyID.ENCODED_HEADER:
            self._header = self._decode_encoded_header(reader)
        else:
            raise SzCorruptArchive(F'Unexpected top-level property: {prop_id:#x}')
        self._build_file_list()

    def _decode_encoded_header(self, reader: StructReader) -> ArchiveHeader:
        """
        Unpack an encoded header and parse the result. The unpacked data may itself
        start with another encoded header, in which case the method recurses.
        """
        enc_header = parse_encoded_header(reader)
        if enc_header.pack_info is None or not enc_header.folders:
            raise SzCorruptArchive('Encoded header has no pack info or folders.')
        pack_offset = SIGNATURE_HEADER_SIZE + enc_header.pack_info.pack_pos
        packed_streams: list[memoryview] = []
        offset = pack_offset
        # The packed streams are stored back to back, starting at the pack position.
        for size in enc_header.pack_info.sizes:
            packed_streams.append(self._view[offset:offset + size])
            offset += size
        # Only the first folder of the encoded header is decoded.
        folder = enc_header.folders[0]
        unpack_size = folder.main_unpack_size
        header_data = decompress_folder(folder, packed_streams, unpack_size, self._password)
        inner_reader = StructReader(memoryview(header_data))
        inner_prop = inner_reader.u8()
        if inner_prop == PropertyID.HEADER:
            return parse_header(inner_reader)
        elif inner_prop == PropertyID.ENCODED_HEADER:
            return self._decode_encoded_header(inner_reader)
        else:
            raise SzCorruptArchive(F'Unexpected property in decoded header: {inner_prop:#x}')

    def _build_file_list(self):
        """
        Populate the file list by assigning file entries to folder streams.

        Entries are consumed in header order: each entry that has a stream receives the
        next (folder, stream) slot, entries without a stream receive no slot. Size and
        CRC from the substream info (or from the folder, for single-stream folders) are
        written back to the entry.
        """
        assert self._header is not None
        header = self._header
        files = header.files
        folders = header.folders
        substreams = header.substreams
        if substreams:
            num_unpack_per_folder = substreams.num_unpack_streams
        else:
            # Without substream info, every folder holds exactly one stream.
            num_unpack_per_folder = [1] * len(folders)
        file_idx = 0
        stream_offset = 0
        for fi, folder in enumerate(folders):
            ns = num_unpack_per_folder[fi] if fi < len(num_unpack_per_folder) else 1
            for si in range(ns):
                # Entries without a stream precede the next stream-bearing entry.
                while file_idx < len(files) and not files[file_idx].has_stream:
                    self._files.append(SzFile(files[file_idx], self, None, None))
                    file_idx += 1
                if file_idx >= len(files):
                    continue
                entry = files[file_idx]
                global_index = stream_offset + si
                if substreams and global_index < len(substreams.unpack_sizes):
                    entry.size = substreams.unpack_sizes[global_index]
                elif ns == 1:
                    entry.size = folder.main_unpack_size
                crc = None
                if substreams and substreams.crcs and global_index < len(substreams.crcs):
                    crc = substreams.crcs[global_index]
                if crc is None and ns == 1:
                    # The folder checksum covers the only stream of the folder.
                    crc = folder.crc
                if crc is not None:
                    entry.crc = crc
                self._files.append(SzFile(entry, self, fi, si))
                file_idx += 1
            stream_offset += ns
        # Everything that is left over has no folder stream to map to.
        while file_idx < len(files):
            self._files.append(SzFile(files[file_idx], self, None, None))
            file_idx += 1

    def _decompress_file(
        self,
        folder_index: int,
        stream_index: int,
        password: str | bytes | None = None,
    ) -> bytes:
        """
        Return the unpacked data of one stream of one folder as `bytes`.

        The folder is unpacked on first access and its streams are cached. The stream
        checksum, if known, is verified on every call, regardless of whether the data
        was served from the cache.

        Raises `SzPasswordRequired` if the folder is encrypted and no password is given,
        `SzInvalidPassword` if unpacking fails unexpectedly while a password is given,
        and `SzCorruptArchive` for all other unpacking failures, for checksum mismatches,
        and for a stream index that the folder does not contain.
        """
        streams = self._folder_cache.get(folder_index)
        if streams is None or stream_index >= len(streams):
            streams = self._unpack_folder(folder_index, password)
            self._folder_cache[folder_index] = streams
        if stream_index >= len(streams):
            raise SzCorruptArchive(
                F'Stream index {stream_index} out of range for folder {folder_index}.')
        data = streams[stream_index]
        entry_crc = self._get_file_crc(folder_index, stream_index)
        if entry_crc is not None:
            computed = zlib.crc32(data) & 0xFFFFFFFF
            if computed != entry_crc:
                raise SzCorruptArchive(
                    F'CRC mismatch for stream {stream_index} in folder {folder_index}:'
                    F' expected {entry_crc:#010x}, computed {computed:#010x}.')
        # Always hand out an independent copy, never a view into the cache.
        return bytes(data)

    def _unpack_folder(
        self,
        folder_index: int,
        password: str | bytes | None,
    ) -> list[memoryview]:
        """
        Unpack the folder with the given index and split the result into its streams.
        """
        assert self._header is not None
        header = self._header
        folder = header.folders[folder_index]
        if password is None and any(
            coder.codec_id == CODEC_AES256SHA256 for coder in folder.coders
        ):
            raise SzPasswordRequired('Password required for encrypted archive.')
        pack_info = header.pack_info
        if pack_info is None:
            raise SzCorruptArchive('No pack info in header.')
        sizes = pack_info.sizes
        # Packed streams are assigned to folders in order; skip those that belong to
        # preceding folders, but never move past the end of the size table.
        pack_start = min(
            sum(len(header.folders[fi].packed_indices) for fi in range(folder_index)),
            len(sizes),
        )
        offset = SIGNATURE_HEADER_SIZE + pack_info.pack_pos
        offset += sum(sizes[k] for k in range(pack_start))
        packed_streams: list[memoryview] = []
        for idx in range(pack_start, pack_start + len(folder.packed_indices)):
            if idx < len(sizes):
                packed_streams.append(self._view[offset:offset + sizes[idx]])
                offset += sizes[idx]
            else:
                # No size is recorded for this packed stream; pass an empty one.
                packed_streams.append(memoryview(b''))
        try:
            raw = decompress_folder(folder, packed_streams, folder.main_unpack_size, password)
        except SzException:
            # This includes SzPasswordRequired, which derives from SzException.
            raise
        except Exception as e:
            if password is not None:
                raise SzInvalidPassword(str(e)) from e
            raise SzCorruptArchive(str(e)) from e
        substreams = header.substreams
        if substreams:
            num_unpack = substreams.num_unpack_streams
        else:
            num_unpack = [1] * len(header.folders)
        ns = num_unpack[folder_index] if folder_index < len(num_unpack) else 1
        raw_view = memoryview(raw)
        if ns <= 1:
            return [raw_view]
        # Index of this folder's first stream in the archive-wide substream tables.
        first = sum(num_unpack[fi] for fi in range(folder_index))
        streams: list[memoryview] = []
        start = 0
        for si in range(ns):
            if substreams and (first + si) < len(substreams.unpack_sizes):
                sz = substreams.unpack_sizes[first + si]
            else:
                # Without a recorded size, the stream takes all remaining data.
                sz = len(raw) - start
            streams.append(raw_view[start:start + sz])
            start += sz
        return streams

    def _get_file_crc(self, folder_index: int, stream_index: int) -> int | None:
        """
        Return the CRC of the entry that is mapped to the given folder stream, or `None`
        if no entry is mapped to it or the entry has no CRC.
        """
        for f in self._files:
            if f._folder_index == folder_index and f._stream_index == stream_index:
                return f.crc
        return None

Sub-modules

refinery.lib.un7z.bcj2

BCJ2 decoder for 7z archives …

refinery.lib.un7z.coders

Decompression and filter chain for 7z archives …

refinery.lib.un7z.headers

Low-level 7z format header parsing using StructReader.

Classes

class SzArchive (data, password=None)
Expand source code Browse git
class SzArchive:
    """
    Parsed view of a 7z archive that is held entirely in memory.

    Constructing the object parses the signature header and the (possibly encoded)
    archive header and builds the list of `SzFile` entries. File contents are unpacked
    lazily, one folder at a time; the unpacked streams of each folder are cached.
    """

    def __init__(
        self,
        data: bytes | bytearray | memoryview,
        password: str | bytes | None = None,
    ):
        """
        Parse the archive contained in `data`. The optional `password` is passed to the
        decoder of an encoded header and serves as the default password for extraction.
        """
        self._view = memoryview(data)
        self._password = password
        self._sig_header: SignatureHeader | None = None
        self._header: ArchiveHeader | None = None
        self._files: list[SzFile] = []
        # Maps a folder index to the unpacked streams of that folder.
        self._folder_cache: dict[int, list[memoryview]] = {}
        self._parse()

    @property
    def signature_header(self) -> SignatureHeader:
        """
        The signature header that was parsed from the start of the archive.
        """
        assert self._sig_header is not None
        return self._sig_header

    @property
    def files(self) -> list[SzFile]:
        """
        A new list containing all entries of the archive in header order.
        """
        return list(self._files)

    def _parse(self):
        """
        Parse signature header and archive header, then build the file list. Raises
        `SzCorruptArchive` if the header checksum does not match or if the header does
        not start with a known top-level property.
        """
        self._sig_header = parse_signature_header(self._view)
        sh = self._sig_header
        header_offset = SIGNATURE_HEADER_SIZE + sh.next_header_offset
        header_end = header_offset + sh.next_header_size
        # Slicing a memoryview never raises; a header location beyond the end of the
        # data yields a truncated view, which then typically fails the CRC check.
        header_view = self._view[header_offset:header_end]
        crc = zlib.crc32(header_view) & 0xFFFFFFFF
        if crc != sh.next_header_crc:
            raise SzCorruptArchive(
                F'Next header CRC mismatch: expected {sh.next_header_crc:#010x},'
                F' computed {crc:#010x}.')
        reader = StructReader(header_view)
        prop_id = reader.u8()
        if prop_id == PropertyID.HEADER:
            self._header = parse_header(reader)
        elif prop_id == PropertyID.ENCODED_HEADER:
            self._header = self._decode_encoded_header(reader)
        else:
            raise SzCorruptArchive(F'Unexpected top-level property: {prop_id:#x}')
        self._build_file_list()

    def _decode_encoded_header(self, reader: StructReader) -> ArchiveHeader:
        """
        Unpack an encoded header and parse the result. The unpacked data may itself
        start with another encoded header, in which case the method recurses.
        """
        enc_header = parse_encoded_header(reader)
        if enc_header.pack_info is None or not enc_header.folders:
            raise SzCorruptArchive('Encoded header has no pack info or folders.')
        pack_offset = SIGNATURE_HEADER_SIZE + enc_header.pack_info.pack_pos
        packed_streams: list[memoryview] = []
        offset = pack_offset
        # The packed streams are stored back to back, starting at the pack position.
        for size in enc_header.pack_info.sizes:
            packed_streams.append(self._view[offset:offset + size])
            offset += size
        # Only the first folder of the encoded header is decoded.
        folder = enc_header.folders[0]
        unpack_size = folder.main_unpack_size
        header_data = decompress_folder(folder, packed_streams, unpack_size, self._password)
        inner_reader = StructReader(memoryview(header_data))
        inner_prop = inner_reader.u8()
        if inner_prop == PropertyID.HEADER:
            return parse_header(inner_reader)
        elif inner_prop == PropertyID.ENCODED_HEADER:
            return self._decode_encoded_header(inner_reader)
        else:
            raise SzCorruptArchive(F'Unexpected property in decoded header: {inner_prop:#x}')

    def _build_file_list(self):
        """
        Populate the file list by assigning file entries to folder streams.

        Entries are consumed in header order: each entry that has a stream receives the
        next (folder, stream) slot, entries without a stream receive no slot. Size and
        CRC from the substream info (or from the folder, for single-stream folders) are
        written back to the entry.
        """
        assert self._header is not None
        header = self._header
        files = header.files
        folders = header.folders
        substreams = header.substreams
        if substreams:
            num_unpack_per_folder = substreams.num_unpack_streams
        else:
            # Without substream info, every folder holds exactly one stream.
            num_unpack_per_folder = [1] * len(folders)
        file_idx = 0
        stream_offset = 0
        for fi, folder in enumerate(folders):
            ns = num_unpack_per_folder[fi] if fi < len(num_unpack_per_folder) else 1
            for si in range(ns):
                # Entries without a stream precede the next stream-bearing entry.
                while file_idx < len(files) and not files[file_idx].has_stream:
                    self._files.append(SzFile(files[file_idx], self, None, None))
                    file_idx += 1
                if file_idx >= len(files):
                    continue
                entry = files[file_idx]
                global_index = stream_offset + si
                if substreams and global_index < len(substreams.unpack_sizes):
                    entry.size = substreams.unpack_sizes[global_index]
                elif ns == 1:
                    entry.size = folder.main_unpack_size
                crc = None
                if substreams and substreams.crcs and global_index < len(substreams.crcs):
                    crc = substreams.crcs[global_index]
                if crc is None and ns == 1:
                    # The folder checksum covers the only stream of the folder.
                    crc = folder.crc
                if crc is not None:
                    entry.crc = crc
                self._files.append(SzFile(entry, self, fi, si))
                file_idx += 1
            stream_offset += ns
        # Everything that is left over has no folder stream to map to.
        while file_idx < len(files):
            self._files.append(SzFile(files[file_idx], self, None, None))
            file_idx += 1

    def _decompress_file(
        self,
        folder_index: int,
        stream_index: int,
        password: str | bytes | None = None,
    ) -> bytes:
        """
        Return the unpacked data of one stream of one folder as `bytes`.

        The folder is unpacked on first access and its streams are cached. The stream
        checksum, if known, is verified on every call, regardless of whether the data
        was served from the cache.

        Raises `SzPasswordRequired` if the folder is encrypted and no password is given,
        `SzInvalidPassword` if unpacking fails unexpectedly while a password is given,
        and `SzCorruptArchive` for all other unpacking failures, for checksum mismatches,
        and for a stream index that the folder does not contain.
        """
        streams = self._folder_cache.get(folder_index)
        if streams is None or stream_index >= len(streams):
            streams = self._unpack_folder(folder_index, password)
            self._folder_cache[folder_index] = streams
        if stream_index >= len(streams):
            raise SzCorruptArchive(
                F'Stream index {stream_index} out of range for folder {folder_index}.')
        data = streams[stream_index]
        entry_crc = self._get_file_crc(folder_index, stream_index)
        if entry_crc is not None:
            computed = zlib.crc32(data) & 0xFFFFFFFF
            if computed != entry_crc:
                raise SzCorruptArchive(
                    F'CRC mismatch for stream {stream_index} in folder {folder_index}:'
                    F' expected {entry_crc:#010x}, computed {computed:#010x}.')
        # Always hand out an independent copy, never a view into the cache.
        return bytes(data)

    def _unpack_folder(
        self,
        folder_index: int,
        password: str | bytes | None,
    ) -> list[memoryview]:
        """
        Unpack the folder with the given index and split the result into its streams.
        """
        assert self._header is not None
        header = self._header
        folder = header.folders[folder_index]
        if password is None and any(
            coder.codec_id == CODEC_AES256SHA256 for coder in folder.coders
        ):
            raise SzPasswordRequired('Password required for encrypted archive.')
        pack_info = header.pack_info
        if pack_info is None:
            raise SzCorruptArchive('No pack info in header.')
        sizes = pack_info.sizes
        # Packed streams are assigned to folders in order; skip those that belong to
        # preceding folders, but never move past the end of the size table.
        pack_start = min(
            sum(len(header.folders[fi].packed_indices) for fi in range(folder_index)),
            len(sizes),
        )
        offset = SIGNATURE_HEADER_SIZE + pack_info.pack_pos
        offset += sum(sizes[k] for k in range(pack_start))
        packed_streams: list[memoryview] = []
        for idx in range(pack_start, pack_start + len(folder.packed_indices)):
            if idx < len(sizes):
                packed_streams.append(self._view[offset:offset + sizes[idx]])
                offset += sizes[idx]
            else:
                # No size is recorded for this packed stream; pass an empty one.
                packed_streams.append(memoryview(b''))
        try:
            raw = decompress_folder(folder, packed_streams, folder.main_unpack_size, password)
        except SzException:
            # This includes SzPasswordRequired, which derives from SzException.
            raise
        except Exception as e:
            if password is not None:
                raise SzInvalidPassword(str(e)) from e
            raise SzCorruptArchive(str(e)) from e
        substreams = header.substreams
        if substreams:
            num_unpack = substreams.num_unpack_streams
        else:
            num_unpack = [1] * len(header.folders)
        ns = num_unpack[folder_index] if folder_index < len(num_unpack) else 1
        raw_view = memoryview(raw)
        if ns <= 1:
            return [raw_view]
        # Index of this folder's first stream in the archive-wide substream tables.
        first = sum(num_unpack[fi] for fi in range(folder_index))
        streams: list[memoryview] = []
        start = 0
        for si in range(ns):
            if substreams and (first + si) < len(substreams.unpack_sizes):
                sz = substreams.unpack_sizes[first + si]
            else:
                # Without a recorded size, the stream takes all remaining data.
                sz = len(raw) - start
            streams.append(raw_view[start:start + sz])
            start += sz
        return streams

    def _get_file_crc(self, folder_index: int, stream_index: int) -> int | None:
        """
        Return the CRC of the entry that is mapped to the given folder stream, or `None`
        if no entry is mapped to it or the entry has no CRC.
        """
        for f in self._files:
            if f._folder_index == folder_index and f._stream_index == stream_index:
                return f.crc
        return None

Instance variables

var signature_header
Expand source code Browse git
@property
def signature_header(self) -> SignatureHeader:
    """
    The parsed signature header; asserts that it has been set.
    """
    assert self._sig_header is not None
    return self._sig_header
var files
Expand source code Browse git
@property
def files(self) -> list[SzFile]:
    """
    A new list containing the archive's file entries; the internal list is not exposed.
    """
    return list(self._files)
class SzCorruptArchive (*args, **kwargs)

Raised when the archive is structurally invalid, a checksum does not match, or unpacking fails while no password was supplied.

Expand source code Browse git
class SzCorruptArchive(SzCannotUnpack):
    """
    Raised when the archive is structurally invalid, a checksum does not match, or
    unpacking fails while no password was supplied.
    """
    pass

Ancestors

class SzException (*args, **kwargs)

Base class of all exceptions defined by the un7z library.

Expand source code Browse git
class SzException(Exception):
    """
    Base class of all exceptions defined by the un7z library.
    """
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class SzFile (entry, archive, folder_index, stream_index)
Expand source code Browse git
class SzFile:
    """
    A single entry (file or directory) of an `SzArchive`.

    The object is a thin read-only facade over the parsed file entry. It also remembers
    which folder and which stream inside that folder hold the entry's data; both are
    `None` for entries that have no data stream.
    """

    def __init__(
        self,
        entry: FileEntry,
        archive: SzArchive,
        folder_index: int | None,
        stream_index: int | None,
    ):
        self._archive = archive
        self._entry = entry
        self._stream_index = stream_index
        self._folder_index = folder_index

    @property
    def name(self) -> str:
        """
        The name stored in the underlying file entry.
        """
        return self._entry.name

    @property
    def size(self) -> int:
        """
        The size stored in the underlying file entry.
        """
        return self._entry.size

    @property
    def crc(self) -> int | None:
        """
        The CRC stored in the underlying file entry, or `None` if there is none.
        """
        return self._entry.crc

    @property
    def is_dir(self) -> bool:
        """
        Whether the underlying file entry is flagged as a directory.
        """
        return self._entry.is_dir

    @property
    def mtime(self) -> datetime | None:
        """
        The `mtime` value of the underlying file entry, if any.
        """
        return self._entry.mtime

    @property
    def ctime(self) -> datetime | None:
        """
        The `ctime` value of the underlying file entry, if any.
        """
        return self._entry.ctime

    @property
    def atime(self) -> datetime | None:
        """
        The `atime` value of the underlying file entry, if any.
        """
        return self._entry.atime

    @property
    def attributes(self) -> int:
        """
        The attribute value of the underlying file entry.
        """
        return self._entry.attributes

    def decompress(self, password: str | bytes | None = None) -> bytes | bytearray | memoryview:
        """
        Return the unpacked contents of this entry.

        Directories and entries without an associated data stream yield an empty byte
        string. A falsy `password` (this includes `None` and the empty string) falls
        back to the password that the owning archive was created with.
        """
        folder, stream = self._folder_index, self._stream_index
        if self.is_dir or folder is None or stream is None:
            return b''
        secret = password if password else self._archive._password
        return self._archive._decompress_file(folder, stream, secret)

Instance variables

var name
Expand source code Browse git
@property
def name(self) -> str:
    """
    The name stored in the underlying file entry.
    """
    return self._entry.name
var size
Expand source code Browse git
@property
def size(self) -> int:
    """
    The size stored in the underlying file entry.
    """
    return self._entry.size
var crc
Expand source code Browse git
@property
def crc(self) -> int | None:
    """
    The CRC stored in the underlying file entry, or `None` if there is none.
    """
    return self._entry.crc
var is_dir
Expand source code Browse git
@property
def is_dir(self) -> bool:
    """
    Whether the underlying file entry is flagged as a directory.
    """
    return self._entry.is_dir
var mtime
Expand source code Browse git
@property
def mtime(self) -> datetime | None:
    """
    The `mtime` value of the underlying file entry, if any.
    """
    return self._entry.mtime
var ctime
Expand source code Browse git
@property
def ctime(self) -> datetime | None:
    """
    The `ctime` value of the underlying file entry, if any.
    """
    return self._entry.ctime
var atime
Expand source code Browse git
@property
def atime(self) -> datetime | None:
    """
    The `atime` value of the underlying file entry, if any.
    """
    return self._entry.atime
var attributes
Expand source code Browse git
@property
def attributes(self) -> int:
    """
    The attribute value of the underlying file entry.
    """
    return self._entry.attributes

Methods

def decompress(self, password=None)
Expand source code Browse git
def decompress(self, password: str | bytes | None = None) -> bytes | bytearray | memoryview:
    """
    Return the unpacked contents of this entry.

    Directories and entries without an associated data stream yield an empty byte
    string. A falsy `password` (this includes `None` and the empty string) falls back
    to the password that the owning archive was created with.
    """
    folder, stream = self._folder_index, self._stream_index
    if self.is_dir or folder is None or stream is None:
        return b''
    secret = password if password else self._archive._password
    return self._archive._decompress_file(folder, stream, secret)
class SzInvalidPassword (*args, **kwargs)

Raised when unpacking fails with an unexpected error while a password was supplied.

Expand source code Browse git
class SzInvalidPassword(SzCannotUnpack):
    """
    Raised when unpacking fails with an unexpected error while a password was supplied.
    """
    pass

Ancestors

class SzPasswordRequired (*args, **kwargs)

Raised when encrypted data is to be unpacked but no password was supplied.

Expand source code Browse git
class SzPasswordRequired(SzCannotUnpack):
    """
    Raised when encrypted data is to be unpacked but no password was supplied.
    """
    pass

Ancestors

class SzUnsupportedMethod (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class SzUnsupportedMethod(SzException):
    """
    Derives directly from `SzException`, not from `SzCannotUnpack`.
    NOTE(review): presumably raised for coders that this library cannot decode; the
    raise sites are not part of this module, confirm against the coders module.
    """
    pass

Ancestors

  • SzException
  • builtins.Exception
  • builtins.BaseException
class SzCannotUnpack (*args, **kwargs)

Base class of the exceptions SzCorruptArchive, SzInvalidPassword, and SzPasswordRequired.

Expand source code Browse git
class SzCannotUnpack(SzException):
    """
    Base class of the exceptions `SzCorruptArchive`, `SzInvalidPassword`, and
    `SzPasswordRequired`.
    """
    pass

Ancestors

  • SzException
  • builtins.Exception
  • builtins.BaseException

Subclasses