Module refinery.lib.chm
Expand source code Browse git
from __future__ import annotations
from typing import ClassVar
from uuid import UUID
from dataclasses import dataclass, field
from functools import cached_property
import codecs
import math
from refinery.lib.structures import Struct, StructReader
from refinery.lib.lzx import LzxDecoder
from refinery.lib.lcid import LCID, DEFAULT_CODEPAGE
# Transform GUIDs used to build the path of a content section's ResetTable entry.
# CHM.__init__ probes them in the order (_LZX_CHM, _LZX_HLP) and uses the first path
# that exists in the parsed directory listing.
_LZX_HLP = UUID('0a9007c6-4076-11d3-8789-0000f8105754')
_LZX_CHM = UUID('7fc28940-9d31-11d0-9b27-00a0c91e9c7c')
class SectionHeader(Struct):
    """
    An (offset, length) pair read from the reader. `ChmHeader` parses two of these
    back to back; `CHM` uses their offsets to locate the file size header and the
    directory header.
    """

    def __init__(self, reader: StructReader[memoryview]):
        # Both values are read with reader.u64(), the offset first.
        self.offset = reader.u64()
        self.length = reader.u64()
class LanguageMixin:
    """
    Mixin for structures that expose a numeric `language` attribute. It adds two
    lazily computed, cached lookups keyed by that value.
    """

    language: int

    @cached_property
    def codepage(self):
        """
        The entry of `DEFAULT_CODEPAGE` for this language, or `None` if there is none.
        """
        lcid = self.language
        return DEFAULT_CODEPAGE.get(lcid, None)

    @cached_property
    def language_name(self):
        """
        The entry of `LCID` for this language, or the string 'Unknown' if there is none.
        """
        lcid = self.language
        return LCID.get(lcid, 'Unknown')
class ChmStruct(Struct):
    """
    Base class for structures that begin with a fixed byte signature. Subclasses
    set the class variable `Magic` and call `_check_magic` before parsing.
    """

    Magic: ClassVar[bytes]

    def _check_magic(self, reader: StructReader):
        """
        Peek at the next `len(Magic)` bytes of the reader and raise `InvalidMagic`
        if they differ from `Magic`.
        """
        expected = self.Magic
        found = reader.peek(len(expected))
        if found != expected:
            raise InvalidMagic(self, found)
class InvalidMagic(ValueError):
    """
    Raised when the bytes found at the start of a structure differ from the
    structure's expected `Magic` signature.
    """

    def __init__(self, who: ChmStruct, magic: memoryview):
        """
        Build the error message from the structure `who` that failed to parse and
        the signature bytes `magic` that were found instead of `who.Magic`. Both
        byte strings are rendered as upper-case, colon-separated hex.
        """
        owner = who.__class__.__name__
        found = magic.hex(":").upper()
        expected = who.Magic.hex(":").upper()
        super().__init__(
            F'Invalid {owner} signature {found}, '
            F'should be {expected}.')
class ChmHeader(ChmStruct, LanguageMixin):
    """
    The leading header of the file, identified by the signature `ITSF`. Only
    versions 3 and above are parsed; lower versions raise `NotImplementedError`.
    """

    Magic = B'ITSF'
    Guid = UUID('7C01FD10-7BAA-11D0-9E0C-00A0-C922-E6EC')

    def __init__(self, reader: StructReader[memoryview]):
        self._check_magic(reader)
        self.signature = reader.read_bytes(4)
        version = reader.u32()
        self.version = version
        if version < 3:
            raise NotImplementedError(F'Parsing of HelpV{version} not yet implemented.')
        self.header_size = reader.u32()
        self.unknown = reader.u32()
        self.timestamp = reader.u32()
        self.language = reader.u32()
        # Two GUIDs follow; they are stored but not compared against `Guid` here.
        self.guid1 = reader.read_guid()
        self.guid2 = reader.read_guid()
        # Location of the file size header and of the directory header.
        self.section_file_size = SectionHeader(reader)
        self.section_directory = SectionHeader(reader)
        # The content offset field is only read for version 3 and above.
        if version >= 3:
            self.content_offset = reader.u64()
        else:
            self.content_offset = None
