Module refinery.lib.ole.file
Parser for Microsoft OLE2 Compound Binary Files (CFB).
Expand source code Browse git
"""
Parser for Microsoft OLE2 Compound Binary Files (CFB).
"""
from __future__ import annotations
import codecs
import datetime
import enum
import itertools
import math
import re
import struct
from typing import Any
from uuid import UUID
from refinery.lib.structures import MemoryFile, StructReader
# Signature expected in the first eight bytes of the input.
MAGIC = b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'
# Inputs shorter than this are rejected by OleFile before any parsing takes place.
MINIMAL_OLEFILE_SIZE = 1536
# Sector numbers greater than this value are never followed when a chain is read or written.
MAXREGSECT = 0xFFFFFFFA
# Special marker values that can appear in place of a sector number.
DIFSECT = 0xFFFFFFFC # noqa
FATSECT = 0xFFFFFFFD # noqa
ENDOFCHAIN = 0xFFFFFFFE # noqa
FREESECT = 0xFFFFFFFF # noqa
# Special stream ID values; NOSTREAM marks an absent child or sibling of a directory entry.
MAXREGSID = 0xFFFFFFFA # noqa
NOSTREAM = 0xFFFFFFFF # noqa
class STGTY(enum.IntEnum):
    """
    Values of the entry type byte of a directory entry. Entries of type EMPTY are skipped when the
    directory is loaded, STORAGE and ROOT entries are treated as containers, and only STREAM
    entries can be opened as streams.
    """
    EMPTY = 0 # noqa
    STORAGE = 1 # noqa
    STREAM = 2 # noqa
    LOCKBYTES = 3 # noqa
    PROPERTY = 4 # noqa
    ROOT = 5 # noqa
# Property type codes. The property parser compares the low 12 bits of a property's type field
# against these values.
VT_EMPTY = 0 # noqa
VT_NULL = 1 # noqa
VT_I2 = 2 # noqa
VT_I4 = 3 # noqa
VT_R4 = 4 # noqa
VT_R8 = 5 # noqa
VT_CY = 6 # noqa
VT_DATE = 7 # noqa
VT_BSTR = 8 # noqa
VT_DISPATCH = 9 # noqa
VT_ERROR = 10 # noqa
VT_BOOL = 11 # noqa
VT_VARIANT = 12 # noqa
VT_UNKNOWN = 13 # noqa
VT_DECIMAL = 14 # noqa
VT_I1 = 16 # noqa
VT_UI1 = 17 # noqa
VT_UI2 = 18 # noqa
VT_UI4 = 19 # noqa
VT_I8 = 20 # noqa
VT_UI8 = 21 # noqa
VT_INT = 22 # noqa
VT_UINT = 23 # noqa
VT_VOID = 24 # noqa
VT_HRESULT = 25 # noqa
VT_PTR = 26 # noqa
VT_SAFEARRAY = 27 # noqa
VT_CARRAY = 28 # noqa
VT_USERDEFINED = 29 # noqa
VT_LPSTR = 30 # noqa
VT_LPWSTR = 31 # noqa
VT_FILETIME = 64 # noqa
VT_BLOB = 65 # noqa
VT_STREAM = 66 # noqa
VT_STORAGE = 67 # noqa
VT_STREAMED_OBJECT = 68 # noqa
VT_STORED_OBJECT = 69 # noqa
VT_BLOB_OBJECT = 70 # noqa
VT_CF = 71 # noqa
VT_CLSID = 72 # noqa
# Flag bit in the type field that marks a property value as a vector of the base type.
VT_VECTOR = 0x1000 # noqa
# Severity levels for OleFile._raise_defect; a defect is raised as an OleFileError when its level
# reaches the threshold configured on the OleFile instance.
DEFECT_UNSURE = 10 # noqa
DEFECT_POTENTIAL = 20 # noqa
DEFECT_INCORRECT = 30 # noqa
DEFECT_FATAL = 40 # noqa
# Point in time that filetime_to_datetime adds the converted tick count to.
_FILETIME_EPOCH = datetime.datetime(1601, 1, 1, 0, 0, 0)
class OleFileError(IOError):
    """
    Raised when a compound file cannot be parsed or when a requested entry cannot be accessed.
    """
class NotOleFileError(OleFileError):
    """
    Raised by the OleFile constructor when the input is too small or lacks the OLE2 signature.
    """
def filetime_to_datetime(filetime: int) -> datetime.datetime | None:
    """
    Convert a FILETIME tick count into a naive datetime by adding a tenth of the tick count, taken
    as microseconds, to `_FILETIME_EPOCH`. The result is None when the tick count is not positive
    or when the resulting point in time cannot be represented.
    """
    if filetime > 0:
        try:
            elapsed = datetime.timedelta(microseconds=filetime // 10)
            return _FILETIME_EPOCH + elapsed
        except (ValueError, OverflowError):
            pass
    return None
def _clsid(data: bytes | bytearray | memoryview) -> str:
if len(data) != 16 or not any(data):
return ''
if not isinstance(data, bytes):
data = bytes(data)
return str(UUID(bytes_le=data))
def is_ole_file(data: bytes | bytearray | memoryview) -> bool:
    """
    Check whether the first eight bytes of `data` equal the OLE2 signature `MAGIC`.
    """
    signature = data[:8]
    return signature == MAGIC
def _i16(data, offset: int = 0) -> int:
return struct.unpack_from('<H', data, offset)[0]
def _i32(data, offset: int = 0) -> int:
return struct.unpack_from('<I', data, offset)[0]
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of the entry. The attributes `kids` and
    `kids_dict` start out empty and are populated by `DirectoryEntry.build_storage_tree`. The flag
    `used` records whether the entry has already been attached to a storage, which ensures that
    every entry is attached at most once even when the sibling links of a file form a cycle.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with stream ID `sid` from the first 128 bytes of `data`.

        The upper 32 bits of the size field are ignored when `sector_size` is 512. Entries of type
        STREAM, LOCKBYTES or PROPERTY whose size is below `mini_stream_cutoff` are flagged via
        `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False
        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        name_length = _i16(entry, 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # kept as a view on the raw bytes; it is formatted on demand by clsid_str
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)
        # the name field holds at most 64 bytes, regardless of what the length field claims
        if name_length > 64:
            name_length = 64
        name_bytes = name_raw[:name_length]
        # the stored length includes a two-byte terminator, which is not part of the name
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')
        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)
        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The class ID of this entry in string form; empty when the class ID consists of zero bytes.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` of this entry and of every storage below it.

        The list `entries` is indexed by stream ID and contains None for unused directory slots.
        Storages are processed depth-first in the order of their sorted children. The traversal
        uses an explicit work list rather than recursion, so that deeply nested storages cannot
        exhaust the interpreter's recursion limit.
        """
        # Claim the starting entry up front: a crafted file could otherwise list it as the child
        # or sibling of one of its own descendants, which would make the tree cyclic.
        self.used = True
        pending = [self]
        while pending:
            storage = pending.pop()
            child_sid = storage.sid_child
            if child_sid != NOSTREAM:
                child = entries[child_sid] if child_sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # pushed in reverse so that the first child is the next one to be processed
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Attach `node` and all entries reachable through its left and right sibling links to this
        storage, visiting them in order (left subtree, node, right subtree).

        Entries that are already marked as used are skipped, which terminates the walk on cyclic
        links. The traversal is iterative so that a degenerate sibling tree, for example one that
        is linked like a list, cannot raise a RecursionError.
        """
        def sibling(sid: int) -> DirectoryEntry | None:
            if sid != NOSTREAM and sid < len(entries):
                return entries[sid]
            return None

        stack: list[DirectoryEntry] = []
        current: DirectoryEntry | None = node
        while True:
            # descend along the left links, claiming every entry on the way down
            while current is not None and not current.used:
                current.used = True
                stack.append(current)
                current = sibling(current.sid_left)
            if not stack:
                return
            current = stack.pop()
            self.kids.append(current)
            self.kids_dict[current.name.lower()] = current
            current = sibling(current.sid_right)
def _read_chain(
fp: MemoryFile[memoryview],
fat: list[int],
start_sector: int,
sector_size: int,
sector_offset: int,
declared_size: int,
max_sectors: int,
) -> bytearray:
if declared_size == 0:
return bytearray()
if declared_size >= 0:
nb_sectors = math.ceil(declared_size / sector_size)
else:
nb_sectors = max_sectors
result = bytearray()
sect = start_sector
visited = set()
for _ in range(nb_sectors):
if sect > MAXREGSECT:
break
if sect in visited:
break
visited.add(sect)
offset = sector_offset + sect * sector_size
fp.seek(offset)
result.extend(fp.read(sector_size))
if sect >= len(fat):
break
sect = fat[sect]
if sect == ENDOFCHAIN or sect == FREESECT:
break
if declared_size >= 0 and len(result) > declared_size:
del result[declared_size:]
return result
# Attribute names for the properties of the SummaryInformation stream. OleMetadata.parse uses the
# position in this list as the property ID; position 0 is a placeholder and is skipped.
SUMMARY_ATTRIBS = [
    None,
    'codepage',
    'title',
    'subject',
    'author',
    'keywords',
    'comments',
    'template',
    'last_saved_by',
    'revision_number',
    'total_edit_time',
    'last_printed',
    'create_time',
    'last_saved_time',
    'num_pages',
    'num_words',
    'num_chars',
    'thumbnail',
    'creating_application',
    'security',
]
# Attribute names for the properties of the DocumentSummaryInformation stream, laid out in the
# same way: the list position is the property ID and position 0 is a placeholder.
DOCSUM_ATTRIBS = [
    None,
    'codepage_doc',
    'category',
    'presentation_target',
    'bytes',
    'lines',
    'paragraphs',
    'slides',
    'notes',
    'hidden_slides',
    'mm_clips',
    'scale_crop',
    'heading_pairs',
    'titles_of_parts',
    'manager',
    'company',
    'links_dirty',
    'chars_with_spaces',
    'unused',
    'shared_doc',
    'link_base',
    'hlinks',
    'hlinks_changed',
    'version',
    'dig_sig',
    'content_type',
    'content_status',
    'language',
    'doc_version',
]
class OleMetadata:
    """
    Holds the standard OLE metadata found in the SummaryInformation and the
    DocumentSummaryInformation property streams. Every name listed in `SUMMARY_ATTRIBS` and
    `DOCSUM_ATTRIBS` becomes an attribute, which remains None until a value has been parsed.
    """

    def __init__(self):
        for attribs in (SUMMARY_ATTRIBS, DOCSUM_ATTRIBS):
            for name in attribs[1:]:
                setattr(self, name, None)

    def parse(self, ole: OleFile):
        """
        Read both property streams from `ole` and store every property that has a value under its
        attribute name. A stream that is missing or whose properties cannot be read is skipped.
        """
        sources = (
            ('\x05SummaryInformation', SUMMARY_ATTRIBS),
            ('\x05DocumentSummaryInformation', DOCSUM_ATTRIBS),
        )
        for stream_name, attribs in sources:
            if not ole.exists(stream_name):
                continue
            # property 10 of the summary stream is exempt from the timestamp conversion
            keep_raw = [10] if attribs is SUMMARY_ATTRIBS else []
            try:
                props = ole.getproperties(
                    stream_name, convert_time=True, no_conversion=keep_raw)
            except Exception:
                continue
            for prop_id, attr_name in enumerate(attribs):
                if attr_name is not None and (value := props.get(prop_id)) is not None:
                    setattr(self, attr_name, value)

    def dump(self) -> dict[str, Any]:
        """
        Return all metadata attributes that are not None as a dictionary; summary attributes come
        first, followed by the document summary attributes.
        """
        return {
            name: value
            for attribs in (SUMMARY_ATTRIBS, DOCSUM_ATTRIBS)
            for name in attribs[1:]
            if (value := getattr(self, name, None)) is not None
        }
class OleFile:
    """
    Parser for OLE2 Compound Binary Files with in-place stream writing support.

    The header, the FAT and the directory are parsed by the constructor; the mini FAT and the mini
    stream are loaded when they are first required. Paths given to the public methods are split on
    forward and backward slashes and are matched case-insensitively.
    """

    def __init__(self, data: bytes | bytearray | memoryview | MemoryFile[memoryview]):
        """
        Parse the compound file contained in `data`.

        Raises a TypeError for unsupported input types, a NotOleFileError when the input is too
        small or does not start with the OLE2 signature, and an OleFileError when there is no root
        directory entry. An EOFError can escape when the FAT refers to sectors beyond the end of
        the input.
        """
        if isinstance(data, MemoryFile):
            fp = data
            mv = data.getbuffer()
        elif isinstance(data, (bytes, bytearray)):
            mv = memoryview(data)
            fp = MemoryFile(mv)
        elif isinstance(data, memoryview):
            mv = data
            fp = MemoryFile(mv)
        else:
            raise TypeError(
                F'Expected bytes, bytearray, memoryview, or MemoryFile,'
                F' got {type(data).__name__}')
        # Validate the buffer that is parsed below; this keeps the checks independent of whether
        # the input object itself supports len() and slicing.
        if len(mv) < MINIMAL_OLEFILE_SIZE:
            raise NotOleFileError('Data too small to be an OLE2 file.')
        if mv[:8] != MAGIC:
            raise NotOleFileError('Not an OLE2 file (invalid magic bytes).')
        self._mv = mv
        self._fp = fp
        self._raise_defects_level = DEFECT_FATAL
        self._metadata: OleMetadata | None = None
        # both remain None until _load_ministream has run
        self._minifat: list[int] | None = None
        self._ministream: bytearray | None = None
        self._parse_header()
        self._load_fat()
        self._load_directory()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def _raise_defect(self, level: int, message: str):
        """
        Raise an OleFileError if `level` reaches the configured threshold; otherwise do nothing.
        """
        if level >= self._raise_defects_level:
            raise OleFileError(message)

    def _parse_header(self):
        """
        Read the header fields from the first 512 bytes and derive the sector sizes as well as
        the number of sectors that follow the header.
        """
        reader = StructReader(self._mv[:512])
        reader.seekset(8)
        self._header_clsid = reader.read_bytes(16)
        self._minor_version = reader.u16()
        self._dll_version = reader.u16()
        byte_order = reader.u16()
        if byte_order != 0xFFFE:
            self._raise_defect(DEFECT_INCORRECT, F'Invalid byte order: {byte_order:#06x}')
        sector_shift = reader.u16()
        self._sector_size = 1 << sector_shift
        mini_sector_shift = reader.u16()
        self._mini_sector_size = 1 << mini_sector_shift
        reader.seekrel(6)
        # the directory sector count is only read for version 4 files
        if self._dll_version == 4:
            self._num_dir_sectors = reader.u32()
        else:
            reader.seekrel(4)
            self._num_dir_sectors = 0
        self._num_fat_sectors = reader.u32()
        self._first_dir_sector = reader.u32()
        self._transaction_sig = reader.u32()
        self._mini_stream_cutoff = reader.u32()
        self._first_mini_fat_sector = reader.u32()
        self._num_mini_fat_sectors = reader.u32()
        self._first_difat_sector = reader.u32()
        self._num_difat_sectors = reader.u32()
        self._initial_difat: list[int] = list(
            struct.unpack_from('<109I', self._mv, 76))
        # Sector 0 starts one sector size into the file. The count is rounded up because _getsect
        # serves a partial trailing sector, and it is clamped so that it cannot become negative.
        self._nb_sect = max(0, -(-len(self._mv) // self._sector_size) - 1)

    def _getsect(self, sect: int):
        """
        Return the contents of sector number `sect`. A sector that is cut off by the end of the
        input is padded with zero bytes; an EOFError is raised for a sector that starts at or
        beyond the end of the input.
        """
        size = self._sector_size
        offset = size * (sect + 1)
        end = offset + size
        if end > len(mv := self._mv):
            if offset >= len(mv):
                raise EOFError(F'Attempting to read sector at {offset:#x}, which is out of bound.')
            out = bytearray(mv[offset:])
            out.extend(itertools.repeat(0, end - len(mv)))
            return out
        return mv[offset:end]

    def _load_fat(self):
        """
        Assemble the FAT from the sectors listed in the header DIFAT and in the DIFAT sector
        chain, then cut it down to the number of sectors in the file.
        """
        fat: list[int] = []
        sector_ints = self._sector_size // 4
        for i in range(109):
            sect_index = self._initial_difat[i]
            if sect_index == FREESECT or sect_index == ENDOFCHAIN:
                break
            if sect_index > MAXREGSECT:
                continue
            sect_data = self._getsect(sect_index)
            fat.extend(struct.unpack_from(F'<{sector_ints}I', sect_data))
        if self._num_difat_sectors > 0:
            difat_sect = self._first_difat_sector
            visited_difat = set()
            for _ in range(self._num_difat_sectors):
                if difat_sect == ENDOFCHAIN or difat_sect == FREESECT:
                    break
                if difat_sect in visited_difat:
                    break
                visited_difat.add(difat_sect)
                difat_data = self._getsect(difat_sect)
                # the last integer of a DIFAT sector is the number of the next DIFAT sector
                entries_per_difat = sector_ints - 1
                entries = struct.unpack_from(F'<{entries_per_difat}I', difat_data)
                for sect_index in entries:
                    if sect_index == FREESECT or sect_index == ENDOFCHAIN:
                        continue
                    if sect_index > MAXREGSECT:
                        continue
                    sect_data = self._getsect(sect_index)
                    fat.extend(struct.unpack_from(F'<{sector_ints}I', sect_data))
                difat_sect = struct.unpack_from('<I', difat_data, entries_per_difat * 4)[0]
        if len(fat) > self._nb_sect:
            fat = fat[:self._nb_sect]
        self._fat = fat

    def _load_ministream(self):
        """
        Load the mini FAT and the mini stream on first use; subsequent calls return immediately.
        """
        if self._minifat is not None:
            return
        if self._first_mini_fat_sector == ENDOFCHAIN or self._num_mini_fat_sectors == 0:
            self._minifat = []
            self._ministream = bytearray()
            return
        minifat_data = _read_chain(
            self._fp,
            self._fat,
            self._first_mini_fat_sector,
            self._sector_size,
            self._sector_size,
            self._num_mini_fat_sectors * self._sector_size,
            self._nb_sect,
        )
        minifat: list[int] = []
        count = len(minifat_data) // 4
        if count > 0:
            minifat = list(struct.unpack_from(F'<{count}I', minifat_data))
        root = self._root
        if root.size > 0:
            # Rounded up: a partially filled final mini sector still has a mini FAT entry.
            used_entries = -(-root.size // self._mini_sector_size)
            if len(minifat) > used_entries:
                minifat = minifat[:used_entries]
        self._minifat = minifat
        # the mini stream is the data of the root entry, stored in regular sectors
        mini_data = _read_chain(
            self._fp,
            self._fat,
            root.start,
            self._sector_size,
            self._sector_size,
            root.size,
            self._nb_sect,
        )
        self._ministream = mini_data

    def _load_directory(self):
        """
        Read the directory chain, create a DirectoryEntry for every slot that is not empty, and
        build the storage tree. Raises an OleFileError when there is no root entry.
        """
        dir_data = _read_chain(
            self._fp,
            self._fat,
            self._first_dir_sector,
            self._sector_size,
            self._sector_size,
            -1,
            self._nb_sect,
        )
        max_entries = len(dir_data) // 128
        entries: list[DirectoryEntry | None] = [None] * max_entries
        for sid in range(max_entries):
            offset = sid * 128
            chunk = dir_data[offset:offset + 128]
            if len(chunk) < 128:
                break
            entry_type = chunk[66]
            if entry_type == STGTY.EMPTY:
                continue
            entry = DirectoryEntry(sid, chunk, self._sector_size, self._mini_stream_cutoff)
            entries[sid] = entry
        # The list is empty when no complete directory entry could be read; this has to be
        # reported as the same error rather than as an IndexError.
        if not entries or entries[0] is None:
            raise OleFileError('Root directory entry not found.')
        self._root = entries[0]
        self._entries = entries
        self._root.build_storage_tree(entries)

    def _open_stream(self, entry: DirectoryEntry):
        """
        Read the data of `entry` from the mini stream or from regular sectors, depending on its
        `is_minifat` flag, and return it wrapped in a MemoryFile.
        """
        if entry.is_minifat:
            self._load_ministream()
            if (ms := self._ministream) is None or (mf := self._minifat) is None:
                raise RuntimeError('Ministream was not read.')
            ms = MemoryFile(memoryview(ms))
            data = _read_chain(
                ms,
                mf,
                entry.start,
                self._mini_sector_size,
                0,
                entry.size,
                len(self._minifat) if self._minifat else 0,
            )
        else:
            data = _read_chain(
                self._fp,
                self._fat,
                entry.start,
                self._sector_size,
                self._sector_size,
                entry.size,
                self._nb_sect,
            )
        return MemoryFile(memoryview(data))

    def _find(self, filename: str) -> DirectoryEntry | None:
        """
        Resolve a path to its directory entry, or return None if any path component is missing.
        """
        node = self._root
        for part in re.split(r'[\\/]+', filename):
            key = part.lower()
            child = node.kids_dict.get(key)
            if child is None:
                return None
            node = child
        return node

    def listdir(self, streams: bool = True, storages: bool = False) -> list[list[str]]:
        """
        List the entries of the file as paths, each given as a list of name components. The
        arguments select whether streams and storages are included.
        """
        result: list[list[str]] = []
        self._list_recursive(self._root, [], result, streams, storages)
        return result

    def _list_recursive(
        self,
        node: DirectoryEntry,
        path: list[str],
        result: list[list[str]],
        streams: bool,
        storages: bool,
    ):
        """
        Append the paths of all selected entries below `node` to `result`, descending into
        storages regardless of whether storages themselves are listed.
        """
        for kid in node.kids:
            current_path = path + [kid.name]
            if kid.entry_type == STGTY.STREAM and streams:
                result.append(current_path)
            elif kid.entry_type in (STGTY.STORAGE, STGTY.ROOT):
                if storages:
                    result.append(current_path)
                self._list_recursive(kid, current_path, result, streams, storages)

    def openstream(self, filename: str) -> MemoryFile[memoryview]:
        """
        Return the contents of the stream at the given path as a MemoryFile. Raises an
        OleFileError when the path does not exist or does not refer to a stream.
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Stream not found: {filename!r}')
        if entry.entry_type != STGTY.STREAM:
            raise OleFileError(F'Not a stream: {filename!r}')
        return self._open_stream(entry)

    def exists(self, filename: str) -> bool:
        """
        Check whether an entry exists at the given path.
        """
        return self._find(filename) is not None

    def get_type(self, path: str) -> int:
        """
        Return the entry type at the given path, or `STGTY.EMPTY` when there is no such entry.
        """
        entry = self._find(path)
        if entry is None:
            return STGTY.EMPTY
        return entry.entry_type

    def get_size(self, filename: str) -> int:
        """
        Return the size recorded for the entry at the given path; raises an OleFileError when the
        entry does not exist.
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Entry not found: {filename!r}')
        return entry.size

    def get_rootentry_name(self) -> str:
        """
        Return the name of the root directory entry.
        """
        return self._root.name

    def getclsid(self, filename: str) -> str:
        """
        Return the class ID of the entry at the given path in string form; raises an OleFileError
        when the entry does not exist.
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Entry not found: {filename!r}')
        return entry.clsid_str

    def getmtime(self, filename: str) -> datetime.datetime | None:
        """
        Return the modification time of the entry at the given path, or None when the entry does
        not exist or has no valid timestamp.
        """
        entry = self._find(filename)
        if entry is None:
            return None
        return filetime_to_datetime(entry.modify_time)

    def getctime(self, filename: str) -> datetime.datetime | None:
        """
        Return the creation time of the entry at the given path, or None when the entry does not
        exist or has no valid timestamp.
        """
        entry = self._find(filename)
        if entry is None:
            return None
        return filetime_to_datetime(entry.create_time)

    def getproperties(
        self,
        filename: str,
        convert_time: bool = False,
        no_conversion: list[int] | None = None,
    ) -> dict[int, Any]:
        """
        Parse the property set stored in the stream at the given path and return its properties
        keyed by property ID. With `convert_time`, timestamps are converted to datetime objects
        except for the property IDs listed in `no_conversion`. Errors while opening the stream
        propagate; a stream that cannot be parsed yields an empty dictionary.
        """
        if no_conversion is None:
            no_conversion = []
        raw = self.openstream(filename).read()
        if len(raw) < 28:
            return {}
        try:
            return _parse_property_set(memoryview(raw), convert_time, no_conversion)
        except Exception:
            # deliberately best-effort: malformed property sets are reported as empty
            return {}

    def get_metadata(self) -> OleMetadata:
        """
        Return the metadata of the file; it is parsed on the first call and cached afterwards.
        """
        if self._metadata is None:
            self._metadata = OleMetadata()
            self._metadata.parse(self)
        return self._metadata

    def write_stream(self, filename: str, data: bytes | bytearray | memoryview) -> None:
        """
        Overwrite an existing stream's data in-place. The new data must be the same length as the
        existing stream. The underlying buffer must be mutable (i.e. the OleFile was constructed
        from a bytearray).
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Stream not found: {filename!r}')
        if entry.entry_type != STGTY.STREAM:
            raise OleFileError(F'Not a stream: {filename!r}')
        if len(data) != entry.size:
            raise OleFileError(F'Data length {len(data)} does not match stream size {entry.size}')
        if not data:
            return
        if entry.is_minifat:
            self._write_mini_stream(entry, data)
        else:
            self._write_regular_stream(entry, data)

    def _write_regular_stream(self, entry: DirectoryEntry, data: bytes | bytearray | memoryview):
        """
        Copy `data` sector by sector into the underlying buffer along the FAT chain of `entry`.
        Writing stops when the chain ends or revisits a sector.
        """
        sect = entry.start
        offset = 0
        remaining = len(data)
        visited: set[int] = set()
        while remaining > 0 and sect <= MAXREGSECT and sect not in visited:
            visited.add(sect)
            chunk_size = min(self._sector_size, remaining)
            file_offset = self._sector_size * (sect + 1)
            self._mv[file_offset:file_offset + chunk_size] = data[offset:offset + chunk_size]
            offset += chunk_size
            remaining -= chunk_size
            if sect < len(self._fat):
                sect = self._fat[sect]
            else:
                break

    def _write_mini_stream(self, entry: DirectoryEntry, data: bytes | bytearray | memoryview):
        """
        Copy `data` into the in-memory mini stream along the mini FAT chain of `entry`, then write
        the mini stream back to the underlying buffer.

        Raises an OleFileError, before anything is modified, when the chain refers to a mini
        sector that lies outside of the mini stream.
        """
        self._load_ministream()
        if self._ministream is None or self._minifat is None:
            raise RuntimeError('Ministream was not loaded.')
        ministream = self._ministream
        # Pairs every target offset with its source offset and length. The chain is validated in
        # full before the first byte is copied: a slice assignment that extends beyond the end of
        # a bytearray would silently resize it and thereby corrupt the mini stream.
        plan: list[tuple[int, int, int]] = []
        sect = entry.start
        offset = 0
        remaining = len(data)
        visited: set[int] = set()
        while remaining > 0 and sect <= MAXREGSECT and sect not in visited:
            visited.add(sect)
            chunk_size = min(self._mini_sector_size, remaining)
            ms_offset = sect * self._mini_sector_size
            if ms_offset + chunk_size > len(ministream):
                raise OleFileError(
                    F'Mini sector {sect} lies outside of the mini stream; refusing to write.')
            plan.append((ms_offset, offset, chunk_size))
            offset += chunk_size
            remaining -= chunk_size
            if sect < len(self._minifat):
                sect = self._minifat[sect]
            else:
                break
        for ms_offset, offset, chunk_size in plan:
            ministream[ms_offset:ms_offset + chunk_size] = data[offset:offset + chunk_size]
        self._flush_ministream()

    def _flush_ministream(self):
        """
        Write the in-memory ministream back to the underlying file buffer by following the root
        entry's FAT chain.
        """
        if self._ministream is None:
            return
        root = self._root
        sect = root.start
        offset = 0
        remaining = len(self._ministream)
        visited: set[int] = set()
        while remaining > 0 and sect <= MAXREGSECT and sect not in visited:
            visited.add(sect)
            chunk_size = min(self._sector_size, remaining)
            file_offset = self._sector_size * (sect + 1)
            self._mv[file_offset:file_offset + chunk_size] = \
                self._ministream[offset:offset + chunk_size]
            offset += chunk_size
            remaining -= chunk_size
            if sect < len(self._fat):
                sect = self._fat[sect]
            else:
                break
def _parse_property_set(
data: memoryview,
convert_time: bool,
no_conversion: list[int],
) -> dict[int, Any]:
if len(data) < 28:
return {}
num_sections = _i32(data, 24)
if num_sections < 1:
return {}
section_offset = _i32(data, 44)
if section_offset >= len(data):
return {}
section_data = data[section_offset:]
if len(section_data) < 8:
return {}
num_props = _i32(section_data, 4)
props: dict[int, Any] = {}
codepage = None
for i in range(num_props):
entry_offset = 8 + i * 8
if entry_offset + 8 > len(section_data):
break
prop_id = _i32(section_data, entry_offset)
prop_offset = _i32(section_data, entry_offset + 4)
if prop_offset + 4 > len(section_data):
continue
prop_type = _i32(section_data, prop_offset) & 0xFFFF
value = _parse_property_value(
section_data, prop_offset, prop_type, prop_id,
convert_time, no_conversion, codepage)
if value is not None:
props[prop_id] = value
if prop_id == 1 and isinstance(value, int):
codepage = value
return props
def _parse_property_value(
    data: memoryview,
    offset: int,
    prop_type: int,
    prop_id: int,
    convert_time: bool,
    no_conversion: list[int],
    codepage: int | None,
) -> Any:
    """
    Parse the property value whose type field is located at `offset`. Values that carry the
    `VT_VECTOR` flag are handed to the vector parser; all other values are parsed as a single
    value that starts right after the four-byte type field.
    """
    element_type = prop_type & 0x0FFF
    shared = (prop_id, convert_time, no_conversion, codepage)
    if prop_type & VT_VECTOR:
        return _parse_vector_property(data, offset, element_type, *shared)
    return _parse_basic_property(data, offset + 4, element_type, *shared)
def _parse_vector_property(
data: memoryview,
offset: int,
base_type: int,
prop_id: int,
convert_time: bool,
no_conversion: list[int],
codepage: int | None,
) -> list | None:
value_offset = offset + 4
if value_offset + 4 > len(data):
return None
count = _i32(data, value_offset)
value_offset += 4
result = []
for _ in range(count):
if base_type == VT_VARIANT:
if value_offset + 4 > len(data):
break
variant_type = _i32(data, value_offset) & 0xFFFF
val = _parse_basic_property(
data, value_offset + 4, variant_type, prop_id,
convert_time, no_conversion, codepage)
size = _property_size(data, value_offset + 4, variant_type, codepage)
value_offset += 4 + size
else:
val = _parse_basic_property(
data, value_offset, base_type, prop_id,
convert_time, no_conversion, codepage)
size = _property_size(data, value_offset, base_type, codepage)
value_offset += size
result.append(val)
pad = (4 - (value_offset % 4)) % 4
value_offset += pad
return result
def _property_size(
    data: bytes | bytearray | memoryview,
    offset: int,
    vt: int,
    codepage: int | None,
) -> int:
    """
    Return the number of bytes that a value of type `vt` at `offset` occupies. The size of strings
    and blobs includes their four-byte length field; when that field lies outside of the data,
    only the four bytes of the field itself are counted. Types that are not listed here yield 0.
    """
    fixed_widths = (
        (2, (VT_I2, VT_UI2, VT_BOOL)),
        (4, (VT_I4, VT_UI4, VT_INT, VT_UINT, VT_ERROR, VT_R4)),
        (8, (VT_I8, VT_UI8, VT_R8, VT_CY, VT_FILETIME)),
        (1, (VT_UI1,)),
        (16, (VT_CLSID,)),
    )
    for width, types in fixed_widths:
        if vt in types:
            return width
    if vt in (VT_BSTR, VT_LPSTR, VT_BLOB, VT_CF, VT_LPWSTR):
        if offset + 4 > len(data):
            return 4
        # wide strings store a character count, all other types store a byte count
        unit = 2 if vt == VT_LPWSTR else 1
        return 4 + unit * _i32(data, offset)
    return 0
def _parse_basic_property(
    data: memoryview,
    offset: int,
    vt: int,
    prop_id: int,
    convert_time: bool,
    no_conversion: list[int],
    codepage: int | None,
) -> Any:
    """
    Parse a single value of type `vt` that starts at `offset` in `data`.

    Integers, floats and booleans are returned as Python numbers, strings as str, class IDs as
    their string form, and blobs as a view on the data. A `VT_FILETIME` value is returned as an
    integer unless `convert_time` is set and `prop_id` is not listed in `no_conversion`, in which
    case it is converted with `filetime_to_datetime`. Byte strings are decoded according to
    `codepage` and fall back to latin-1 when no code page is known or no codec exists for it.
    The result is None for empty values, for unsupported types, and for values that do not fit
    into the data.
    """
    def _remove_trailing_nullbytes(m: memoryview, width: int = 1):
        # Removes trailing code units of `width` bytes that consist of zero bytes only. Working
        # on whole code units is essential for UTF-16: stripping single bytes would also remove
        # the zero high byte of a final ASCII character. A trailing partial code unit is dropped.
        end = len(m) - len(m) % width
        while end >= width and not any(m[end - width:end]):
            end -= width
        return m[:end]

    if vt in (VT_EMPTY, VT_NULL):
        return None
    if vt == VT_I2:
        if offset + 2 > len(data):
            return None
        val = _i16(data, offset)
        # reinterpret the unsigned value as a signed 16-bit integer
        if val >= 0x8000:
            val -= 0x10000
        return val
    if vt == VT_UI2:
        if offset + 2 > len(data):
            return None
        return _i16(data, offset)
    if vt in (VT_I4, VT_INT, VT_ERROR):
        if offset + 4 > len(data):
            return None
        val = _i32(data, offset)
        # error codes are reported unsigned, the other two types as signed 32-bit integers
        if vt != VT_ERROR and val >= 0x80000000:
            val -= 0x100000000
        return val
    if vt in (VT_UI4, VT_UINT):
        if offset + 4 > len(data):
            return None
        return _i32(data, offset)
    if vt == VT_I8:
        if offset + 8 > len(data):
            return None
        return struct.unpack_from('<q', data, offset)[0]
    if vt == VT_UI8:
        if offset + 8 > len(data):
            return None
        return struct.unpack_from('<Q', data, offset)[0]
    if vt == VT_R4:
        if offset + 4 > len(data):
            return None
        return struct.unpack_from('<f', data, offset)[0]
    if vt == VT_R8:
        if offset + 8 > len(data):
            return None
        return struct.unpack_from('<d', data, offset)[0]
    if vt == VT_BOOL:
        if offset + 2 > len(data):
            return None
        return bool(_i16(data, offset))
    if vt in (VT_BSTR, VT_LPSTR):
        if (so := offset + 4) > len(data):
            return None
        length = _i32(data, offset)
        # slicing clamps to the end of the data when the length field is too large
        raw = data[so:so + length]
        codec = 'latin-1'
        if codepage is not None:
            # code pages above 32767 arrive as negative numbers when they were read as a signed
            # 16-bit integer; 65001 (UTF-8), for instance, arrives as -535
            if codepage < 0:
                codepage += 0x10000
            if codepage == 1200:
                codec = 'utf-16-le'
            elif codepage == 65001 or codepage >= 65535:
                codec = 'utf-8'
            else:
                codec = F'cp{codepage}'
        raw = _remove_trailing_nullbytes(raw, 2 if codec == 'utf-16-le' else 1)
        try:
            return codecs.decode(raw, codec, errors='replace')
        except (LookupError, UnicodeDecodeError):
            return codecs.decode(raw, 'latin-1', errors='replace')
    if vt == VT_LPWSTR:
        if (so := offset + 4) > len(data):
            return None
        # the length field counts two-byte characters
        length = _i32(data, offset) * 2
        raw = _remove_trailing_nullbytes(data[so:so + length], 2)
        return codecs.decode(raw, 'utf-16-le', errors='replace').rstrip('\x00')
    if vt == VT_FILETIME:
        if offset + 8 > len(data):
            return None
        low = _i32(data, offset)
        high = _i32(data, offset + 4)
        filetime = low + (high << 32)
        if convert_time and prop_id not in no_conversion:
            return filetime_to_datetime(filetime)
        return filetime
    if vt == VT_UI1:
        if offset >= len(data):
            return None
        return data[offset]
    if vt == VT_CLSID:
        if offset + 16 > len(data):
            return None
        return _clsid(data[offset:offset + 16])
    if vt in (VT_BLOB, VT_CF):
        if offset + 4 > len(data):
            return None
        length = _i32(data, offset)
        if offset + 4 + length > len(data):
            length = len(data) - offset - 4
        return data[offset + 4:offset + 4 + length]
    return None
Functions
def filetime_to_datetime(filetime)-
Expand source code Browse git
def filetime_to_datetime(filetime: int) -> datetime.datetime | None:
    if filetime <= 0:
        return None
    try:
        return _FILETIME_EPOCH + datetime.timedelta(microseconds=filetime // 10)
    except (ValueError, OverflowError):
        return None
def is_ole_file(data)-
Expand source code Browse git
def is_ole_file(data: bytes | bytearray | memoryview) -> bool:
    return data[:8] == MAGIC
Classes
class STGTY (*args, **kwds)-
Enum where members are also (and must be) ints
Expand source code Browse git
class STGTY(enum.IntEnum):
    EMPTY = 0 # noqa
    STORAGE = 1 # noqa
    STREAM = 2 # noqa
    LOCKBYTES = 3 # noqa
    PROPERTY = 4 # noqa
    ROOT = 5 # noqa
Ancestors
- enum.IntEnum
- builtins.int
- enum.ReprEnum
- enum.Enum
Class variables
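var EMPTY-
Entry type 0; directory slots of this type are skipped when the directory is loaded.
var STORAGE-
Entry type 1; a container whose children are linked when the storage tree is built.
var STREAM-
Entry type 2; the only entry type that can be opened as a stream.
var LOCKBYTES-
Entry type 3.
var PROPERTY-
Entry type 4.
var ROOT-
Entry type 5; treated like a storage when the storage tree is built.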
class OleFileError (*args, **kwargs)-
Base class for I/O related errors.
Expand source code Browse git
class OleFileError(IOError):
    pass
Ancestors
- builtins.OSError
- builtins.Exception
- builtins.BaseException
Subclasses
class NotOleFileError (*args, **kwargs)-
Base class for I/O related errors.
Expand source code Browse git
class NotOleFileError(OleFileError):
    pass
Ancestors
- OleFileError
- builtins.OSError
- builtins.Exception
- builtins.BaseException
class DirectoryEntry (sid, data, sector_size, mini_stream_cutoff)-
Represents a single 128-byte directory entry in an OLE2 file.
Expand source code Browse git
class DirectoryEntry: """ Represents a single 128-byte directory entry in an OLE2 file. """ __slots__ = ( 'sid', 'name', 'entry_type', 'color', 'sid_left', 'sid_right', 'sid_child', 'clsid', 'user_flags', 'create_time', 'modify_time', 'start', 'size', 'is_minifat', 'kids', 'kids_dict', 'used', ) def __init__( self, sid: int, data: bytes | bytearray | memoryview, sector_size: int, mini_stream_cutoff: int, ): self.sid = sid self.kids: list[DirectoryEntry] = [] self.kids_dict: dict[str, DirectoryEntry] = {} self.used = False entry = memoryview(data) name_raw = bytes(entry[0:64]) name_length = _i16(entry, 64) self.entry_type = entry[66] self.color = entry[67] self.sid_left = _i32(entry, 68) self.sid_right = _i32(entry, 72) self.sid_child = _i32(entry, 76) self.clsid = entry[80:96] self.user_flags = _i32(entry, 96) self.create_time = struct.unpack_from('<Q', entry, 100)[0] self.modify_time = struct.unpack_from('<Q', entry, 108)[0] self.start = _i32(entry, 116) size_low = _i32(entry, 120) size_high = _i32(entry, 124) if name_length > 64: name_length = 64 name_bytes = name_raw[:name_length] if name_bytes[-2:] == b'\x00\x00': name_bytes = name_bytes[:-2] try: self.name = codecs.decode(name_bytes, 'utf-16-le') except UnicodeDecodeError: self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace') if sector_size == 512: self.size = size_low else: self.size = size_low + (size_high << 32) self.is_minifat = ( self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY) and self.size < mini_stream_cutoff ) @property def clsid_str(self) -> str: return _clsid(self.clsid) def build_storage_tree(self, entries: list[DirectoryEntry | None]): if self.sid_child != NOSTREAM: child = entries[self.sid_child] if self.sid_child < len(entries) else None if child is not None: self._walk_tree(child, entries) self.kids.sort(key=lambda e: e.name.lower()) for kid in self.kids: if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT): kid.build_storage_tree(entries) def _walk_tree(self, node: 
DirectoryEntry, entries: list[DirectoryEntry | None]): if node.used: return node.used = True if node.sid_left != NOSTREAM and node.sid_left < len(entries): left = entries[node.sid_left] if left is not None: self._walk_tree(left, entries) self.kids.append(node) self.kids_dict[node.name.lower()] = node if node.sid_right != NOSTREAM and node.sid_right < len(entries): right = entries[node.sid_right] if right is not None: self._walk_tree(right, entries)Instance variables
var clsid_str-
Expand source code Browse git
@property
def clsid_str(self) -> str:
    return _clsid(self.clsid)
var clsid-
Expand source code Browse git
class DirectoryEntry: """ Represents a single 128-byte directory entry in an OLE2 file. """ __slots__ = ( 'sid', 'name', 'entry_type', 'color', 'sid_left', 'sid_right', 'sid_child', 'clsid', 'user_flags', 'create_time', 'modify_time', 'start', 'size', 'is_minifat', 'kids', 'kids_dict', 'used', ) def __init__( self, sid: int, data: bytes | bytearray | memoryview, sector_size: int, mini_stream_cutoff: int, ): self.sid = sid self.kids: list[DirectoryEntry] = [] self.kids_dict: dict[str, DirectoryEntry] = {} self.used = False entry = memoryview(data) name_raw = bytes(entry[0:64]) name_length = _i16(entry, 64) self.entry_type = entry[66] self.color = entry[67] self.sid_left = _i32(entry, 68) self.sid_right = _i32(entry, 72) self.sid_child = _i32(entry, 76) self.clsid = entry[80:96] self.user_flags = _i32(entry, 96) self.create_time = struct.unpack_from('<Q', entry, 100)[0] self.modify_time = struct.unpack_from('<Q', entry, 108)[0] self.start = _i32(entry, 116) size_low = _i32(entry, 120) size_high = _i32(entry, 124) if name_length > 64: name_length = 64 name_bytes = name_raw[:name_length] if name_bytes[-2:] == b'\x00\x00': name_bytes = name_bytes[:-2] try: self.name = codecs.decode(name_bytes, 'utf-16-le') except UnicodeDecodeError: self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace') if sector_size == 512: self.size = size_low else: self.size = size_low + (size_high << 32) self.is_minifat = ( self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY) and self.size < mini_stream_cutoff ) @property def clsid_str(self) -> str: return _clsid(self.clsid) def build_storage_tree(self, entries: list[DirectoryEntry | None]): if self.sid_child != NOSTREAM: child = entries[self.sid_child] if self.sid_child < len(entries) else None if child is not None: self._walk_tree(child, entries) self.kids.sort(key=lambda e: e.name.lower()) for kid in self.kids: if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT): kid.build_storage_tree(entries) def _walk_tree(self, node: 
DirectoryEntry, entries: list[DirectoryEntry | None]): if node.used: return node.used = True if node.sid_left != NOSTREAM and node.sid_left < len(entries): left = entries[node.sid_left] if left is not None: self._walk_tree(left, entries) self.kids.append(node) self.kids_dict[node.name.lower()] = node if node.sid_right != NOSTREAM and node.sid_right < len(entries): right = entries[node.sid_right] if right is not None: self._walk_tree(right, entries) var color-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var create_time-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var entry_type-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var is_minifat-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var kids-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var kids_dict-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var modify_time-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var name-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var sid-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var sid_child-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var sid_left-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var sid_right-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var size-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the fixed-offset fields of one entry. Calling `build_storage_tree` on a
    storage entry then fills `kids` (sorted by lower-cased name) and `kids_dict` (keyed by
    lower-cased name) for that entry and for every storage reachable below it.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the entry with directory index `sid` from `data`. For a `sector_size` of 512, only
        the low 32 bits of the size field are used. Entries of type STREAM, LOCKBYTES or PROPERTY
        that are smaller than `mini_stream_cutoff` are flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False

        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        # The name field is 64 bytes wide; a larger declared length is clamped to it.
        name_length = min(_i16(entry, 64), 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # Kept as a slice of the memoryview over `data`; the bytes are not copied.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)

        name_bytes = name_raw[:name_length]
        # Drop one trailing UTF-16 null terminator if it is present.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded, with replacement characters.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')

        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)

        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The `clsid` field formatted by the module-level `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` for this entry and for every STORAGE or ROOT entry
        reachable below it. `entries` is indexed by directory entry index; slots may be `None`.

        An explicit work list is used instead of recursion, so that deeply nested or cyclic
        directory data cannot exhaust the interpreter stack.
        """
        pending: list[DirectoryEntry] = [self]
        visited: set[DirectoryEntry] = set()
        while pending:
            storage = pending.pop()
            if storage in visited:
                # Only possible for malformed data that links back to an earlier storage.
                continue
            visited.add(storage)
            sid = storage.sid_child
            if sid != NOSTREAM:
                child = entries[sid] if sid < len(entries) else None
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Pushed in reverse so that storages are processed depth-first in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in (STGTY.STORAGE, STGTY.ROOT)
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Collect `node` and every sibling reachable from it via `sid_left` and `sid_right` into
        `kids` and `kids_dict`, visiting them in order (left, node, right). Entries are flagged
        as `used` when first reached; entries that are already flagged are skipped, which also
        terminates cyclic links. The traversal is iterative to stay independent of tree depth.
        """
        limit = len(entries)
        stack: list[DirectoryEntry] = []
        cursor: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming every entry on the way down.
            while cursor is not None and not cursor.used:
                cursor.used = True
                stack.append(cursor)
                sid = cursor.sid_left
                cursor = entries[sid] if sid != NOSTREAM and sid < limit else None
            if not stack:
                return
            cursor = stack.pop()
            self.kids.append(cursor)
            self.kids_dict[cursor.name.lower()] = cursor
            sid = cursor.sid_right
            cursor = entries[sid] if sid != NOSTREAM and sid < limit else None

# var start-
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the raw record. The tree attributes `kids`, `kids_dict` and `used`
    start out empty and are populated by `DirectoryEntry.build_storage_tree`.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the directory record `data` stored at directory index `sid`.

        When `sector_size` is 512, only the low 32 bits of the size field are used. Entries of
        type STREAM, LOCKBYTES or PROPERTY that are smaller than `mini_stream_cutoff` are
        flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False
        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        name_length = _i16(entry, 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # NOTE: this is a view into `data`, not a copy.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)
        # The name field is only 64 bytes wide; never trust a larger stored length.
        if name_length > 64:
            name_length = 64
        name_bytes = name_raw[:name_length]
        # Strip the UTF-16 null terminator when it is included in the length.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded so that the entry remains addressable.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')
        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)
        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The raw `clsid` bytes formatted by the module's `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` of this storage and of every storage below it.

        `entries` is the directory table indexed by SID; slots may be `None`. Storages are
        processed depth-first in sorted-kid order using an explicit stack, and every storage is
        built at most once. The latter matters for corrupt files whose sibling tree leads back
        to the entry this method was called on (that entry is never flagged as `used`), which
        would otherwise recurse without bound.
        """
        storage_types = (STGTY.STORAGE, STGTY.ROOT)
        built: set[int] = set()
        pending: list[DirectoryEntry] = [self]
        while pending:
            storage = pending.pop()
            if id(storage) in built:
                continue
            built.add(id(storage))
            child_sid = storage.sid_child
            if child_sid != NOSTREAM and child_sid < len(entries):
                child = entries[child_sid]
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Reversed so that popping from the stack visits kids in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in storage_types
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Add `node` and all its siblings to this storage via an in-order traversal.

        Nodes are flagged as `used` when first reached, which makes the traversal terminate on
        cyclic sibling links. The traversal is iterative so that long sibling chains in crafted
        files cannot exhaust the interpreter's recursion limit.
        """
        count = len(entries)

        def lookup(sid: int) -> DirectoryEntry | None:
            if sid != NOSTREAM and sid < count:
                return entries[sid]
            return None

        stack: list[DirectoryEntry] = []
        current: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming each node on the way down.
            while current is not None and not current.used:
                current.used = True
                stack.append(current)
                current = lookup(current.sid_left)
            if not stack:
                return
            current = stack.pop()
            self.kids.append(current)
            self.kids_dict[current.name.lower()] = current
            current = lookup(current.sid_right)
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the raw record. The tree attributes `kids`, `kids_dict` and `used`
    start out empty and are populated by `DirectoryEntry.build_storage_tree`.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the directory record `data` stored at directory index `sid`.

        When `sector_size` is 512, only the low 32 bits of the size field are used. Entries of
        type STREAM, LOCKBYTES or PROPERTY that are smaller than `mini_stream_cutoff` are
        flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False
        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        name_length = _i16(entry, 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # NOTE: this is a view into `data`, not a copy.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)
        # The name field is only 64 bytes wide; never trust a larger stored length.
        if name_length > 64:
            name_length = 64
        name_bytes = name_raw[:name_length]
        # Strip the UTF-16 null terminator when it is included in the length.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded so that the entry remains addressable.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')
        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)
        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The raw `clsid` bytes formatted by the module's `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` of this storage and of every storage below it.

        `entries` is the directory table indexed by SID; slots may be `None`. Storages are
        processed depth-first in sorted-kid order using an explicit stack, and every storage is
        built at most once. The latter matters for corrupt files whose sibling tree leads back
        to the entry this method was called on (that entry is never flagged as `used`), which
        would otherwise recurse without bound.
        """
        storage_types = (STGTY.STORAGE, STGTY.ROOT)
        built: set[int] = set()
        pending: list[DirectoryEntry] = [self]
        while pending:
            storage = pending.pop()
            if id(storage) in built:
                continue
            built.add(id(storage))
            child_sid = storage.sid_child
            if child_sid != NOSTREAM and child_sid < len(entries):
                child = entries[child_sid]
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Reversed so that popping from the stack visits kids in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in storage_types
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Add `node` and all its siblings to this storage via an in-order traversal.

        Nodes are flagged as `used` when first reached, which makes the traversal terminate on
        cyclic sibling links. The traversal is iterative so that long sibling chains in crafted
        files cannot exhaust the interpreter's recursion limit.
        """
        count = len(entries)

        def lookup(sid: int) -> DirectoryEntry | None:
            if sid != NOSTREAM and sid < count:
                return entries[sid]
            return None

        stack: list[DirectoryEntry] = []
        current: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming each node on the way down.
            while current is not None and not current.used:
                current.used = True
                stack.append(current)
                current = lookup(current.sid_left)
            if not stack:
                return
            current = stack.pop()
            self.kids.append(current)
            self.kids_dict[current.name.lower()] = current
            current = lookup(current.sid_right)
Expand source code Browse git
class DirectoryEntry:
    """
    Represents a single 128-byte directory entry in an OLE2 file.

    The constructor decodes the raw record. The tree attributes `kids`, `kids_dict` and `used`
    start out empty and are populated by `DirectoryEntry.build_storage_tree`.
    """
    __slots__ = (
        'sid',
        'name',
        'entry_type',
        'color',
        'sid_left',
        'sid_right',
        'sid_child',
        'clsid',
        'user_flags',
        'create_time',
        'modify_time',
        'start',
        'size',
        'is_minifat',
        'kids',
        'kids_dict',
        'used',
    )

    def __init__(
        self,
        sid: int,
        data: bytes | bytearray | memoryview,
        sector_size: int,
        mini_stream_cutoff: int,
    ):
        """
        Decode the directory record `data` stored at directory index `sid`.

        When `sector_size` is 512, only the low 32 bits of the size field are used. Entries of
        type STREAM, LOCKBYTES or PROPERTY that are smaller than `mini_stream_cutoff` are
        flagged via `is_minifat`.
        """
        self.sid = sid
        self.kids: list[DirectoryEntry] = []
        self.kids_dict: dict[str, DirectoryEntry] = {}
        self.used = False
        entry = memoryview(data)
        name_raw = bytes(entry[0:64])
        name_length = _i16(entry, 64)
        self.entry_type = entry[66]
        self.color = entry[67]
        self.sid_left = _i32(entry, 68)
        self.sid_right = _i32(entry, 72)
        self.sid_child = _i32(entry, 76)
        # NOTE: this is a view into `data`, not a copy.
        self.clsid = entry[80:96]
        self.user_flags = _i32(entry, 96)
        self.create_time = struct.unpack_from('<Q', entry, 100)[0]
        self.modify_time = struct.unpack_from('<Q', entry, 108)[0]
        self.start = _i32(entry, 116)
        size_low = _i32(entry, 120)
        size_high = _i32(entry, 124)
        # The name field is only 64 bytes wide; never trust a larger stored length.
        if name_length > 64:
            name_length = 64
        name_bytes = name_raw[:name_length]
        # Strip the UTF-16 null terminator when it is included in the length.
        if name_bytes[-2:] == b'\x00\x00':
            name_bytes = name_bytes[:-2]
        try:
            self.name = codecs.decode(name_bytes, 'utf-16-le')
        except UnicodeDecodeError:
            # Malformed names are still decoded so that the entry remains addressable.
            self.name = codecs.decode(name_bytes, 'utf-16-le', errors='replace')
        if sector_size == 512:
            self.size = size_low
        else:
            self.size = size_low + (size_high << 32)
        self.is_minifat = (
            self.entry_type in (STGTY.STREAM, STGTY.LOCKBYTES, STGTY.PROPERTY)
            and self.size < mini_stream_cutoff
        )

    @property
    def clsid_str(self) -> str:
        """
        The raw `clsid` bytes formatted by the module's `_clsid` helper.
        """
        return _clsid(self.clsid)

    def build_storage_tree(self, entries: list[DirectoryEntry | None]):
        """
        Populate `kids` and `kids_dict` of this storage and of every storage below it.

        `entries` is the directory table indexed by SID; slots may be `None`. Storages are
        processed depth-first in sorted-kid order using an explicit stack, and every storage is
        built at most once. The latter matters for corrupt files whose sibling tree leads back
        to the entry this method was called on (that entry is never flagged as `used`), which
        would otherwise recurse without bound.
        """
        storage_types = (STGTY.STORAGE, STGTY.ROOT)
        built: set[int] = set()
        pending: list[DirectoryEntry] = [self]
        while pending:
            storage = pending.pop()
            if id(storage) in built:
                continue
            built.add(id(storage))
            child_sid = storage.sid_child
            if child_sid != NOSTREAM and child_sid < len(entries):
                child = entries[child_sid]
                if child is not None:
                    storage._walk_tree(child, entries)
            storage.kids.sort(key=lambda e: e.name.lower())
            # Reversed so that popping from the stack visits kids in sorted order.
            pending.extend(
                kid for kid in reversed(storage.kids)
                if kid.entry_type in storage_types
            )

    def _walk_tree(self, node: DirectoryEntry, entries: list[DirectoryEntry | None]):
        """
        Add `node` and all its siblings to this storage via an in-order traversal.

        Nodes are flagged as `used` when first reached, which makes the traversal terminate on
        cyclic sibling links. The traversal is iterative so that long sibling chains in crafted
        files cannot exhaust the interpreter's recursion limit.
        """
        count = len(entries)

        def lookup(sid: int) -> DirectoryEntry | None:
            if sid != NOSTREAM and sid < count:
                return entries[sid]
            return None

        stack: list[DirectoryEntry] = []
        current: DirectoryEntry | None = node
        while True:
            # Descend along left links, claiming each node on the way down.
            while current is not None and not current.used:
                current.used = True
                stack.append(current)
                current = lookup(current.sid_left)
            if not stack:
                return
            current = stack.pop()
            self.kids.append(current)
            self.kids_dict[current.name.lower()] = current
            current = lookup(current.sid_right)
Methods
def build_storage_tree(self, entries)-
Expand source code Browse git
def build_storage_tree(self, entries: list[DirectoryEntry | None]):
    """
    Populate `kids` and `kids_dict` of this storage and of every storage below it.

    `entries` is the directory table indexed by SID; slots may be `None`. Storages are
    processed depth-first in sorted-kid order using an explicit stack, and every storage is
    built at most once. The latter matters for corrupt files whose sibling tree leads back to
    the entry this method was called on (that entry is never flagged as `used`), which would
    otherwise recurse without bound.
    """
    storage_types = (STGTY.STORAGE, STGTY.ROOT)
    built: set[int] = set()
    pending: list[DirectoryEntry] = [self]
    while pending:
        storage = pending.pop()
        if id(storage) in built:
            continue
        built.add(id(storage))
        child_sid = storage.sid_child
        if child_sid != NOSTREAM and child_sid < len(entries):
            child = entries[child_sid]
            if child is not None:
                storage._walk_tree(child, entries)
        storage.kids.sort(key=lambda e: e.name.lower())
        # Reversed so that popping from the stack visits kids in sorted order.
        pending.extend(
            kid for kid in reversed(storage.kids)
            if kid.entry_type in storage_types
        )
class OleMetadata-
Parses standard OLE metadata from SummaryInformation and DocumentSummaryInformation property streams.
Expand source code Browse git
class OleMetadata:
    """
    Holds the standard document properties read from the SummaryInformation and
    DocumentSummaryInformation property streams of an OLE file. Every known property is an
    attribute of the instance; properties that were not found remain `None`.
    """

    def __init__(self):
        # Index 0 of both attribute tables is skipped, matching the slices used by dump().
        for name in itertools.chain(SUMMARY_ATTRIBS[1:], DOCSUM_ATTRIBS[1:]):
            setattr(self, name, None)

    def parse(self, ole: OleFile):
        """
        Read both property streams from `ole` and store every value that is not `None` on this
        object. Streams that are absent, or whose property lookup raises, are skipped.
        """
        sources = (
            ('\x05SummaryInformation', SUMMARY_ATTRIBS),
            ('\x05DocumentSummaryInformation', DOCSUM_ATTRIBS),
        )
        for stream_name, attribs in sources:
            if not ole.exists(stream_name):
                continue
            # Property 10 of the summary stream is requested without conversion.
            raw_ids = [10] if attribs is SUMMARY_ATTRIBS else []
            try:
                props = ole.getproperties(
                    stream_name,
                    convert_time=True,
                    no_conversion=raw_ids,
                )
            except Exception:
                continue
            for prop_id, attr_name in enumerate(attribs):
                if attr_name is not None and (value := props.get(prop_id)) is not None:
                    setattr(self, attr_name, value)

    def dump(self) -> dict[str, Any]:
        """
        Return all attributes that have a value as a dictionary, summary properties first.
        """
        return {
            name: value
            for name in itertools.chain(SUMMARY_ATTRIBS[1:], DOCSUM_ATTRIBS[1:])
            if (value := getattr(self, name, None)) is not None
        }
def parse(self, ole)-
Expand source code Browse git
def parse(self, ole: OleFile):
    """
    Read both property streams from `ole` and store every value that is not `None` on this
    object. Streams that are absent, or whose property lookup raises, are skipped.
    """
    sources = (
        ('\x05SummaryInformation', SUMMARY_ATTRIBS),
        ('\x05DocumentSummaryInformation', DOCSUM_ATTRIBS),
    )
    for stream_name, attribs in sources:
        if not ole.exists(stream_name):
            continue
        # Property 10 of the summary stream is requested without conversion.
        raw_ids = [10] if attribs is SUMMARY_ATTRIBS else []
        try:
            props = ole.getproperties(
                stream_name,
                convert_time=True,
                no_conversion=raw_ids,
            )
        except Exception:
            continue
        for prop_id, attr_name in enumerate(attribs):
            if attr_name is not None and (value := props.get(prop_id)) is not None:
                setattr(self, attr_name, value)
Expand source code Browse git
def dump(self) -> dict[str, Any]:
    """
    Return all attributes that have a value as a dictionary, summary properties first.
    """
    return {
        name: value
        for name in itertools.chain(SUMMARY_ATTRIBS[1:], DOCSUM_ATTRIBS[1:])
        if (value := getattr(self, name, None)) is not None
    }
class OleFile (data)-
Parser for OLE2 Compound Binary Files with in-place stream writing support.
Expand source code Browse git
class OleFile:
    """
    Parser for OLE2 Compound Binary Files with in-place stream writing support.

    The header, the FAT and the directory are parsed eagerly by the constructor. The MiniFAT
    and the mini stream are loaded on first use.
    """

    def __init__(self, data: bytes | bytearray | memoryview | MemoryFile[memoryview]):
        """
        Parse `data` as an OLE2 file.

        Raises `TypeError` for unsupported input types and `NotOleFileError` when the input is
        shorter than `MINIMAL_OLEFILE_SIZE` or does not start with the OLE2 magic bytes.
        """
        if isinstance(data, MemoryFile):
            fp = data
            mv = data.getbuffer()
        elif isinstance(data, (bytes, bytearray)):
            mv = memoryview(data)
            fp = MemoryFile(mv)
        elif isinstance(data, memoryview):
            mv = data
            fp = MemoryFile(mv)
        else:
            raise TypeError(
                F'Expected bytes, bytearray, memoryview, or MemoryFile,'
                F' got {type(data).__name__}')
        if len(data) < MINIMAL_OLEFILE_SIZE:
            raise NotOleFileError('Data too small to be an OLE2 file.')
        if data[:8] != MAGIC:
            raise NotOleFileError('Not an OLE2 file (invalid magic bytes).')
        self._mv = mv
        self._fp = fp
        self._raise_defects_level = DEFECT_FATAL
        self._metadata: OleMetadata | None = None
        self._parse_header()
        self._load_fat()
        self._load_directory()
        # Both are populated lazily by _load_ministream.
        self._minifat: list[int] | None = None
        self._ministream: bytearray | None = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def _raise_defect(self, level: int, message: str):
        """
        Raise `OleFileError` if `level` reaches the configured defect threshold.
        """
        if level >= self._raise_defects_level:
            raise OleFileError(message)

    def _parse_header(self):
        """
        Read the fixed-layout fields from the first 512 bytes of the file.
        """
        reader = StructReader(self._mv[:512])
        reader.seekset(8)
        self._header_clsid = reader.read_bytes(16)
        self._minor_version = reader.u16()
        self._dll_version = reader.u16()
        byte_order = reader.u16()
        if byte_order != 0xFFFE:
            self._raise_defect(DEFECT_INCORRECT, F'Invalid byte order: {byte_order:#06x}')
        sector_shift = reader.u16()
        self._sector_size = 1 << sector_shift
        mini_sector_shift = reader.u16()
        self._mini_sector_size = 1 << mini_sector_shift
        reader.seekrel(6)
        # The directory sector count is only read for version 4 files.
        if self._dll_version == 4:
            self._num_dir_sectors = reader.u32()
        else:
            reader.seekrel(4)
            self._num_dir_sectors = 0
        self._num_fat_sectors = reader.u32()
        self._first_dir_sector = reader.u32()
        self._transaction_sig = reader.u32()
        self._mini_stream_cutoff = reader.u32()
        self._first_mini_fat_sector = reader.u32()
        self._num_mini_fat_sectors = reader.u32()
        self._first_difat_sector = reader.u32()
        self._num_difat_sectors = reader.u32()
        # The first 109 DIFAT entries are stored in the header itself, at offset 76.
        self._initial_difat: list[int] = list(
            struct.unpack_from('<109I', self._mv, 76))
        # Number of whole sectors after the header-sized first block.
        self._nb_sect = (len(self._mv) - self._sector_size) // self._sector_size

    def _getsect(self, sect: int):
        """
        Return the contents of sector `sect`, which starts at `(sect + 1) * sector_size`.

        A sector that is cut off by the end of the file is padded with zero bytes. Raises
        `EOFError` if the sector starts at or after the end of the file.
        """
        size = self._sector_size
        offset = size * (sect + 1)
        end = offset + size
        if end > len(mv := self._mv):
            if offset >= len(mv):
                raise EOFError(F'Attempting to read sector at {offset:#x}, which is out of bound.')
            out = bytearray(mv[offset:])
            out.extend(itertools.repeat(0, end - len(mv)))
            return out
        return mv[offset:end]

    def _load_fat(self):
        """
        Build the FAT from the sectors listed in the header DIFAT and the DIFAT sector chain.
        """
        fat: list[int] = []
        sector_ints = self._sector_size // 4
        for sect_index in self._initial_difat:
            if sect_index == FREESECT or sect_index == ENDOFCHAIN:
                break
            if sect_index > MAXREGSECT:
                continue
            sect_data = self._getsect(sect_index)
            fat.extend(struct.unpack_from(F'<{sector_ints}I', sect_data))
        if self._num_difat_sectors > 0:
            difat_sect = self._first_difat_sector
            # Guards against DIFAT chains that loop back on themselves.
            visited_difat = set()
            for _ in range(self._num_difat_sectors):
                if difat_sect == ENDOFCHAIN or difat_sect == FREESECT:
                    break
                if difat_sect in visited_difat:
                    break
                visited_difat.add(difat_sect)
                difat_data = self._getsect(difat_sect)
                # The last integer of a DIFAT sector links to the next DIFAT sector.
                entries_per_difat = sector_ints - 1
                entries = struct.unpack_from(F'<{entries_per_difat}I', difat_data)
                for sect_index in entries:
                    if sect_index == FREESECT or sect_index == ENDOFCHAIN:
                        continue
                    if sect_index > MAXREGSECT:
                        continue
                    sect_data = self._getsect(sect_index)
                    fat.extend(struct.unpack_from(F'<{sector_ints}I', sect_data))
                difat_sect = struct.unpack_from('<I', difat_data, entries_per_difat * 4)[0]
        # Entries beyond the number of sectors in the file cannot refer to anything.
        if len(fat) > self._nb_sect:
            fat = fat[:self._nb_sect]
        self._fat = fat

    def _load_ministream(self):
        """
        Load the MiniFAT and the mini stream on first use; later calls are no-ops.
        """
        if self._minifat is not None:
            return
        if self._first_mini_fat_sector == ENDOFCHAIN or self._num_mini_fat_sectors == 0:
            self._minifat = []
            self._ministream = bytearray()
            return
        minifat_data = _read_chain(
            self._fp,
            self._fat,
            self._first_mini_fat_sector,
            self._sector_size,
            self._sector_size,
            self._num_mini_fat_sectors * self._sector_size,
            self._nb_sect,
        )
        minifat: list[int] = []
        count = len(minifat_data) // 4
        if count > 0:
            minifat = list(struct.unpack_from(F'<{count}I', minifat_data))
        root = self._root
        if root.size > 0:
            # Round up: a trailing partial mini sector still needs its MiniFAT entry. Rounding
            # down would make the last mini sector of the mini stream unreachable.
            mini_size = self._mini_sector_size
            used_entries = (root.size + mini_size - 1) // mini_size
            if len(minifat) > used_entries:
                minifat = minifat[:used_entries]
        self._minifat = minifat
        # The root entry's chain in the regular FAT holds the mini stream.
        mini_data = _read_chain(
            self._fp,
            self._fat,
            root.start,
            self._sector_size,
            self._sector_size,
            root.size,
            self._nb_sect,
        )
        self._ministream = mini_data

    def _load_directory(self):
        """
        Read all directory entries and build the storage tree below the root entry.

        Raises `OleFileError` if the directory does not contain a root entry at index 0.
        """
        dir_data = _read_chain(
            self._fp,
            self._fat,
            self._first_dir_sector,
            self._sector_size,
            self._sector_size,
            -1,
            self._nb_sect,
        )
        max_entries = len(dir_data) // 128
        entries: list[DirectoryEntry | None] = [None] * max_entries
        for sid in range(max_entries):
            offset = sid * 128
            chunk = dir_data[offset:offset + 128]
            if len(chunk) < 128:
                break
            entry_type = chunk[66]
            if entry_type == STGTY.EMPTY:
                continue
            entry = DirectoryEntry(sid, chunk, self._sector_size, self._mini_stream_cutoff)
            entries[sid] = entry
        # An empty directory chain yields no entries at all; report that the same way as a
        # missing root instead of failing with an IndexError.
        if not entries or entries[0] is None:
            raise OleFileError('Root directory entry not found.')
        self._root = entries[0]
        self._entries = entries
        self._root.build_storage_tree(entries)

    def _open_stream(self, entry: DirectoryEntry):
        """
        Read the data of `entry` from the mini stream or the regular FAT, depending on
        `entry.is_minifat`, and return it wrapped in a `MemoryFile`.
        """
        if entry.is_minifat:
            self._load_ministream()
            if (ms := self._ministream) is None or (mf := self._minifat) is None:
                raise RuntimeError('Ministream was not read.')
            ms = MemoryFile(memoryview(ms))
            data = _read_chain(
                ms,
                mf,
                entry.start,
                self._mini_sector_size,
                0,
                entry.size,
                len(mf),
            )
        else:
            data = _read_chain(
                self._fp,
                self._fat,
                entry.start,
                self._sector_size,
                self._sector_size,
                entry.size,
                self._nb_sect,
            )
        return MemoryFile(memoryview(data))

    def _find(self, filename: str) -> DirectoryEntry | None:
        """
        Resolve a path whose components are separated by slashes or backslashes, starting at
        the root entry. Matching is case-insensitive. Returns `None` if any component is
        missing.
        """
        node = self._root
        for part in re.split(r'[\\/]+', filename):
            key = part.lower()
            child = node.kids_dict.get(key)
            if child is None:
                return None
            node = child
        return node

    def listdir(self, streams: bool = True, storages: bool = False) -> list[list[str]]:
        """
        List the paths of all entries as lists of path components. Streams and storages are
        included according to the respective flags.
        """
        result: list[list[str]] = []
        self._list_recursive(self._root, [], result, streams, storages)
        return result

    def _list_recursive(
        self,
        node: DirectoryEntry,
        path: list[str],
        result: list[list[str]],
        streams: bool,
        storages: bool,
    ):
        """
        Append the paths of all matching entries below `node` to `result`.
        """
        for kid in node.kids:
            current_path = path + [kid.name]
            if kid.entry_type == STGTY.STREAM and streams:
                result.append(current_path)
            elif kid.entry_type in (STGTY.STORAGE, STGTY.ROOT):
                if storages:
                    result.append(current_path)
                self._list_recursive(kid, current_path, result, streams, storages)

    def openstream(self, filename: str) -> MemoryFile[memoryview]:
        """
        Return the contents of the stream at `filename`. Raises `OleFileError` if the path
        does not exist or does not refer to a stream.
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Stream not found: {filename!r}')
        if entry.entry_type != STGTY.STREAM:
            raise OleFileError(F'Not a stream: {filename!r}')
        return self._open_stream(entry)

    def exists(self, filename: str) -> bool:
        """
        Return whether an entry exists at the given path.
        """
        return self._find(filename) is not None

    def get_type(self, path: str) -> int:
        """
        Return the entry type at `path`, or `STGTY.EMPTY` if there is no such entry.
        """
        entry = self._find(path)
        if entry is None:
            return STGTY.EMPTY
        return entry.entry_type

    def get_size(self, filename: str) -> int:
        """
        Return the size of the entry at `filename`. Raises `OleFileError` if it is missing.
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Entry not found: {filename!r}')
        return entry.size

    def get_rootentry_name(self) -> str:
        """
        Return the name of the root directory entry.
        """
        return self._root.name

    def getclsid(self, filename: str) -> str:
        """
        Return the CLSID string of the entry at `filename`. Raises `OleFileError` if it is
        missing.
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Entry not found: {filename!r}')
        return entry.clsid_str

    def getmtime(self, filename: str) -> datetime.datetime | None:
        """
        Return the modification time of the entry at `filename`, or `None` if it is missing.
        """
        entry = self._find(filename)
        if entry is None:
            return None
        return filetime_to_datetime(entry.modify_time)

    def getctime(self, filename: str) -> datetime.datetime | None:
        """
        Return the creation time of the entry at `filename`, or `None` if it is missing.
        """
        entry = self._find(filename)
        if entry is None:
            return None
        return filetime_to_datetime(entry.create_time)

    def getproperties(
        self,
        filename: str,
        convert_time: bool = False,
        no_conversion: list[int] | None = None,
    ) -> dict[int, Any]:
        """
        Parse the property set stored in the stream at `filename`.

        Returns an empty dictionary when the stream is shorter than 28 bytes or when parsing
        fails (best effort). Errors raised while opening the stream are propagated.
        """
        if no_conversion is None:
            no_conversion = []
        raw = self.openstream(filename).read()
        if len(raw) < 28:
            return {}
        try:
            return _parse_property_set(memoryview(raw), convert_time, no_conversion)
        except Exception:
            return {}

    def get_metadata(self) -> OleMetadata:
        """
        Return the document metadata; it is parsed on the first call and cached afterwards.
        """
        if self._metadata is None:
            self._metadata = OleMetadata()
            self._metadata.parse(self)
        return self._metadata

    def write_stream(self, filename: str, data: bytes | bytearray | memoryview) -> None:
        """
        Overwrite an existing stream's data in-place. The new data must be the same length as
        the existing stream. The underlying buffer must be mutable (i.e. the OleFile was
        constructed from a bytearray).
        """
        entry = self._find(filename)
        if entry is None:
            raise OleFileError(F'Stream not found: {filename!r}')
        if entry.entry_type != STGTY.STREAM:
            raise OleFileError(F'Not a stream: {filename!r}')
        if len(data) != entry.size:
            raise OleFileError(F'Data length {len(data)} does not match stream size {entry.size}')
        if not data:
            return
        if entry.is_minifat:
            self._write_mini_stream(entry, data)
        else:
            self._write_regular_stream(entry, data)

    def _write_regular_stream(self, entry: DirectoryEntry, data: bytes | bytearray | memoryview):
        """
        Copy `data` sector by sector into the file buffer along the FAT chain of `entry`.
        """
        sect = entry.start
        offset = 0
        remaining = len(data)
        # The visited set stops the loop when the chain is cyclic.
        visited: set[int] = set()
        while remaining > 0 and sect <= MAXREGSECT and sect not in visited:
            visited.add(sect)
            chunk_size = min(self._sector_size, remaining)
            file_offset = self._sector_size * (sect + 1)
            self._mv[file_offset:file_offset + chunk_size] = data[offset:offset + chunk_size]
            offset += chunk_size
            remaining -= chunk_size
            if sect < len(self._fat):
                sect = self._fat[sect]
            else:
                break

    def _write_mini_stream(self, entry: DirectoryEntry, data: bytes | bytearray | memoryview):
        """
        Copy `data` into the in-memory mini stream along the MiniFAT chain of `entry`, then
        write the mini stream back to the file buffer.
        """
        self._load_ministream()
        if self._ministream is None or self._minifat is None:
            raise RuntimeError('Ministream was not loaded.')
        sect = entry.start
        offset = 0
        remaining = len(data)
        visited: set[int] = set()
        while remaining > 0 and sect <= MAXREGSECT and sect not in visited:
            visited.add(sect)
            chunk_size = min(self._mini_sector_size, remaining)
            ms_offset = sect * self._mini_sector_size
            self._ministream[ms_offset:ms_offset + chunk_size] = data[offset:offset + chunk_size]
            offset += chunk_size
            remaining -= chunk_size
            if sect < len(self._minifat):
                sect = self._minifat[sect]
            else:
                break
        self._flush_ministream()

    def _flush_ministream(self):
        """
        Write the in-memory ministream back to the underlying file buffer by following the
        root entry's FAT chain.
        """
        if self._ministream is None:
            return
        root = self._root
        sect = root.start
        offset = 0
        remaining = len(self._ministream)
        visited: set[int] = set()
        while remaining > 0 and sect <= MAXREGSECT and sect not in visited:
            visited.add(sect)
            chunk_size = min(self._sector_size, remaining)
            file_offset = self._sector_size * (sect + 1)
            self._mv[file_offset:file_offset + chunk_size] = \
                self._ministream[offset:offset + chunk_size]
            offset += chunk_size
            remaining -= chunk_size
            if sect < len(self._fat):
                sect = self._fat[sect]
            else:
                break
def listdir(self, streams=True, storages=False)-
Expand source code Browse git
def listdir(self, streams: bool = True, storages: bool = False) -> list[list[str]]: result: list[list[str]] = [] self._list_recursive(self._root, [], result, streams, storages) return result def openstream(self, filename)-
Expand source code Browse git
def openstream(self, filename: str) -> MemoryFile[memoryview]:
    """
    Open the stream at `filename` and return its contents. An `OleFileError` is raised when
    the path does not exist or names something other than a stream.
    """
    entry = self._find(filename)
    if entry is not None:
        if entry.entry_type == STGTY.STREAM:
            return self._open_stream(entry)
        raise OleFileError(F'Not a stream: {filename!r}')
    raise OleFileError(F'Stream not found: {filename!r}')
Expand source code Browse git
def exists(self, filename: str) -> bool: return self._find(filename) is not None def get_type(self, path)-
Expand source code Browse git
def get_type(self, path: str) -> int: entry = self._find(path) if entry is None: return STGTY.EMPTY return entry.entry_type def get_size(self, filename)-
Expand source code Browse git
def get_size(self, filename: str) -> int: entry = self._find(filename) if entry is None: raise OleFileError(F'Entry not found: {filename!r}') return entry.size def get_rootentry_name(self)-
Expand source code Browse git
def get_rootentry_name(self) -> str: return self._root.name def getclsid(self, filename)-
Expand source code Browse git
def getclsid(self, filename: str) -> str: entry = self._find(filename) if entry is None: raise OleFileError(F'Entry not found: {filename!r}') return entry.clsid_str def getmtime(self, filename)-
Expand source code Browse git
def getmtime(self, filename: str) -> datetime.datetime | None: entry = self._find(filename) if entry is None: return None return filetime_to_datetime(entry.modify_time) def getctime(self, filename)-
Expand source code Browse git
def getctime(self, filename: str) -> datetime.datetime | None: entry = self._find(filename) if entry is None: return None return filetime_to_datetime(entry.create_time) def getproperties(self, filename, convert_time=False, no_conversion=None)-
Expand source code Browse git
def getproperties(
    self,
    filename: str,
    convert_time: bool = False,
    no_conversion: list[int] | None = None,
) -> dict[int, Any]:
    """
    Parse the property set contained in the stream at `filename`.

    An empty dictionary is returned when the stream holds fewer than 28 bytes or when the
    parser raises (best effort). Errors from opening the stream are not caught.
    """
    unconverted = [] if no_conversion is None else no_conversion
    raw = self.openstream(filename).read()
    if len(raw) < 28:
        return {}
    view = memoryview(raw)
    try:
        properties = _parse_property_set(view, convert_time, unconverted)
    except Exception:
        return {}
    return properties
Expand source code Browse git
def get_metadata(self) -> OleMetadata: if self._metadata is None: self._metadata = OleMetadata() self._metadata.parse(self) return self._metadata def write_stream(self, filename, data)-
Overwrite an existing stream's data in-place. The new data must be the same length as the existing stream. The underlying buffer must be mutable (i.e. the OleFile was constructed from a bytearray).
Expand source code Browse git
def write_stream(self, filename: str, data: bytes | bytearray | memoryview) -> None:
    """
    Replace the contents of an existing stream without changing its size.

    `data` must be exactly as long as the stream; otherwise, or if `filename` does not name
    a stream, an `OleFileError` is raised. Empty data is a no-op. The underlying buffer must
    be mutable (i.e. the OleFile was constructed from a bytearray).
    """
    entry = self._find(filename)
    if entry is None:
        raise OleFileError(F'Stream not found: {filename!r}')
    if entry.entry_type != STGTY.STREAM:
        raise OleFileError(F'Not a stream: {filename!r}')
    if len(data) != entry.size:
        raise OleFileError(F'Data length {len(data)} does not match stream size {entry.size}')
    if not data:
        return
    # Small streams live in the mini stream, everything else in regular sectors.
    writer = self._write_mini_stream if entry.is_minifat else self._write_regular_stream
    writer(entry, data)