Module refinery.lib.dmg.hfs
HFS+ filesystem parser for extracting files from Apple Disk Image partitions.
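The sketch below shows one way this parser might be driven; the input path is hypothetical, and in practice the partition bytes would come from the surrounding DMG machinery rather than a file on disk:

from refinery.lib.dmg.hfs import HFSVolume

with open('partition.bin', 'rb') as stream:
    volume = HFSVolume(stream.read())
for path, data, mtime in volume.files():
    print(F'{mtime}  {path}  ({len(data)} bytes)')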
"""
HFS+ filesystem parser for extracting files from Apple Disk Image partitions.
"""
from __future__ import annotations
import codecs
import zlib
from datetime import datetime, timezone
from enum import IntEnum
from typing import Generator
from refinery.lib.structures import StructReader
from refinery.lib.types import buf
_HFS_SIGNATURE_PLUS = b'H+'
_HFS_SIGNATURE_X = b'HX'
_VOLUME_HEADER_OFFSET = 0x400
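# Seconds between the HFS+ epoch (1904-01-01 00:00:00 UTC) and the Unix epoch.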
_MAC_EPOCH_OFFSET = 2082844800
_HFS_ROOT_FOLDER_ID = 2
_EXTENTS_FILE_ID = 3
_ATTRIBUTES_FILE_ID = 8
_DECMPFS_MAGIC = b'fpmc'
_DECMPFS_ATTR_NAME = 'com.apple.decmpfs'
_FORK_DATA = 0x00
_FORK_RSRC = 0xFF
class _RECORD(IntEnum):
FOLDER = 1
FILE = 2
FOLDER_THREAD = 3
FILE_THREAD = 4
class _NODE(IntEnum):
LEAF = 0xFF
INDEX = 0x00
HEADER = 0x01
def _mac_to_datetime(ts: int) -> datetime | None:
if ts == 0:
return None
try:
unix_ts = ts - _MAC_EPOCH_OFFSET
return datetime.fromtimestamp(unix_ts, timezone.utc).replace(tzinfo=None)
except (OSError, OverflowError, ValueError):
return None
class HFSFork:
__slots__ = 'size', 'clump_size', 'num_blocks', 'extents'
def __init__(self, reader: StructReader):
self.size = reader.u64()
self.clump_size = reader.u32()
self.num_blocks = reader.u32()
self.extents = []
for _ in range(8):
start_block = reader.u32()
block_count = reader.u32()
self.extents.append((start_block, block_count))
class HFSVolumeHeader:
__slots__ = (
'signature',
'version',
'block_size',
'total_blocks',
'num_files',
'num_folders',
'allocation_fork',
'extents_fork',
'catalog_fork',
'attributes_fork',
'startup_fork',
)
def __init__(self, data: bytes | bytearray | memoryview):
if len(data) < _VOLUME_HEADER_OFFSET + 0x200:
raise ValueError('partition data too small for HFS+ volume header')
reader = StructReader(memoryview(data)[_VOLUME_HEADER_OFFSET:], bigendian=True)
self.signature = reader.read_bytes(2)
if self.signature not in (_HFS_SIGNATURE_PLUS, _HFS_SIGNATURE_X):
raise ValueError(F'invalid HFS+ signature: {self.signature!r}')
self.version = reader.u16()
if self.version not in (4, 5):
raise ValueError(F'unsupported HFS+ version: {self.version}')
reader.seekset(0x20)
self.num_files = reader.u32()
self.num_folders = reader.u32()
self.block_size = reader.u32()
self.total_blocks = reader.u32()
reader.seekset(0x70)
self.allocation_fork = HFSFork(reader)
self.extents_fork = HFSFork(reader)
self.catalog_fork = HFSFork(reader)
self.attributes_fork = HFSFork(reader)
self.startup_fork = HFSFork(reader)
class HFSBTreeNode:
__slots__ = 'flink', 'blink', 'kind', 'height', 'num_records'
def __init__(self, reader: StructReader):
self.flink = reader.u32()
self.blink = reader.u32()
self.kind = reader.u8()
self.height = reader.u8()
self.num_records = reader.u16()
class HFSBTreeHeader:
__slots__ = (
'tree_depth',
'root_node',
'leaf_records',
'first_leaf_node',
'last_leaf_node',
'node_size',
'total_nodes',
)
def __init__(self, reader: StructReader):
self.tree_depth = reader.u16()
self.root_node = reader.u32()
self.leaf_records = reader.u32()
self.first_leaf_node = reader.u32()
self.last_leaf_node = reader.u32()
self.node_size = reader.u16()
reader.skip(2) # max_key_length
self.total_nodes = reader.u32()
class HFSCatalogItem:
__slots__ = (
'record_type',
'item_id',
'parent_id',
'name',
'create_time',
'modify_time',
'data_fork',
'resource_fork',
)
def __init__(self):
self.record_type: int = 0
self.item_id: int = 0
self.parent_id: int = 0
self.name: str = ''
self.create_time: int = 0
self.modify_time: int = 0
self.data_fork: HFSFork | None = None
self.resource_fork: HFSFork | None = None
def _iter_node_records(
node_data: memoryview, num_records: int, node_size: int,
) -> Generator[memoryview, None, None]:
"""
Yield memoryview slices of each record from a B-tree node, reading the
offset table at the end of the node.
"""
reader = StructReader(node_data, bigendian=True)
for i in range(num_records):
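        # Record offsets are u16 values stored back-to-front at the end of the
        # node; 14 is the size of the node descriptor, so no record can start
        # earlier than that.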
offset_pos = node_size - 2 * (i + 1)
if offset_pos < 14:
break
reader.seekset(offset_pos)
rec_offset = reader.u16()
if rec_offset < 14 or rec_offset >= node_size:
continue
next_offset_pos = node_size - 2 * (i + 2)
if next_offset_pos >= 14:
reader.seekset(next_offset_pos)
next_rec_offset = reader.u16()
else:
next_rec_offset = node_size - 2 * (num_records + 1)
yield node_data[rec_offset:next_rec_offset]
class HFSVolume:
def __init__(self, data: bytes | bytearray | memoryview):
self.raw = data
self.header = HFSVolumeHeader(data)
self.block_size = self.header.block_size
self._items: list[HFSCatalogItem] = []
self._id_map: dict[int, HFSCatalogItem] = {}
self._extent_overflow: dict[tuple[int, int, int], list[tuple[int, int]]] = {}
self._attributes: dict[tuple[int, str], memoryview] = {}
self._parse_extents_overflow()
self._parse_catalog()
self._parse_attributes()
def _read_fork_inline(self, fork: HFSFork) -> bytearray:
"""
Read fork data using only the 8 inline extents (no overflow lookup).
"""
out = bytearray()
remaining = fork.size
for pos, count in fork.extents:
if count == 0:
break
offset = pos * self.block_size
length = min(count * self.block_size, remaining)
out.extend(self.raw[offset:offset + length])
remaining -= length
if remaining <= 0:
break
return out
def _read_fork(self, fork: HFSFork, file_id: int = 0, fork_type: int = _FORK_DATA) -> bytearray:
out = bytearray()
remaining = fork.size
block_cursor = 0
for pos, count in fork.extents:
if count == 0:
break
offset = pos * self.block_size
length = min(count * self.block_size, remaining)
out.extend(self.raw[offset:offset + length])
remaining -= length
block_cursor += count
if remaining <= 0:
break
if remaining > 0 and file_id and self._extent_overflow:
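            # Inline extents exhausted: follow overflow records, which are
            # keyed by (file ID, fork type, first allocation block of the run).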
while remaining > 0:
key = (file_id, fork_type, block_cursor)
extra = self._extent_overflow.get(key)
if not extra:
break
for pos, count in extra:
if count == 0:
break
offset = pos * self.block_size
length = min(count * self.block_size, remaining)
out.extend(self.raw[offset:offset + length])
remaining -= length
block_cursor += count
if remaining <= 0:
break
return out
def _read_btree_header(self, tree_data: bytearray):
"""
Parse the B-tree header node and return (HFSBTreeNode, HFSBTreeHeader).
"""
reader = StructReader(memoryview(tree_data), bigendian=True)
node = HFSBTreeNode(reader)
reader.skip(2) # 2 reserved bytes before header record
header = HFSBTreeHeader(reader)
return node, header
def _parse_extents_overflow(self):
if self.header.extents_fork.size == 0:
return
extents_data = self._read_fork_inline(self.header.extents_fork)
if not extents_data:
return
try:
node0, bth = self._read_btree_header(extents_data)
if node0.kind != _NODE.HEADER:
return
except Exception:
return
node_size = bth.node_size
if node_size == 0:
return
current = bth.first_leaf_node
while current != 0:
node_offset = current * node_size
if node_offset + node_size > len(extents_data):
break
node_data = memoryview(extents_data)[node_offset:node_offset + node_size]
reader = StructReader(node_data, bigendian=True)
node = HFSBTreeNode(reader)
if node.kind != _NODE.LEAF:
break
self._parse_extent_leaf_node(node_data, node.num_records, node_size)
current = node.flink
def _parse_extent_leaf_node(self, node_data: memoryview, num_records: int, node_size: int):
for rec in _iter_node_records(node_data, num_records, node_size):
if len(rec) < 12 + 64:
continue
reader = StructReader(rec, bigendian=True)
key_length = reader.u16()
if key_length < 10:
continue
fork_type = reader.u8()
reader.skip(1) # padding
file_id = reader.u32()
start_block = reader.u32()
reader.seekset(2 + key_length)
reader.byte_align(2)
extents: list[tuple[int, int]] = []
for _ in range(8):
if reader.tell() + 8 > len(rec):
break
sb = reader.u32()
bc = reader.u32()
extents.append((sb, bc))
self._extent_overflow[(file_id, fork_type, start_block)] = extents
def _parse_attributes(self):
if self.header.attributes_fork.size == 0:
return
attr_data = self._read_fork(self.header.attributes_fork, _ATTRIBUTES_FILE_ID, _FORK_DATA)
if not attr_data:
return
try:
node0, bth = self._read_btree_header(attr_data)
if node0.kind != _NODE.HEADER:
return
except Exception:
return
node_size = bth.node_size
if node_size == 0:
return
current = bth.first_leaf_node
while current != 0:
node_offset = current * node_size
if node_offset + node_size > len(attr_data):
break
node_data = memoryview(attr_data)[node_offset:node_offset + node_size]
reader = StructReader(node_data, bigendian=True)
node = HFSBTreeNode(reader)
if node.kind != _NODE.LEAF:
break
self._parse_attribute_leaf_node(node_data, node.num_records, node_size)
current = node.flink
def _parse_attribute_leaf_node(self, node_data: memoryview, num_records: int, node_size: int):
for rec in _iter_node_records(node_data, num_records, node_size):
try:
self._parse_attribute_record(rec)
except Exception:
continue
def _parse_attribute_record(self, rec: buf):
if len(rec) < 12:
return
reader = StructReader(memoryview(rec), bigendian=True)
key_length = reader.u16()
        if key_length < 12 or 2 + key_length > len(rec):
            return
        reader.skip(2)  # pad
        file_id = reader.u32()
        reader.skip(4)  # start_block (zero for inline attributes)
        name_length = reader.u16()
        name_byte_len = name_length * 2
        if 14 + name_byte_len > 2 + key_length:
            return
try:
attr_name = codecs.decode(reader.read(name_byte_len), 'utf-16-be')
except (UnicodeDecodeError, ValueError):
return
reader.seekset(2 + key_length)
reader.byte_align(2)
        if reader.tell() + 16 > len(rec):
            return
        record_type = reader.u32()
        if record_type != 0x10:  # kHFSPlusAttrInlineData
            return
        reader.skip(8)  # reserved[2]
        attr_size = reader.u32()
attr_data = reader.read(attr_size)
self._attributes[file_id, attr_name] = attr_data
def _decompress_decmpfs(self, item: HFSCatalogItem) -> bytes | None:
"""
Attempt to decompress a file using HFS+ transparent compression.
Returns decompressed bytes, or None if the file is not compressed.
"""
attr = self._attributes.get((item.item_id, _DECMPFS_ATTR_NAME))
if attr is None:
return None
if len(attr) < 16 or attr[:4] != _DECMPFS_MAGIC:
return None
reader = StructReader(attr, bigendian=False)
reader.skip(4) # magic already validated
method = reader.u32()
uncompressed_size = reader.u64()
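        # decmpfs types 3, 7, and 9 keep the payload inline in the attribute
        # (zlib, LZVN, and uncompressed, respectively); types 4, 8, and 12 keep
        # it in the resource fork (zlib, LZVN, and LZFSE).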
if method in (3, 7, 9):
payload = attr[16:]
if not payload:
return b''
return self._decmpfs_decompress_inline(payload, method, uncompressed_size)
if method in (4, 8, 12):
if item.resource_fork is None or item.resource_fork.size == 0:
return None
rsrc_data = self._read_fork(item.resource_fork, item.item_id, _FORK_RSRC)
if not rsrc_data:
return None
return self._decmpfs_decompress_resource(rsrc_data, method, uncompressed_size)
return None
def _decmpfs_decompress_inline(self, payload: bytes, method: int, size: int) -> bytes:
        if method == 3:
            if (payload[0] & 0x0F) == 0x0F:
                # A low nibble of 0x0F marks an uncompressed payload that
                # follows a single marker byte.
                return payload[1:1 + size]
            # decmpfs zlib data carries the standard zlib header.
            return zlib.decompress(payload)[:size]
if method == 7:
return _lzvn_decompress_attr(payload, size)
if method == 9:
return payload[:size]
return payload[:size]
def _decmpfs_decompress_resource(self, rsrc_data: buf, method: int, size: int) -> bytearray:
out = bytearray()
mem = memoryview(rsrc_data)
if len(rsrc_data) < 260:
return out
be_reader = StructReader(mem, bigendian=True)
data_offset = be_reader.u32()
if data_offset + 4 > len(rsrc_data):
return out
block_data_start = data_offset + 4
block_table_offset = block_data_start
if block_table_offset + 8 > len(rsrc_data):
return out
        # The chunk table is little-endian: a u32 chunk count followed by one
        # (offset, size) pair per chunk, with offsets relative to the start of
        # the table.
        le_reader = StructReader(mem[block_table_offset:], bigendian=False)
        num_blocks = le_reader.u32()
for _ in range(num_blocks):
if le_reader.tell() + 8 > len(le_reader.getvalue()):
break
chunk_offset = le_reader.u32()
chunk_size = le_reader.u32()
abs_offset = block_data_start + chunk_offset
if abs_offset + chunk_size > len(rsrc_data):
break
chunk = rsrc_data[abs_offset:abs_offset + chunk_size]
if not chunk:
continue
            if method == 4:
                if (chunk[0] & 0x0F) == 0x0F:
                    # uncompressed chunk, skip the marker byte
                    out.extend(chunk[1:])
                else:
                    try:
                        out.extend(zlib.decompress(chunk))
                    except zlib.error:
                        out.extend(chunk)
elif method == 8:
out.extend(_lzvn_decompress_attr(chunk, size - len(out)))
elif method == 12:
from refinery.lib.fast.lzfse import lzfse_decompress
out.extend(lzfse_decompress(chunk))
else:
out.extend(chunk)
del out[size:]
return out
def _parse_catalog(self):
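        # The catalog file always has the reserved CNID 4 (kHFSCatalogFileID).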
catalog_data = self._read_fork(self.header.catalog_fork, 4, _FORK_DATA)
if not catalog_data:
return
node0, bth = self._read_btree_header(catalog_data)
if node0.kind != _NODE.HEADER:
raise ValueError('catalog B-tree node 0 is not a header node')
node_size = bth.node_size
if node_size == 0:
return
current = bth.first_leaf_node
while current != 0:
node_offset = current * node_size
if node_offset + node_size > len(catalog_data):
break
node_data = memoryview(catalog_data)[node_offset:node_offset + node_size]
reader = StructReader(node_data, bigendian=True)
node = HFSBTreeNode(reader)
if node.kind != _NODE.LEAF:
break
self._parse_leaf_node(node_data, node.num_records, node_size)
current = node.flink
for item in self._items:
if item.record_type in (_RECORD.FOLDER, _RECORD.FILE):
self._id_map[item.item_id] = item
def _parse_leaf_node(self, node_data: memoryview, num_records: int, node_size: int):
for rec in _iter_node_records(node_data, num_records, node_size):
if len(rec) < 6:
continue
try:
self._parse_catalog_record(rec)
except Exception:
continue
def _parse_catalog_record(self, rec: buf):
if len(rec) < 6:
return
view = memoryview(rec)
reader = StructReader(view, bigendian=True)
key_length = reader.u16()
if key_length < 6 or 2 + key_length > len(rec):
return
parent_id = reader.u32()
name_length = reader.u16()
name_byte_len = name_length * 2
if 8 + name_byte_len > 2 + key_length:
return
try:
name = codecs.decode(reader.read(name_byte_len), 'utf-16-be')
except (UnicodeDecodeError, ValueError):
name = ''
reader.seekset(2 + key_length)
reader.byte_align(2)
if reader.tell() + 2 > len(rec):
return
val_offset = reader.tell()
val_length = reader.remaining_bytes
record_type = reader.u16(peek=True)
item = HFSCatalogItem()
item.parent_id = parent_id
item.name = name
item.record_type = record_type
if record_type == _RECORD.FOLDER:
if val_length < 12:
return
reader.seekset(val_offset + 8)
item.item_id = reader.u32()
if val_length >= 18:
item.create_time = reader.u32()
item.modify_time = reader.u32()
self._items.append(item)
return
if record_type == _RECORD.FILE:
if val_length < 14:
return
reader.seekset(val_offset + 8)
item.item_id = reader.u32()
if val_length >= 18:
item.create_time = reader.u32()
item.modify_time = reader.u32()
if val_length >= 0xA8:
reader.seekset(val_offset + 0x58)
item.data_fork = HFSFork(reader)
if val_length >= 0xF8:
reader.seekset(val_offset + 0xA8)
item.resource_fork = HFSFork(reader)
self._items.append(item)
def _build_path(self, item: HFSCatalogItem) -> str | None:
parts = [item.name]
current_id = item.parent_id
seen = set()
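        # CNID 1 is the reserved parent of the root folder; reaching it (or 0,
        # for malformed records) terminates the walk.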
while current_id not in (0, 1):
if current_id in seen:
return None
seen.add(current_id)
parent = self._id_map.get(current_id)
if parent is None:
return None
parts.append(parent.name)
current_id = parent.parent_id
parts.reverse()
return '/'.join(parts)
def files(self) -> Generator[tuple[str, bytes, datetime | None], None, None]:
for item in self._items:
if item.record_type != _RECORD.FILE:
continue
path = self._build_path(item)
if path is None:
continue
data = B''
if item.data_fork is not None and item.data_fork.size > 0:
data = self._read_fork(item.data_fork, item.item_id, _FORK_DATA)
if not data and self._attributes:
decompressed = self._decompress_decmpfs(item)
if decompressed is not None:
data = decompressed
mtime = _mac_to_datetime(item.modify_time)
yield path, data, mtime
def _lzvn_decompress_attr(payload: bytes, expected_size: int) -> bytearray:
"""
Decompress LZVN-compressed data from a decmpfs attribute or resource fork.
"""
from refinery.lib.fast.lzfse import _lzvn_decode
output = bytearray()
_lzvn_decode(payload, 0, len(payload), expected_size, output)
return output
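As a quick sanity check of the epoch constant: 2021-01-01 00:00:00 UTC is Unix time 1609459200, and adding the 1904-to-1970 offset of 2082844800 seconds gives the HFS+ timestamp 3692304000. Within this module, therefore:

assert _mac_to_datetime(3692304000) == datetime(2021, 1, 1)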
Classes
class HFSFork (reader)

    Instance variables

    var clump_size
    var extents
    var num_blocks
    var size
class HFSVolumeHeader (data)

    Instance variables

    var allocation_fork
    var attributes_fork
    var block_size
    var catalog_fork
    var extents_fork
    var num_files
    var num_folders
    var signature
    var startup_fork
    var total_blocks
    var version
class HFSBTreeNode (reader)

    Instance variables

    var blink
    var flink
    var height
    var kind
    var num_records
class HFSBTreeHeader (reader)

    Instance variables

    var first_leaf_node
    var last_leaf_node
    var leaf_records
    var node_size
    var root_node
    var total_nodes
    var tree_depth
class HFSCatalogItem

    Instance variables

    var create_time
    var data_fork
    var item_id
    var modify_time
    var name
    var parent_id
    var record_type
    var resource_fork
class HFSVolume (data)

    Methods

    def files(self)