class FileSizeHeader(ChmStruct):
    """
    Header parsed by `CHM` at the offset given in `ChmHeader.section_file_size`.
    It starts with the four signature bytes FE 01 00 00 and carries a 64-bit
    `file_size` value.
    """

    Magic = B'\xFE\x01\0\0'

    def __init__(self, reader: StructReader[memoryview]):
        self._check_magic(reader)
        # Unlike the other headers, the signature is consumed as an integer here.
        self.signature = reader.u32()
        # Four bytes are skipped without being interpreted.
        reader.skip(4)
        self.file_size = reader.u64()
        # Eight trailing bytes are skipped without being interpreted.
        reader.skip(8)
class DirectoryHeader(ChmStruct, LanguageMixin):
    """
    Header of the directory, identified by the signature `ITSP` followed by the
    version number 1. `CHM` reads `total_chunks` chunks of `chunk_size` bytes
    each directly after this header.

    Raises `InvalidMagic` when the signature does not match and
    `NotImplementedError` when the version is not 1 or when the GUID stored in
    the header differs from `Guid`.
    """

    Magic = B'ITSP\x01\0\0\0'
    Guid = UUID('5D02926A-212E-11D0-9DF9-00A0C922E6EC')

    def __init__(self, reader: StructReader[memoryview]):
        # The magic covers both the signature and the version field; the version
        # check below is therefore a second line of defense.
        self._check_magic(reader)
        self.signature = reader.read_bytes(4)
        self.version = reader.u32()
        if self.version != 1:
            raise NotImplementedError(
                F'Parsing of directory header version {self.version} not yet implemented.')
        self.header_length_1 = reader.u32()
        reader.skip(4)
        self.chunk_size = reader.u32()
        self.density = reader.u32()
        self.tree_depth = reader.u32()
        self.root_chunk = reader.u32()
        # Inclusive range of chunk numbers that CHM accepts as listing chunks.
        self.listing1st = reader.u32()
        self.listingLst = reader.u32()
        reader.skip(4)
        self.total_chunks = reader.u32()
        self.language = reader.u32()
        guid = reader.read_guid()
        if guid != self.Guid:
            raise NotImplementedError(
                F'Unexpected directory header GUID {guid}, expected {self.Guid}.')
        self.header_length_2 = reader.u32()
        reader.skip(12)
class QuickRefArea(Struct):
    """
    The trailing area of a chunk: `count` 16-bit offsets followed by a 16-bit
    entry count. The offsets are stored in `offsets`, keyed by `k * n` for `k`
    running from `count` down to 1 in the order the values are read.
    """

    def __init__(self, reader: StructReader, n: int, count: int):
        offsets = {}
        for k in range(count, 0, -1):
            offsets[k * n] = reader.u16()
        self.offsets = offsets
        self.num_entries = reader.u16()
class DirectoryListingEntry(Struct):
    """
    One entry of a listing chunk: a length-prefixed UTF-8 name followed by the
    section index, offset and length of the named item. All four integers are
    read as big-endian 7-bit encoded integers.
    """

    def __init__(self, reader: StructReader):
        def read_int():
            return reader.read_7bit_encoded_int(64, bigendian=True)

        name_size = read_int()
        raw_name = reader.read_bytes(name_size)
        self.name = raw_name.decode('utf8')
        self.section_index = read_int()
        self.offset = read_int()
        self.length = read_int()
class Chunk(ChmStruct):
    """
    Common base of the directory chunks. It validates the chunk signature, reads
    the entry count from the last two bytes of the chunk, and provides the parser
    for the quick reference area at the end of the chunk.
    """

    def __init__(self, reader: StructReader, density: int):
        self._check_magic(reader)
        self.density = density
        # The entry count is stored in the final two bytes of the chunk; read it
        # up front without disturbing the current read position.
        with reader.detour_from_end(-2):
            self.num_entries = reader.u16()
        self.signature = reader.read_bytes(4)
        self.extra_size = reader.u32()

    def _quick_refs(self, reader: StructReader, density: int):
        """
        Skip ahead to the quick reference area at the end of the chunk and parse
        it into `self.quickref`.

        Raises `ValueError` when the entry count that terminates the quick
        reference area differs from `self.num_entries`.
        """
        n = 1 + (1 << density)
        count = self.num_entries // n
        # The area consists of `count` 16-bit offsets plus one 16-bit entry count.
        skips = reader.remaining_bytes - 2 * (count + 1)
        # If the remaining data is too short for that many offsets, read fewer.
        while skips < 0:
            count -= 1
            skips += 2
        reader.skip(skips)
        self.quickref = QuickRefArea(reader, n, count)
        if self.quickref.num_entries != self.num_entries:
            raise ValueError(
                F'Quick reference area ends with entry count {self.quickref.num_entries}, '
                F'expected {self.num_entries}.')
