Module refinery.lib.cab
Parsing of CAB archives.
Expand source code Browse git
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Parsing of CAB archives.
"""
from __future__ import annotations
from typing import NamedTuple, Optional, Iterable
from enum import IntFlag, IntEnum
from datetime import date, time, datetime
import zlib
from refinery.lib.structures import Struct, StructReader
from refinery.lib import chunks
from refinery.lib.lzx import LzxDecoder
class CabVolumeMissing(LookupError):
    """
    Raised when a volume of a multi-volume CAB archive is not available. The missing volume is
    identified either by a `CabRef` taken from a neighbouring disk header (`ref`) or by its
    position in the disk sequence (`idx`).
    """

    def __init__(self, idx: int = -1, ref: Optional[CabRef] = None):
        self.idx = idx
        self.ref = ref

    def __str__(self):
        if self.ref is not None:
            name = str(self.ref)
        elif self.idx >= 0:
            name = F'Disk {self.idx}'
        else:
            # Neither a reference nor a valid index was given; without this fallback
            # the variable would be unbound and formatting the exception would fail.
            name = 'unknown'
        return F'Missing CAB volume: {name}'
class CabVolumeCorrupt(ValueError):
    """
    Raised when the stored checksum of a CAB data block does not match the computed one.
    """
def cab_data_checksum(content: memoryview, checksum: int = 0) -> int:
    """
    Fold `content` into `checksum` using XOR and return the result. Every value produced by
    `chunks.unpack(content, 4)` is XORed into the checksum (presumably one integer per 4-byte
    group; see that helper for byte order and handling of a short tail). If the length of
    `content` is not a multiple of four, the trailing 1 to 3 bytes are additionally XORed in
    as a single big-endian integer.
    """
    result = checksum
    for word in chunks.unpack(content, 4):
        result ^= word
    remainder = len(content) % 4
    if remainder:
        tail = content[-remainder:]
        result ^= int.from_bytes(tail, 'big')
    return result
class CabFlags(IntFlag):
    """
    Bit flags of the 16-bit flags field in a CAB disk header.
    """
    HasPrev = 0b001  # a reference to the previous disk is stored in the header
    HasNext = 0b010  # a reference to the next disk is stored in the header
    Reserve = 0b100  # the header specifies sizes of reserved areas
class CabMethod(IntEnum):
    """
    Compression method identifiers; a folder stores this value in the low 4 bits of its first
    method byte.
    """
    Nothing = 0x0
    Deflate = 0x1
    Quantum = 0x2
    LZX = 0x3
class CabAttr(IntFlag):
    """
    Bit flags of the 16-bit attribute field of a CAB file entry.
    """
    ReadOnly = 1 << 0
    Hidden = 1 << 1
    System = 1 << 2
    Arch = 1 << 5
    Exec = 1 << 6
    # When set, the file name is decoded as UTF-8 instead of Latin-1.
    NameUTF8 = 1 << 7
class NFolderIndex(IntFlag):
    """
    Special values of the folder index field of a CAB file entry. They mark a file whose data
    is continued from the previous disk, continues on the next disk, or both.
    """
    HasPrev = 0xfffd
    HasNext = 0xfffe
    HasPrevAndNext = 0xffff
class CabFolder(Struct):
    """
    A folder entry of a CAB disk: a sequence of compressed data blocks that share one
    compression method. The folder header is read at the current reader position; the data
    blocks are read from the offset stored in that header.
    """

    def __init__(self, reader: StructReader[memoryview], parent: CabDisk, compute_checksums: bool, no_magic: bool):
        start = reader.u32()
        count = reader.u16()
        if no_magic:
            # The 4-byte signature is absent from the data, so stored offsets are off by 4.
            start -= 4
        self.method = (reader.u8(), reader.u8())
        # Only the low nibble of the first method byte selects the algorithm.
        self.compression = CabMethod(self.method[0] & 0xF)
        reader.seekrel(parent.skip_per_fldr)
        with reader.detour(start):
            self.blocks = [CabCompressedBlock(reader, parent, compute_checksums) for _ in range(count)]
        self.decompressed = None

    def __repr__(self):
        return F'<fldr:{self.compression.name}({self.method[1]}):{len(self.blocks)}>'

    def decompress(self):
        """
        Decompress all blocks of this folder and return the result as a memoryview. The result
        is cached in `decompressed`, later calls return a view of the cached buffer.

        Raises `ValueError` for a Deflate block without the `CK` header, `RuntimeError` when
        inflating fails or when a zero-size LZX block has no valid continuation, and
        `NotImplementedError` for Quantum compression.
        """
        if self.decompressed is not None:
            return memoryview(self.decompressed)
        dst = bytearray()
        cm = self.compression
        it = iter(self.blocks)
        if cm == CabMethod.Nothing:
            for block in it:
                dst.extend(block.data)
        elif cm == CabMethod.Deflate:
            # The output of each block serves as the preset dictionary of the next one.
            zdict = B''
            for block in it:
                if block.data[:2] != B'CK':
                    raise ValueError('Corrupted MSZip block with invalid header.')
                try:
                    inflate = zlib.decompressobj(-zlib.MAX_WBITS, zdict)
                    zdict = inflate.decompress(block.data[2:]) + inflate.flush()
                except zlib.error as error:
                    raise RuntimeError('Failed to inflate CAB data block.') from error
                else:
                    dst.extend(zdict)
        elif cm == CabMethod.LZX:
            lzx = LzxDecoder(False)
            lzx.set_params_and_alloc(self.method[1])
            for block in it:
                if size := block.decompressed_size:
                    data = block.data
                else:
                    # A block with decompressed size zero is joined with the block after it.
                    data = bytearray(block.data)
                    tail = next(it, None)
                    if tail is None:
                        # A bare next() would leak StopIteration out of this method, which
                        # can silently end a loop or generator in the calling code.
                        raise RuntimeError('Missing continuation for zero-size block.')
                    data.extend(tail.data)
                    size = tail.decompressed_size
                    if not size:
                        raise RuntimeError('Zero size in continued block.')
                dst.extend(lzx.decompress(data, size))
                lzx.keep_history = True
        elif cm == CabMethod.Quantum:
            raise NotImplementedError('Quantum decompression is not yet implemented.')
        else:
            raise ValueError(F'Unknown decompression method: {cm!r}')
        self.decompressed = dst
        return memoryview(dst)
class CabFile(Struct):
    """
    A file entry of a CAB disk. The entry stores the uncompressed size of the file, its offset
    within the decompressed data of its folder, a folder index, a packed date and time, the
    attribute flags and a zero-terminated name.
    """
    folder: Optional[CabFolder]

    def __init__(self, reader: StructReader[memoryview]):
        self.size = reader.u32()
        self.offset = reader.u32()
        self._index = reader.u16()
        # The folder link is not part of the entry; it is assigned later by Cabinet.process.
        self.folder = None
        self.end = self.offset + self.size
        raw_date = reader.u16()
        raw_time = reader.u16()

        # Date layout: 7 bits year offset from 1980, 4 bits month, 5 bits day.
        year = ((raw_date >> 9) & 0x7F) + 1980
        month = (raw_date >> 5) & 0x0F
        day = raw_date & 0x1F
        try:
            parsed_date = date(year, month, day)
        except Exception:
            parsed_date = None
        self.date = parsed_date

        # Time layout: 5 bits hour, 6 bits minute, 5 bits seconds stored in 2-second units.
        hour = (raw_time >> 11) & 0x1F
        minute = (raw_time >> 5) & 0x3F
        second = (raw_time & 0x1F) * 2
        if second == 60:
            second = 59
        try:
            parsed_time = time(hour, minute, second)
        except Exception:
            parsed_time = None
        self.time = parsed_time

        if parsed_date and parsed_time:
            self.timestamp = datetime.combine(parsed_date, parsed_time)
        else:
            self.timestamp = None
        # The attributes must be read before the name because they select its codec.
        self.attributes = CabAttr(reader.u16())
        self.name = reader.read_c_string(self.codec)

    def __repr__(self):
        labels = {
            NFolderIndex.HasPrev: 'PP',
            NFolderIndex.HasNext: 'NN',
            NFolderIndex.HasPrevAndNext: 'PN',
        }
        index = labels.get(self._index)
        if index is None:
            index = F'{self._index:02d}'
        d = self.date.isoformat() if self.date else '????-??-??'
        t = self.time.isoformat('seconds') if self.time else '??:??:??'
        return F'<file:{index}:{d}T{t}:{self.name}>'

    def decompress(self):
        """
        Return the contents of this file as a slice of the decompressed folder data. Raises a
        `RuntimeError` if no folder is linked or if the slice does not have the expected size.
        """
        if self.folder is None:
            raise RuntimeError(F'CAB file entry is missing a link to its folder: {self!r}')
        data = self.folder.decompress()[self.offset:self.end]
        if len(data) != self.size:
            raise RuntimeError(F'The extracted file does not have the correct size: {self!r}')
        return data

    @property
    def codec(self):
        """
        The name of the codec used to decode the file name.
        """
        if self.attributes & CabAttr.NameUTF8:
            return 'utf8'
        return 'latin1'

    def has_prev(self):
        """
        Whether the folder index marks this file as continued from the previous disk.
        """
        return self._index == NFolderIndex.HasPrev or self._index == NFolderIndex.HasPrevAndNext

    def has_next(self):
        """
        Whether the folder index marks this file as continuing on the next disk.
        """
        return self._index == NFolderIndex.HasNext or self._index == NFolderIndex.HasPrevAndNext

    @property
    def index(self):
        """
        The list index of this file's folder on its disk: the first folder for a file continued
        from the previous disk, the last folder for a file that only continues on the next
        disk, and the stored index otherwise.
        """
        if self.has_prev():
            return 0
        return -1 if self.has_next() else self._index
class CabCompressedBlock(Struct):
    """
    A single data block of a CAB folder, consisting of a stored checksum, the size of the
    stored data, the size of the decompressed data, and the data itself.
    """

    def __init__(self, reader: StructReader[memoryview], parent: CabDisk, compute_checksums: bool):
        self.provided_checksum = reader.u32()
        # The two 16-bit size fields that follow are peeked as one 32-bit value; it is the
        # initial value for the checksum computed over the block data.
        seed = reader.u32(peek=True)
        size = reader.u16()
        self.decompressed_size = reader.u16()
        reader.seekrel(parent.skip_per_data)
        data = reader.read_exactly(size)
        self.data = data
        if compute_checksums:
            self.computed_checksum = cab_data_checksum(data, seed)
        else:
            self.computed_checksum = None

    def __repr__(self):
        computed = self.computed_checksum
        checksum = (
            'OK' if computed == self.provided_checksum else
            '??' if computed is None else
            '!!'
        )
        return F'<block:{len(self.data):04X}->{self.decompressed_size:04X}:{checksum}>'
class CabRef(NamedTuple):
    """
    A reference to a neighbouring CAB volume, given by two strings read from the disk header.
    """
    name: str
    disk: str

    def __str__(self):
        name, disk = self
        return F'{disk} ({name})'
class CabDisk(Struct):
    """
    A single CAB volume. Parsing reads the disk header, the optional references to the
    previous and next volume, all folder entries, and all file entries.
    """
    MAGIC = B'MSCF'

    def __init__(self, reader: StructReader[memoryview], compute_checksums: bool, no_magic: bool):
        # With no_magic, the data starts right after the signature, which is then assumed.
        self.signature = self.MAGIC if no_magic else reader.read(4)
        self._reserved = []
        self._reserved.append(reader.u32())
        self.size = reader.u32()
        self._reserved.append(reader.u32())
        self.file_offset = reader.u32()
        if no_magic:
            # Stored offsets include the 4 signature bytes that are missing from the data.
            self.file_offset -= 4
        self._reserved.append(reader.u32())
        self.version = (reader.u8(), reader.u8())
        self.nr_of_folders = reader.u16()
        self.nr_of_files = reader.u16()
        self.flags = CabFlags(reader.u16())
        self.id = reader.u16()
        self.index = reader.u16()

        # Sizes of the reserved areas in the header, in each folder entry, and in each block.
        if self.flags & CabFlags.Reserve:
            reserve_sizes = reader.read_struct('HBB')
        else:
            reserve_sizes = (0, 0, 0)
        self.skip_per_disk, self.skip_per_fldr, self.skip_per_data = reserve_sizes
        reader.seekrel(self.skip_per_disk)

        def read_ref():
            return CabRef(
                reader.read_c_string('ascii'),
                reader.read_c_string('ascii'),
            )

        self.prev = read_ref() if self.flags & CabFlags.HasPrev else None
        self.next = read_ref() if self.flags & CabFlags.HasNext else None
        self.folders = [
            CabFolder(reader, self, compute_checksums, no_magic)
            for _ in range(self.nr_of_folders)
        ]
        reader.seekset(self.file_offset)
        self.files = [CabFile(reader) for _ in range(self.nr_of_files)]
        self._reader = reader
        self._arcpos = reader.tell()

    def check(self):
        """
        Validate the parsed header fields and return the disk itself. A `ValueError` is raised
        for a wrong signature, unknown flag bits, nonzero reserved fields, or a size below 36.
        """
        if self.signature != self.MAGIC:
            raise ValueError(F'Invalid signature: {self.signature.hex()}')
        flag_bits = self.flags.value
        if flag_bits > 7:
            raise ValueError(F'Invalid flags: {flag_bits}.')
        if any(self._reserved):
            raise ValueError('Reserved field was nonzero.')
        if self.size < 36:
            raise ValueError(F'Archive header specifies invalid size of {self.size} bytes.')
        return self
class Cabinet:
    """
    A collection of CAB disks. Disks are grouped by the ID from their header and each group is
    kept sorted by disk index. After calling `process`, the files of each group are available
    through `get_files`.
    """
    files: dict[int, list[CabFile]]
    disks: dict[int, list[CabDisk]]

    def __init__(self, *disks: memoryview, compute_checksums: bool = True, no_magic: bool = False):
        self.disks = {}
        self.files = {}
        self.compute_checksums = compute_checksums
        self.no_magic = no_magic
        self.extend(disks)

    def get_files(self, id: Optional[int] = None):
        """
        Return the file list for the group with the given ID. Without an ID, the only available
        file list is returned; a `LookupError` is raised unless there is exactly one.
        """
        if id is not None:
            return self.files[id]
        if len(self.files) != 1:
            raise LookupError(
                F'Cannot select files without an id: {len(self.files)} file lists are available.')
        return next(iter(self.files.values()))

    def __bool__(self):
        return bool(self.disks)

    def __len__(self):
        return sum(len(disks) for disks in self.disks.values())

    def extend(self, disks: Iterable[memoryview]):
        """
        Parse each of the given buffers as a CAB disk and add it to the group for its ID.
        """
        for d in disks:
            disk = CabDisk(memoryview(d), self.compute_checksums, self.no_magic)
            self.disks.setdefault(disk.id, []).append(disk)
        for byid in self.disks.values():
            byid.sort(key=lambda c: c.index)

    def append(self, *disks: memoryview):
        """
        Variadic variant of `extend`.
        """
        self.extend(disks)

    def process(self):
        """
        Link every file entry to its folder and populate `files`. The blocks of a folder that
        continues a folder from the previous disk are moved into that earlier folder. Returns
        the cabinet itself.
        """
        for set_id, volumes in self.disks.items():
            collected = self.files[set_id] = []
            pending: Optional[CabFolder] = None
            resolved: list[CabFolder] = []
            for volume in volumes:
                resolved.clear()
                for current in volume.folders:
                    if pending is not None:
                        # The first folder of this disk continues the pending folder.
                        if pending.method != current.method:
                            raise ValueError('Mismatching methods for continued folder.')
                        if current.blocks:
                            pending.blocks.extend(current.blocks)
                            current.blocks.clear()
                        current, pending = pending, None
                    resolved.append(current)
                for entry in volume.files:
                    entry.folder = resolved[entry.index]
                    if entry.has_next():
                        # This entry is listed again on the next disk; it is collected there.
                        pending = entry.folder
                        continue
                    collected.append(entry)
        return self

    def needs_more_disks(self):
        """
        Return whether the cabinet is empty or `check` reports a missing volume.
        """
        if not self.disks:
            return True
        try:
            self.check(checksums=False)
        except CabVolumeMissing:
            return True
        return False

    def check(self, checksums: bool = True):
        """
        Verify that each disk group is complete and consistent. Raises `CabVolumeMissing` when
        a disk is absent, `ValueError` when neighbouring disks disagree about the disk between
        them, and, if `checksums` is set, `CabVolumeCorrupt` for a block whose stored checksum
        differs from the computed one.
        """
        for disks in self.disks.values():
            for k, disk in enumerate(disks):
                if disk.index != k:
                    raise CabVolumeMissing(idx=k)
            prev_list = [disk.prev for disk in disks]
            next_list = [disk.next for disk in disks]
            first_prev = prev_list[0]
            if first_prev:
                raise CabVolumeMissing(ref=first_prev)
            last_next = next_list[-1]
            if last_next:
                raise CabVolumeMissing(ref=last_next)
            # Disk k+2 and disk k both refer to disk k+1; the two references must agree.
            for prev_ref, next_ref in zip(prev_list[2:], next_list[:-2]):
                if prev_ref != next_ref:
                    raise ValueError(F'CAB disk sequence mismatch: {prev_ref!s} != {next_ref!s}.')
            if not checksums:
                continue
            for disk in disks:
                for f, folder in enumerate(disk.folders):
                    for b, block in enumerate(folder.blocks):
                        c = block.computed_checksum
                        if c is None:
                            continue
                        p = block.provided_checksum
                        # A stored checksum of zero means that no checksum was supplied.
                        if not p or p == c:
                            continue
                        raise CabVolumeCorrupt(
                            F'Incorrect checksum in Disk {disk.index}, folder {f}, block {b}; '
                            F'provided value was {p:08X}, computed value {c:08X}.')
Functions
def cab_data_checksum(content, checksum=0)
-
Expand source code Browse git
def cab_data_checksum(content: memoryview, checksum: int = 0) -> int:
    """
    XOR every value produced by `chunks.unpack(content, 4)` into `checksum`. If the length of
    `content` is not a multiple of four, the trailing bytes are XORed in as one big-endian
    integer. Returns the resulting checksum.
    """
    result = checksum
    for word in chunks.unpack(content, 4):
        result ^= word
    remainder = len(content) % 4
    if remainder:
        tail = content[-remainder:]
        result ^= int.from_bytes(tail, 'big')
    return result
Classes
class CabVolumeMissing (idx=-1, ref=None)
-
Base class for lookup errors.
Expand source code Browse git
class CabVolumeMissing(LookupError):
    """
    Raised when a volume of a multi-volume CAB archive is not available. The missing volume is
    identified either by a `CabRef` (`ref`) or by its position in the disk sequence (`idx`).
    """

    def __init__(self, idx: int = -1, ref: Optional[CabRef] = None):
        self.idx = idx
        self.ref = ref

    def __str__(self):
        if self.ref is not None:
            name = str(self.ref)
        elif self.idx >= 0:
            name = F'Disk {self.idx}'
        else:
            # Without this fallback, the variable is unbound for default-constructed instances.
            name = 'unknown'
        return F'Missing CAB volume: {name}'
Ancestors
- builtins.LookupError
- builtins.Exception
- builtins.BaseException
class CabVolumeCorrupt (*args, **kwargs)
-
Inappropriate argument value (of correct type).
Expand source code Browse git
class CabVolumeCorrupt(ValueError):
    """
    Raised when the stored checksum of a CAB data block does not match the computed one.
    """
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class CabFlags (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class CabFlags(IntFlag):
    """
    Bit flags of the 16-bit flags field in a CAB disk header.
    """
    HasPrev = 0b001
    HasNext = 0b010
    Reserve = 0b100
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var HasPrev
var HasNext
var Reserve
class CabMethod (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class CabMethod(IntEnum):
    """
    Compression method identifiers of a CAB folder.
    """
    Nothing = 0x0
    Deflate = 0x1
    Quantum = 0x2
    LZX = 0x3
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var Nothing
var Deflate
var Quantum
var LZX
class CabAttr (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class CabAttr(IntFlag):
    """
    Bit flags of the 16-bit attribute field of a CAB file entry.
    """
    ReadOnly = 1 << 0
    Hidden = 1 << 1
    System = 1 << 2
    Arch = 1 << 5
    Exec = 1 << 6
    NameUTF8 = 1 << 7
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var ReadOnly
var Hidden
var System
var Arch
var Exec
var NameUTF8
class NFolderIndex (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code Browse git
class NFolderIndex(IntFlag):
    """
    Special values of the folder index field of a CAB file entry.
    """
    HasPrev = 0xfffd
    HasNext = 0xfffe
    HasPrevAndNext = 0xffff
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var HasPrev
var HasNext
var HasPrevAndNext
class CabFolder (reader, parent, compute_checksums, no_magic)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class CabFolder(Struct): def __init__(self, reader: StructReader[memoryview], parent: CabDisk, compute_checksums: bool, no_magic: bool): start = reader.u32() count = reader.u16() if no_magic: start -= 4 self.method = (reader.u8(), reader.u8()) self.compression = CabMethod(self.method[0] & 0xF) reader.seekrel(parent.skip_per_fldr) with reader.detour(start): self.blocks = [CabCompressedBlock(reader, parent, compute_checksums) for _ in range(count)] self.decompressed = None def __repr__(self): return F'<fldr:{self.compression.name}({self.method[1]}):{len(self.blocks)}>' def decompress(self): if self.decompressed is not None: return memoryview(self.decompressed) dst = bytearray() cm = self.compression it = iter(self.blocks) if cm == CabMethod.Nothing: for block in it: dst.extend(block.data) elif cm == CabMethod.Deflate: zdict = B'' for block in it: if block.data[:2] != B'CK': raise ValueError('Corrupted MSZip block with invalid header.') try: inflate = zlib.decompressobj(-zlib.MAX_WBITS, zdict) zdict = inflate.decompress(block.data[2:]) + inflate.flush() except zlib.error: raise RuntimeError('Failed to inflate CAB data block.') else: dst.extend(zdict) elif cm == CabMethod.LZX: lzx = LzxDecoder(False) lzx.set_params_and_alloc(self.method[1]) for block in it: if size := block.decompressed_size: data = block.data else: data = bytearray(block.data) tail = next(it) data.extend(tail.data) size = tail.decompressed_size if not size: raise RuntimeError('Zero size in continued block.') dst.extend(lzx.decompress(data, size)) lzx.keep_history = True elif cm == CabMethod.Quantum: raise NotImplementedError('Quantum decompression is not yet implemented.') else: raise ValueError(F'Unknown decompression method: {cm!r}') self.decompressed = dst return memoryview(dst)
Ancestors
Methods
def decompress(self)
-
Expand source code Browse git
def decompress(self):
    """
    Decompress all blocks of this folder and return the result as a memoryview. The result is
    cached in `decompressed`, later calls return a view of the cached buffer.

    Raises `ValueError` for a Deflate block without the `CK` header, `RuntimeError` when
    inflating fails or when a zero-size LZX block has no valid continuation, and
    `NotImplementedError` for Quantum compression.
    """
    if self.decompressed is not None:
        return memoryview(self.decompressed)
    dst = bytearray()
    cm = self.compression
    it = iter(self.blocks)
    if cm == CabMethod.Nothing:
        for block in it:
            dst.extend(block.data)
    elif cm == CabMethod.Deflate:
        # The output of each block serves as the preset dictionary of the next one.
        zdict = B''
        for block in it:
            if block.data[:2] != B'CK':
                raise ValueError('Corrupted MSZip block with invalid header.')
            try:
                inflate = zlib.decompressobj(-zlib.MAX_WBITS, zdict)
                zdict = inflate.decompress(block.data[2:]) + inflate.flush()
            except zlib.error as error:
                raise RuntimeError('Failed to inflate CAB data block.') from error
            else:
                dst.extend(zdict)
    elif cm == CabMethod.LZX:
        lzx = LzxDecoder(False)
        lzx.set_params_and_alloc(self.method[1])
        for block in it:
            if size := block.decompressed_size:
                data = block.data
            else:
                # A block with decompressed size zero is joined with the block after it.
                data = bytearray(block.data)
                tail = next(it, None)
                if tail is None:
                    # A bare next() would leak StopIteration out of this method.
                    raise RuntimeError('Missing continuation for zero-size block.')
                data.extend(tail.data)
                size = tail.decompressed_size
                if not size:
                    raise RuntimeError('Zero size in continued block.')
            dst.extend(lzx.decompress(data, size))
            lzx.keep_history = True
    elif cm == CabMethod.Quantum:
        raise NotImplementedError('Quantum decompression is not yet implemented.')
    else:
        raise ValueError(F'Unknown decompression method: {cm!r}')
    self.decompressed = dst
    return memoryview(dst)
class CabFile (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class CabFile(Struct): folder: Optional[CabFolder] def __init__(self, reader: StructReader[memoryview]): self.size = reader.u32() self.offset = reader.u32() self._index = reader.u16() self.folder = None self.end = self.offset + self.size d = reader.u16() t = reader.u16() s = (t & 0x1F) << 1 try: self.date = d = date( ((d & 0xFE00) >> 0x9) + 1980, ((d & 0x01E0) >> 0x5), ((d & 0x001F) >> 0x0), ) except Exception: self.date = d = None try: self.time = t = time( ((t & 0xF800) >> 0xB), ((t & 0x07E0) >> 0x5), 59 if s == 60 else s, ) except Exception: self.time = t = None self.timestamp = datetime.combine(d, t) if d and t else None self.attributes = CabAttr(reader.u16()) self.name = reader.read_c_string(self.codec) def __repr__(self): index = { NFolderIndex.HasPrev: 'PP', NFolderIndex.HasNext: 'NN', NFolderIndex.HasPrevAndNext: 'PN', }.get(self._index, F'{self._index:02d}') d = d.isoformat() if (d := self.date) else '????-??-??' t = t.isoformat('seconds') if (t := self.time) else '??:??:??' return F'<file:{index}:{d}T{t}:{self.name}>' def decompress(self): folder = self.folder if folder is None: raise RuntimeError(F'CAB file entry is missing a link to its folder: {self!r}') folder_data = self.folder.decompress() data = folder_data[self.offset:self.end] if len(data) != self.size: raise RuntimeError(F'The extracted file does not have the correct size: {self!r}') return data @property def codec(self): return 'utf8' if self.attributes & CabAttr.NameUTF8 else 'latin1' def has_prev(self): return self._index in (NFolderIndex.HasPrev, NFolderIndex.HasPrevAndNext) def has_next(self): return self._index in (NFolderIndex.HasNext, NFolderIndex.HasPrevAndNext) @property def index(self): if self.has_prev(): return +0 if self.has_next(): return ~0 else: return self._index
Ancestors
Class variables
var folder
Instance variables
var codec
-
Expand source code Browse git
@property
def codec(self):
    """
    The name of the codec used to decode the file name.
    """
    if self.attributes & CabAttr.NameUTF8:
        return 'utf8'
    return 'latin1'
var index
-
Expand source code Browse git
@property
def index(self):
    """
    The list index of this file's folder on its disk: the first folder for a file continued
    from the previous disk, the last folder for a file that only continues on the next disk,
    and the stored index otherwise.
    """
    if self.has_prev():
        return 0
    return -1 if self.has_next() else self._index
Methods
def decompress(self)
-
Expand source code Browse git
def decompress(self):
    """
    Return the contents of this file as a slice of the decompressed folder data. Raises a
    `RuntimeError` if no folder is linked or if the slice does not have the expected size.
    """
    if self.folder is None:
        raise RuntimeError(F'CAB file entry is missing a link to its folder: {self!r}')
    data = self.folder.decompress()[self.offset:self.end]
    if len(data) != self.size:
        raise RuntimeError(F'The extracted file does not have the correct size: {self!r}')
    return data
def has_prev(self)
-
Expand source code Browse git
def has_prev(self):
    """
    Whether the folder index marks this file as continued from the previous disk.
    """
    return self._index == NFolderIndex.HasPrev or self._index == NFolderIndex.HasPrevAndNext
def has_next(self)
-
Expand source code Browse git
def has_next(self):
    """
    Whether the folder index marks this file as continuing on the next disk.
    """
    return self._index == NFolderIndex.HasNext or self._index == NFolderIndex.HasPrevAndNext
class CabCompressedBlock (reader, parent, compute_checksums)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class CabCompressedBlock(Struct):
    """
    A single data block of a CAB folder, consisting of a stored checksum, the size of the
    stored data, the size of the decompressed data, and the data itself.
    """

    def __init__(self, reader: StructReader[memoryview], parent: CabDisk, compute_checksums: bool):
        self.provided_checksum = reader.u32()
        # The two 16-bit size fields that follow are peeked as one 32-bit value; it is the
        # initial value for the checksum computed over the block data.
        seed = reader.u32(peek=True)
        size = reader.u16()
        self.decompressed_size = reader.u16()
        reader.seekrel(parent.skip_per_data)
        data = reader.read_exactly(size)
        self.data = data
        if compute_checksums:
            self.computed_checksum = cab_data_checksum(data, seed)
        else:
            self.computed_checksum = None

    def __repr__(self):
        computed = self.computed_checksum
        checksum = (
            'OK' if computed == self.provided_checksum else
            '??' if computed is None else
            '!!'
        )
        return F'<block:{len(self.data):04X}->{self.decompressed_size:04X}:{checksum}>'
Ancestors
class CabRef (name, disk)
-
CabRef(name, disk)
Expand source code Browse git
class CabRef(NamedTuple):
    """
    A reference to a neighbouring CAB volume, given by two strings read from the disk header.
    """
    name: str
    disk: str

    def __str__(self):
        name, disk = self
        return F'{disk} ({name})'
Ancestors
- builtins.tuple
Instance variables
var name
-
Alias for field number 0
var disk
-
Alias for field number 1
class CabDisk (reader, compute_checksums, no_magic)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class CabDisk(Struct): MAGIC = B'MSCF' def __init__(self, reader: StructReader[memoryview], compute_checksums: bool, no_magic: bool): if no_magic: self.signature = self.MAGIC else: self.signature = reader.read(4) self._reserved = [] self._reserved.append(reader.u32()) self.size = reader.u32() self._reserved.append(reader.u32()) self.file_offset = reader.u32() if no_magic: self.file_offset -= 4 self._reserved.append(reader.u32()) self.version = (reader.u8(), reader.u8()) self.nr_of_folders = reader.u16() self.nr_of_files = reader.u16() self.flags = CabFlags(reader.u16()) self.id = reader.u16() self.index = reader.u16() ( self.skip_per_disk, self.skip_per_fldr, self.skip_per_data, ) = reader.read_struct('HBB') if ( self.flags & CabFlags.Reserve ) else (0, 0, 0) reader.seekrel(self.skip_per_disk) self.prev = CabRef( reader.read_c_string('ascii'), reader.read_c_string('ascii'), ) if self.flags & CabFlags.HasPrev else None self.next = CabRef( reader.read_c_string('ascii'), reader.read_c_string('ascii'), ) if self.flags & CabFlags.HasNext else None self.folders = [ CabFolder(reader, self, compute_checksums, no_magic) for _ in range(self.nr_of_folders)] reader.seekset(self.file_offset) self.files = [CabFile(reader) for _ in range(self.nr_of_files)] self._reader = reader self._arcpos = reader.tell() def check(self): if self.signature != self.MAGIC: raise ValueError(F'Invalid signature: {self.signature.hex()}') if self.flags.value > 7: raise ValueError(F'Invalid flags: {self.flags.value}.') if any(self._reserved): raise ValueError(U'Reserved field was nonzero.') if self.size < 36: raise ValueError(F'Archive header specifies invalid size of {self.size} bytes.') return self
Ancestors
Class variables
var MAGIC
Methods
def check(self)
-
Expand source code Browse git
def check(self):
    """
    Validate the parsed header fields and return the disk itself. A `ValueError` is raised for
    a wrong signature, unknown flag bits, nonzero reserved fields, or a size below 36.
    """
    if self.signature != self.MAGIC:
        raise ValueError(F'Invalid signature: {self.signature.hex()}')
    flag_bits = self.flags.value
    if flag_bits > 7:
        raise ValueError(F'Invalid flags: {flag_bits}.')
    if any(self._reserved):
        raise ValueError('Reserved field was nonzero.')
    if self.size < 36:
        raise ValueError(F'Archive header specifies invalid size of {self.size} bytes.')
    return self
class Cabinet (*disks, compute_checksums=True, no_magic=False)
-
Expand source code Browse git
class Cabinet: files: dict[int, list[CabFile]] disks: dict[int, list[CabDisk]] def __init__(self, *disks: memoryview, compute_checksums: bool = True, no_magic: bool = False): self.disks = {} self.files = {} self.compute_checksums = compute_checksums self.no_magic = no_magic self.extend(disks) def get_files(self, id: Optional[int] = None): if id is None: if len(self.files) != 1: raise LookupError return next(iter(self.files.values())) else: return self.files[id] def __bool__(self): return bool(self.disks) def __len__(self): return sum(len(disks) for disks in self.disks.values()) def extend(self, disks: Iterable[memoryview]): for d in disks: disk = CabDisk(memoryview(d), self.compute_checksums, self.no_magic) byid = self.disks.setdefault(disk.id, []) byid.append(disk) for byid in self.disks.values(): byid.sort(key=lambda c: c.index) def append(self, *disks: memoryview): self.extend(disks) def process(self): for id, disks in self.disks.items(): files = self.files[id] = [] partial: Optional[CabFolder] = None folders: list[CabFolder] = [] for disk in disks: folders.clear() for folder in disk.folders: if partial is None: folders.append(folder) continue if partial.method != folder.method: raise ValueError('Mismatching methods for continued folder.') if folder.blocks: partial.blocks.extend(folder.blocks) folder.blocks.clear() folders.append(partial) partial = None for file in disk.files: file.folder = folders[file.index] if file.has_next(): partial = file.folder else: files.append(file) return self def needs_more_disks(self): if not self.disks: return True try: self.check(checksums=False) except CabVolumeMissing: return True else: return False def check(self, checksums: bool = True): for disks in self.disks.values(): for k, disk in enumerate(disks): if disk.index != k: raise CabVolumeMissing(idx=k) prev_list = [disk.prev for disk in disks] next_list = [disk.next for disk in disks] if prev := prev_list[+0]: raise CabVolumeMissing(ref=prev) if next := next_list[~0]: raise 
CabVolumeMissing(ref=next) for prev, next in zip(prev_list[2:], next_list[:-2]): if prev != next: raise ValueError(F'CAB disk sequence mismatch: {prev!s} != {next!s}.') if not checksums: continue for disk in disks: for f, folder in enumerate(disk.folders): for b, block in enumerate(folder.blocks): if block.computed_checksum is None: continue p = block.provided_checksum c = block.computed_checksum if p == c: continue raise CabVolumeCorrupt( F'Incorrect checksum in Disk {disk.index}, folder {f}, block {b}; ' F'provided value was {p:08X}, computed value {c:08X}.')
Class variables
var files
var disks
Methods
def get_files(self, id=None)
-
Expand source code Browse git
def get_files(self, id: Optional[int] = None):
    """
    Return the file list for the group with the given ID. Without an ID, the only available
    file list is returned; a `LookupError` is raised unless there is exactly one.
    """
    if id is not None:
        return self.files[id]
    if len(self.files) != 1:
        # State the reason; a bare LookupError carries no diagnostic information.
        raise LookupError(
            F'Cannot select files without an id: {len(self.files)} file lists are available.')
    return next(iter(self.files.values()))
def extend(self, disks)
-
Expand source code Browse git
def extend(self, disks: Iterable[memoryview]):
    """
    Parse each of the given buffers as a CAB disk and add it to the group for its ID. Every
    group is sorted by disk index afterwards.
    """
    for raw in disks:
        parsed = CabDisk(memoryview(raw), self.compute_checksums, self.no_magic)
        self.disks.setdefault(parsed.id, []).append(parsed)
    for volumes in self.disks.values():
        volumes.sort(key=lambda volume: volume.index)
def append(self, *disks)
-
Expand source code Browse git
def append(self, *disks: memoryview):
    """
    Variadic variant of `extend`: all positional arguments are passed to it as one tuple.
    """
    volumes = disks
    self.extend(volumes)
def process(self)
-
Expand source code Browse git
def process(self):
    """
    Link every file entry to its folder and populate `files`. The blocks of a folder that
    continues a folder from the previous disk are moved into that earlier folder. Returns the
    cabinet itself.
    """
    for set_id, volumes in self.disks.items():
        collected = self.files[set_id] = []
        pending = None
        resolved = []
        for volume in volumes:
            resolved.clear()
            for current in volume.folders:
                if pending is not None:
                    # The first folder of this disk continues the pending folder.
                    if pending.method != current.method:
                        raise ValueError('Mismatching methods for continued folder.')
                    if current.blocks:
                        pending.blocks.extend(current.blocks)
                        current.blocks.clear()
                    current, pending = pending, None
                resolved.append(current)
            for entry in volume.files:
                entry.folder = resolved[entry.index]
                if entry.has_next():
                    # This entry is listed again on the next disk; it is collected there.
                    pending = entry.folder
                    continue
                collected.append(entry)
    return self
def needs_more_disks(self)
-
Expand source code Browse git
def needs_more_disks(self):
    """
    Return whether the cabinet is empty or `check` reports a missing volume.
    """
    if not self.disks:
        return True
    try:
        self.check(checksums=False)
    except CabVolumeMissing:
        return True
    return False
def check(self, checksums=True)
-
Expand source code Browse git
def check(self, checksums: bool = True):
    """
    Verify that each disk group is complete and consistent. Raises `CabVolumeMissing` when a
    disk is absent, `ValueError` when neighbouring disks disagree about the disk between them,
    and, if `checksums` is set, `CabVolumeCorrupt` for a block whose stored checksum differs
    from the computed one.
    """
    for disks in self.disks.values():
        for k, disk in enumerate(disks):
            if disk.index != k:
                raise CabVolumeMissing(idx=k)
        prev_list = [disk.prev for disk in disks]
        next_list = [disk.next for disk in disks]
        first_prev = prev_list[0]
        if first_prev:
            raise CabVolumeMissing(ref=first_prev)
        last_next = next_list[-1]
        if last_next:
            raise CabVolumeMissing(ref=last_next)
        # Disk k+2 and disk k both refer to disk k+1; the two references must agree.
        for prev_ref, next_ref in zip(prev_list[2:], next_list[:-2]):
            if prev_ref != next_ref:
                raise ValueError(F'CAB disk sequence mismatch: {prev_ref!s} != {next_ref!s}.')
        if not checksums:
            continue
        for disk in disks:
            for f, folder in enumerate(disk.folders):
                for b, block in enumerate(folder.blocks):
                    c = block.computed_checksum
                    if c is None:
                        continue
                    p = block.provided_checksum
                    # A stored checksum of zero means that no checksum was supplied.
                    if not p or p == c:
                        continue
                    raise CabVolumeCorrupt(
                        F'Incorrect checksum in Disk {disk.index}, folder {f}, block {b}; '
                        F'provided value was {p:08X}, computed value {c:08X}.')