class IndexChunk(Chunk):
    """
    A chunk with the signature `PMGI`. Only the common chunk header and the
    quick reference area are parsed; the data in between is skipped.
    """

    Magic = B'PMGI'

    def __init__(self, reader: StructReader, density: int):
        super().__init__(reader, density)
        # Everything between the chunk header and the quick reference area is
        # skipped by _quick_refs.
        self._quick_refs(reader, density)
class ListingChunk(Chunk):
    """
    A chunk with the signature `PMGL`. After the common chunk header it contains
    four skipped bytes, the numbers `nr_prev` and `nr_next`, and then
    `num_entries` directory listing entries, followed by the quick reference area.
    """

    Magic = B'PMGL'

    def __init__(self, reader: StructReader, density: int):
        super().__init__(reader, density)
        reader.skip(4)
        self.nr_prev = reader.u32()
        self.nr_next = reader.u32()
        entries = []
        for _ in range(self.num_entries):
            entries.append(DirectoryListingEntry(reader))
        self.entries = entries
        self._quick_refs(reader, density)
class ContentSectionsName(Struct):
    """
    The name of one content section: a length-prefixed UTF-16LE string followed
    by a 16-bit zero. The properties build the directory paths under which the
    files belonging to this section are looked up.
    """

    def __init__(self, reader: StructReader):
        encoded = reader.read_length_prefixed(16, block_size=2)
        self.name = codecs.decode(encoded, 'utf-16le')
        t = reader.u16()
        if t != 0:
            raise ValueError(F'Expected a zero WORD after content section name, got 0x{t:X}.')

    @property
    def path(self):
        """
        Directory prefix shared by all files of this section; ends with a slash.
        """
        return F'::DataSpace/Storage/{self.name}/'

    @property
    def path_content(self):
        """
        Path of the section's `Content` file.
        """
        base = self.path
        return F'{base}Content'

    @property
    def path_ctrl_data(self):
        """
        Path of the section's `ControlData` file.
        """
        base = self.path
        return F'{base}ControlData'

    @property
    def path_span_info(self):
        """
        Path of the section's `SpanInfo` file.
        """
        base = self.path
        return F'{base}SpanInfo'

    def path_reset_table(self, guid: UUID):
        """
        Path of the section's `ResetTable` file for the transform identified by
        `guid`. The GUID is rendered in upper case and enclosed in braces.
        """
        base = self.path
        guid_text = str(guid).upper()
        return F'{base}Transform/{{{guid_text}}}/InstanceData/ResetTable'
class ContentSections(Struct):
    """
    The list of content section names. `CHM` parses it at the location of the
    directory entry `::DataSpace/NameList`. It consists of a 16-bit `file_size`
    value, a 16-bit section count, and that many section names.
    """

    def __init__(self, reader: StructReader):
        self.file_size = reader.u16()
        section_count = reader.u16()
        self.sections = [ContentSectionsName(reader) for _ in range(section_count)]
class ContentSectionsControlData(ChmStruct):
    """
    The control data of a compressed content section. It consists of a 32-bit
    field count, the signature `LZXC`, and the 32-bit fields `version`,
    `reset_interval`, `window_size` and `cache_size`. Any further 32-bit values
    announced by the field count are collected in `extra`.

    The attribute `extra` is always present; it is an empty list when the field
    count does not announce any additional values.

    Raises `InvalidMagic` when the signature after the field count is not `LZXC`.
    """

    Magic = b'LZXC'

    def __init__(self, reader: StructReader[memoryview]):
        # Number of DWORDs following 'LZXC': Must be 6 if version is 2
        self.field_count = reader.u32()
        # The signature follows the field count rather than starting the structure.
        self._check_magic(reader)
        self.signature = reader.read_bytes(4)
        self.version = reader.u32()
        self.reset_interval = reader.u32()
        self.window_size = reader.u32()
        self.cache_size = reader.u32()
        # Always define the attribute so that consumers never hit an AttributeError;
        # a non-positive difference yields an empty range and hence an empty list.
        self.extra = [reader.u32() for _ in range(self.field_count - 5)]
class ContentSectionsResetTable(Struct):
    """
    The reset table of a compressed content section: a header followed by a list
    of 64-bit entries. `CHM` uses the entries as the start offsets of the
    compressed blocks and `block_size` as the size argument for decompressing
    each block.

    Raises `NotImplementedError` when the version is neither 2 nor 3, when the
    third header field is not 8, or when the header does not end exactly
    `header_size` bytes after its start.
    """

    def __init__(self, reader: StructReader[memoryview]):
        start = reader.tell()
        self.version = reader.u32()
        if self.version not in {2, 3}:
            raise NotImplementedError(
                F'Parsing of reset table version {self.version} not yet implemented.')
        n = reader.u32()
        # Presumably the size of a single table entry: the entries below are read
        # as 64-bit values, which matches the only accepted value of 8.
        entry_size = reader.u32()
        if entry_size != 8:
            raise NotImplementedError(
                F'Unexpected value {entry_size} in reset table header, only 8 is supported.')
        self.header_size = reader.u32()
        self.size_uncompressed = reader.u64()
        self.size_compressed = reader.u64()
        self.block_size = reader.u64()
        # The entries are expected to follow the fixed-size header immediately.
        if reader.tell() != self.header_size + start:
            raise NotImplementedError(
                F'Unexpected reset table header size {self.header_size}.')
        self.entries = [reader.u64() for _ in range(n)]
@dataclass
class ContentSection:
    """
    Describes where the data of one content section is located and, when the
    section is compressed, the parameters needed to decompress it. Instances are
    created and filled in by `CHM.__init__` and consumed by `CHM.read_section`.
    """
    # Start and size of the section data. For the section named 'uncompressed'
    # this is the content offset and the number of bytes that follow it; for all
    # other sections it is the offset and length of the section's Content entry.
    offset: int
    length: int
    # Index of the section that contains this section's raw data; when None, the
    # data is read directly from the underlying reader.
    base_section: int | None = None
    # 64-bit value read from the section's SpanInfo entry, when that entry exists.
    uncompressed: int | None = None
    # 15 plus the base-2 logarithm of the control data window size; handed to
    # the LZX decoder. Zero means that no control data was found.
    window_size: int = 0
    # Taken from the control data; blocks whose number is a multiple of this
    # value are decompressed without history.
    reset_interval: int = 0
    # Taken from the reset table; passed to the decoder along with each block.
    block_size: int = 0
    # Reset table entries, used as the start offsets of the compressed blocks.
    block_offsets: list = field(default_factory=list)
class CHM(Struct):
    """
    Parser for a CHM container. Parsing reads the file header, the file size
    header, the directory with all its chunks, and the list of content sections.
    Afterwards, `filesystem` maps each path to its directory listing entry and
    `read` returns the data of such an entry.

    Names starting with `/#` or `/$` are stored in `filesystem` with the prefix
    `/$CHM`.
    """

    def read_section(self, index: int) -> memoryview:
        """
        Return the complete data of the content section with the given index,
        decompressing it if the section has LZX parameters. Results are cached in
        `section_data`, so each section is read and decompressed at most once.
        """
        try:
            return self.section_data[index]
        except KeyError:
            cs = self.sections[index]
        if cs.base_section is None:
            with self.reader.detour(cs.offset):
                data = self.reader.read(cs.length)
        else:
            # The raw data lives inside another section.
            data = self.read_section(cs.base_section)[cs.offset:][:cs.length]
        if cs.window_size and cs.block_offsets:
            lzx = LzxDecoder()
            out = bytearray()
            lzx.set_params_and_alloc(cs.window_size)
            for nr, offset in enumerate(cs.block_offsets):
                # Drop the history at every reset interval; it is re-enabled
                # after each block has been decompressed.
                if nr % cs.reset_interval == 0:
                    lzx.keep_history = False
                if nr < len(cs.block_offsets) - 1:
                    length = cs.block_offsets[nr + 1] - offset
                else:
                    # The last block extends to the end of the section data.
                    length = len(data) - offset
                out.extend(lzx.decompress(data[offset:][:length], cs.block_size))
                lzx.keep_history = True
            data = memoryview(out)
        self.section_data[index] = data
        return data

    def read(self, entry: DirectoryListingEntry):
        """
        Return the data of the given directory listing entry, which is a slice
        of the data of the section that the entry belongs to.
        """
        data = self.read_section(entry.section_index)
        return data[entry.offset:][:entry.length]

    def seekto(self, reader: StructReader, path: str):
        """
        Move the reader to the start of the file stored under `path`. Returns
        `True` when the path exists in `filesystem` and `False` otherwise, in
        which case the reader is not moved.
        """
        content = self.filesystem.get(path)
        # Test for a missing entry explicitly instead of relying on the
        # truthiness of the entry object.
        if content is None:
            return False
        reader.seekset(self.sections[content.section_index].offset + content.offset)
        return True

    def __init__(self, reader: StructReader[memoryview], *args, **kwargs):
        """
        Parse the container from the given reader.

        Raises `ValueError` for chunks with an unknown signature, for listing
        chunks outside the listing range, and for content sections whose data is
        not stored in section 0. Raises `LookupError` when the content file of a
        section is missing from the directory.
        """
        self.filesystem: dict[str, DirectoryListingEntry] = {}
        self.sections: list[ContentSection] = []
        self.section_data: dict[int, memoryview] = {}
        self.reader = reader
        self.header = header = ChmHeader(reader)

        with reader.detour_absolute(header.section_file_size.offset):
            self.file_size = FileSizeHeader(reader)

        with reader.detour_absolute(header.section_directory.offset):
            self.directory = dh = DirectoryHeader(reader)
            self.index = []
            d = dh.density
            m = dh.chunk_size
            self.listing: list[ListingChunk] = []
            for k in range(dh.total_chunks):
                body = reader.read_exactly(m)
                if body[:4] == IndexChunk.Magic:
                    self.index.append(IndexChunk(body, d))
                    continue
                if body[:4] == ListingChunk.Magic:
                    if not (dh.listing1st <= k <= dh.listingLst):
                        raise ValueError(F'Chunk {k} has magic {ListingChunk.Magic.decode()} but is out of listing range.')
                    chunk = ListingChunk(body, d)
                    for entry in chunk.entries:
                        name = entry.name
                        if name.startswith(('/#', '/$')):
                            name = F'/$CHM{name}'
                        self.filesystem[name] = entry
                    self.listing.append(chunk)
                    continue
                raise ValueError(F'Unknown chunk magic: {body[:4].hex(":")}')
            # Without an explicit content offset, the content is taken to start
            # where the directory ends.
            if (co := header.content_offset) is None:
                co = reader.tell()

        reader.seekset(co)
        total_size = reader.remaining_bytes
        reader.seekset(co + self.filesystem['::DataSpace/NameList'].offset)
        self.content_sections = ContentSections(reader)

        for section in self.content_sections.sections:
            if section.name.lower() == 'uncompressed':
                # The uncompressed section spans everything from the content offset.
                self.sections.append(ContentSection(co, total_size))
                continue
            try:
                content = self.filesystem[section.path_content]
            except KeyError as KE:
                raise LookupError(F'could not find content file for section {section.name}') from KE
            cs = ContentSection(content.offset, content.length)
            cs.base_section = s = content.section_index
            # Validate the base section before it is used as an index, so that a
            # bad value is reported as such rather than as an IndexError.
            if cs.base_section != 0:
                raise ValueError(F'Invalid base section {cs.base_section}')
            reader.seekset(self.sections[s].offset + cs.offset)
            if self.seekto(reader, section.path_ctrl_data):
                control_data = ContentSectionsControlData(reader)
                cs.reset_interval = control_data.reset_interval
                cs.window_size = 15 + int(math.log2(control_data.window_size))
            if self.seekto(reader, section.path_span_info):
                cs.uncompressed = reader.u64()
            # any() stops at the first transform GUID whose reset table exists,
            # leaving the reader positioned at that table.
            if any(
                self.seekto(reader, section.path_reset_table(guid))
                for guid in (_LZX_CHM, _LZX_HLP)
            ):
                reset_table = ContentSectionsResetTable(reader)
                cs.block_offsets = reset_table.entries
                cs.block_size = reset_table.block_size
            self.sections.append(cs)
Classes
class SectionHeader (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class SectionHeader(Struct): def __init__(self, reader: StructReader[memoryview]): self.offset = reader.u64() self.length = reader.u64()
Ancestors
class LanguageMixin
-
Expand source code Browse git
class LanguageMixin: language: int @cached_property def codepage(self): return DEFAULT_CODEPAGE.get(self.language, None) @cached_property def language_name(self): return LCID.get(self.language, 'Unknown')
Subclasses
Class variables
var language
Instance variables
var codepage
-
Expand source code Browse git
@cached_property def codepage(self): return DEFAULT_CODEPAGE.get(self.language, None)
var language_name
-
Expand source code Browse git
@cached_property def language_name(self): return LCID.get(self.language, 'Unknown')
class ChmStruct (reader, *args, **kwargs)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ChmStruct(Struct): Magic: ClassVar[bytes] def _check_magic(self, reader: StructReader): if (s := reader.peek(len(self.Magic))) != self.Magic: raise InvalidMagic(self, s)
Ancestors
Subclasses
Class variables
var Magic
class InvalidMagic (who, magic)
-
Inappropriate argument value (of correct type).
Expand source code Browse git
class InvalidMagic(ValueError): def __init__(self, who: ChmStruct, magic: memoryview): super().__init__( F'Invalid {who.__class__.__name__} signature {magic.hex(":").upper()}, ' F'should be {who.Magic.hex(":").upper()}.')
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class ChmHeader (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ChmHeader(ChmStruct, LanguageMixin): Magic = B'ITSF' Guid = UUID('7C01FD10-7BAA-11D0-9E0C-00A0-C922-E6EC') def __init__(self, reader: StructReader[memoryview]): self._check_magic(reader) self.signature = reader.read_bytes(4) self.version = v = reader.u32() if v < 3: raise NotImplementedError(F'Parsing of HelpV{v} not yet implemented.') self.header_size = reader.u32() self.unknown = reader.u32() self.timestamp = reader.u32() self.language = reader.u32() self.guid1 = reader.read_guid() self.guid2 = reader.read_guid() self.section_file_size = SectionHeader(reader) self.section_directory = SectionHeader(reader) self.content_offset = reader.u64() if v >= 3 else None
Ancestors
Class variables
var Magic
var Guid
class FileSizeHeader (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class FileSizeHeader(ChmStruct): Magic = B'\xFE\x01\0\0' def __init__(self, reader: StructReader[memoryview]): self._check_magic(reader) self.signature = reader.u32() reader.skip(4) self.file_size = reader.u64() reader.skip(8)
Ancestors
Class variables
var Magic
class DirectoryHeader (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class DirectoryHeader(ChmStruct, LanguageMixin): Magic = B'ITSP\x01\0\0\0' Guid = UUID('5D02926A-212E-11D0-9DF9-00A0C922E6EC') def __init__(self, reader: StructReader[memoryview]): self._check_magic(reader) self.signature = reader.read_bytes(4) self.version = reader.u32() if self.version != 1: raise NotImplementedError self.header_length_1 = reader.u32() reader.skip(4) self.chunk_size = reader.u32() self.density = reader.u32() self.tree_depth = reader.u32() self.root_chunk = reader.u32() self.listing1st = reader.u32() self.listingLst = reader.u32() reader.skip(4) self.total_chunks = reader.u32() self.language = reader.u32() if reader.read_guid() != self.Guid: raise NotImplementedError self.header_length_2 = reader.u32() reader.skip(12)
Ancestors
Class variables
var Magic
var Guid
class QuickRefArea (reader, n, count)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class QuickRefArea(Struct): def __init__(self, reader: StructReader, n: int, count: int): self.offsets = { (k * n): reader.u16() for k in range(count, 0, -1) } self.num_entries = reader.u16()
Ancestors
class DirectoryListingEntry (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class DirectoryListingEntry(Struct): def __init__(self, reader: StructReader): ns = reader.read_7bit_encoded_int(64, bigendian=True) self.name = reader.read_bytes(ns).decode('utf8') self.section_index = reader.read_7bit_encoded_int(64, bigendian=True) self.offset = reader.read_7bit_encoded_int(64, bigendian=True) self.length = reader.read_7bit_encoded_int(64, bigendian=True)
Ancestors
class Chunk (reader, density)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class Chunk(ChmStruct): def __init__(self, reader: StructReader, density: int): self._check_magic(reader) self.density = density with reader.detour_from_end(-2): self.num_entries = reader.u16() self.signature = reader.read_bytes(4) self.extra_size = reader.u32() def _quick_refs(self, reader: StructReader, density: int): n = 1 + (1 << density) count = self.num_entries // n skips = reader.remaining_bytes - 2 * (count + 1) while skips < 0: count -= 1 skips += 2 reader.skip(skips) self.quickref = QuickRefArea(reader, n, count) if self.quickref.num_entries != self.num_entries: raise ValueError
Ancestors
Subclasses
class IndexChunk (reader, density)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class IndexChunk(Chunk): Magic = B'PMGI' def __init__(self, reader: StructReader, density: int): super().__init__(reader, density) self._quick_refs(reader, density)
Ancestors
Class variables
var Magic
class ListingChunk (reader, density)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ListingChunk(Chunk): Magic = B'PMGL' def __init__(self, reader: StructReader, density: int): super().__init__(reader, density) reader.skip(4) self.nr_prev = reader.u32() self.nr_next = reader.u32() self.entries = [ DirectoryListingEntry(reader) for _ in range(self.num_entries)] self._quick_refs(reader, density)
Ancestors
Class variables
var Magic
class ContentSectionsName (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ContentSectionsName(Struct): def __init__(self, reader: StructReader): name = reader.read_length_prefixed(16, block_size=2) self.name = codecs.decode(name, 'utf-16le') if (t := reader.u16()) != 0: raise ValueError(F'Expected a zero WORD after content section name, got 0x{t:X}.') @property def path(self): return F'::DataSpace/Storage/{self.name}/' @property def path_content(self): return F'{self.path}Content' @property def path_ctrl_data(self): return F'{self.path}ControlData' @property def path_span_info(self): return F'{self.path}SpanInfo' def path_reset_table(self, guid: UUID): return F'{self.path}Transform/{{{str(guid).upper()}}}/InstanceData/ResetTable'
Ancestors
Instance variables
var path
-
Expand source code Browse git
@property def path(self): return F'::DataSpace/Storage/{self.name}/'
var path_content
-
Expand source code Browse git
@property def path_content(self): return F'{self.path}Content'
var path_ctrl_data
-
Expand source code Browse git
@property def path_ctrl_data(self): return F'{self.path}ControlData'
var path_span_info
-
Expand source code Browse git
@property def path_span_info(self): return F'{self.path}SpanInfo'
Methods
def path_reset_table(self, guid)
-
Expand source code Browse git
def path_reset_table(self, guid: UUID): return F'{self.path}Transform/{{{str(guid).upper()}}}/InstanceData/ResetTable'
class ContentSections (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ContentSections(Struct): def __init__(self, reader: StructReader): self.file_size = reader.u16() self.sections = [ContentSectionsName(reader) for _ in range(reader.u16())]
Ancestors
class ContentSectionsControlData (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ContentSectionsControlData(ChmStruct): Magic = b'LZXC' def __init__(self, reader: StructReader[memoryview]): # Number of DWORDs following 'LZXC': Must be 6 if version is 2 self.field_count = reader.u32() self._check_magic(reader) self.signature = reader.read_bytes(4) self.version = reader.u32() self.reset_interval = reader.u32() self.window_size = reader.u32() self.cache_size = reader.u32() if (unknowns := self.field_count - 5) > 0: self.extra = [reader.u32() for _ in range(unknowns)]
Ancestors
Class variables
var Magic
class ContentSectionsResetTable (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ContentSectionsResetTable(Struct): def __init__(self, reader: StructReader[memoryview]): start = reader.tell() self.version = reader.u32() if self.version not in {2, 3}: raise NotImplementedError n = reader.u32() if reader.u32() != 8: raise NotImplementedError self.header_size = reader.u32() self.size_uncompressed = reader.u64() self.size_compressed = reader.u64() self.block_size = reader.u64() if reader.tell() != self.header_size + start: raise NotImplementedError self.entries = [reader.u64() for _ in range(n)]
Ancestors
class ContentSection (offset, length, base_section=None, uncompressed=None, window_size=0, reset_interval=0, block_size=0, block_offsets=<factory>)
-
ContentSection(offset: 'int', length: 'int', base_section: 'int | None' = None, uncompressed: 'int | None' = None, window_size: 'int' = 0, reset_interval: 'int' = 0, block_size: 'int' = 0, block_offsets: 'list' = <factory>)
Expand source code Browse git
@dataclass class ContentSection: offset: int length: int base_section: int | None = None uncompressed: int | None = None window_size: int = 0 reset_interval: int = 0 block_size: int = 0 block_offsets: list = field(default_factory=list)
Instance variables
var offset
var length
var block_offsets
var base_section
var uncompressed
var window_size
var reset_interval
var block_size
class CHM (reader, *args, **kwargs)
-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class CHM(Struct): def read_section(self, index: int) -> memoryview: try: return self.section_data[index] except KeyError: cs = self.sections[index] if cs.base_section is None: with self.reader.detour(cs.offset): data = self.reader.read(cs.length) else: data = self.read_section(cs.base_section)[cs.offset:][:cs.length] if cs.window_size and cs.block_offsets: lzx = LzxDecoder() out = bytearray() lzx.set_params_and_alloc(cs.window_size) for nr, offset in enumerate(cs.block_offsets): if nr % cs.reset_interval == 0: lzx.keep_history = False if nr < len(cs.block_offsets) - 1: length = cs.block_offsets[nr + 1] - offset else: length = len(data) - offset out.extend(lzx.decompress(data[offset:][:length], cs.block_size)) lzx.keep_history = True data = memoryview(out) self.section_data[index] = data return data def read(self, entry: DirectoryListingEntry): data = self.read_section(entry.section_index) return data[entry.offset:][:entry.length] def seekto(self, reader: StructReader, path: str): if content := self.filesystem.get(path): reader.seekset(self.sections[content.section_index].offset + content.offset) return True else: return False def __init__(self, reader: StructReader[memoryview], *args, **kwargs): self.filesystem: dict[str, DirectoryListingEntry] = {} self.sections: list[ContentSection] = [] self.section_data: dict[int, memoryview] = {} self.reader = reader self.header = header = ChmHeader(reader) with reader.detour_absolute(header.section_file_size.offset): self.file_size = FileSizeHeader(reader) with reader.detour_absolute(header.section_directory.offset): self.directory = dh = DirectoryHeader(reader) self.index = [] d = dh.density m = dh.chunk_size self.listing: list[ListingChunk] = [] for k in range(dh.total_chunks): body = reader.read_exactly(m) if body[:4] == IndexChunk.Magic: self.index.append(IndexChunk(body, d)) continue if body[:4] == ListingChunk.Magic: if not (dh.listing1st <= k <= dh.listingLst): raise ValueError(F'Chunk {k} has magic 
{ListingChunk.Magic.decode()} but is out of listing range.') chunk = ListingChunk(body, d) for entry in chunk.entries: name = entry.name if name.startswith('/#') or name.startswith('/$'): name = F'/$CHM{name}' self.filesystem[name] = entry self.listing.append(chunk) continue raise ValueError(F'Unknown chunk magic: {body[:4].hex(":")}') if (co := header.content_offset) is None: co = reader.tell() reader.seekset(co) total_size = reader.remaining_bytes reader.seekset(co + self.filesystem['::DataSpace/NameList'].offset) self.content_sections = ContentSections(reader) for section in self.content_sections.sections: if section.name.lower() == 'uncompressed': self.sections.append(ContentSection(co, total_size)) continue try: content = self.filesystem[section.path_content] except KeyError as KE: raise LookupError(F'could not find content file for section {section.name}') from KE cs = ContentSection(content.offset, content.length) cs.base_section = s = content.section_index reader.seekset(self.sections[s].offset + cs.offset) if self.seekto(reader, section.path_ctrl_data): control_data = ContentSectionsControlData(reader) cs.reset_interval = control_data.reset_interval cs.window_size = 15 + int(math.log2(control_data.window_size)) if self.seekto(reader, section.path_span_info): cs.uncompressed = reader.u64() if any( self.seekto(reader, section.path_reset_table(guid)) for guid in (_LZX_CHM, _LZX_HLP) ): reset_table = ContentSectionsResetTable(reader) cs.block_offsets = reset_table.entries cs.block_size = reset_table.block_size if cs.base_section != 0: raise ValueError(F'Invalid base section {cs.base_section}') self.sections.append(cs)
Ancestors
Methods
def read_section(self, index)
-
Expand source code Browse git
def read_section(self, index: int) -> memoryview: try: return self.section_data[index] except KeyError: cs = self.sections[index] if cs.base_section is None: with self.reader.detour(cs.offset): data = self.reader.read(cs.length) else: data = self.read_section(cs.base_section)[cs.offset:][:cs.length] if cs.window_size and cs.block_offsets: lzx = LzxDecoder() out = bytearray() lzx.set_params_and_alloc(cs.window_size) for nr, offset in enumerate(cs.block_offsets): if nr % cs.reset_interval == 0: lzx.keep_history = False if nr < len(cs.block_offsets) - 1: length = cs.block_offsets[nr + 1] - offset else: length = len(data) - offset out.extend(lzx.decompress(data[offset:][:length], cs.block_size)) lzx.keep_history = True data = memoryview(out) self.section_data[index] = data return data
def read(self, entry)
-
Expand source code Browse git
def read(self, entry: DirectoryListingEntry): data = self.read_section(entry.section_index) return data[entry.offset:][:entry.length]
def seekto(self, reader, path)
-
Expand source code Browse git
def seekto(self, reader: StructReader, path: str): if content := self.filesystem.get(path): reader.seekset(self.sections[content.section_index].offset + content.offset) return True else: return False