Module refinery.lib.iso.udf
UDF (ECMA-167) filesystem parser ported from 7zip's Archive/Udf/ implementation.
Expand source code Browse git
"""
UDF (ECMA-167) filesystem parser ported from 7zip's Archive/Udf/ implementation.
"""
from __future__ import annotations
import itertools
import struct
from datetime import datetime, timedelta, timezone
from typing import Iterator
# Sector size assumed initially; _find_anchor() also probes 512 and 4096.
DEFAULT_SECTOR_SIZE = 2048
# Sector number at which the anchor volume descriptor pointer is expected.
ANCHOR_SECTOR = 256
# Descriptor tag identifiers used in the volume descriptor sequence.
TAG_ID_PVD = 1
TAG_ID_ANCHOR = 2
TAG_ID_POINTER = 3
TAG_ID_IMPL_USE = 4
TAG_ID_PARTITION = 5
TAG_ID_LOGICAL_VOLUME = 6
TAG_ID_UNALLOC_SPACE = 7
TAG_ID_TERMINATOR = 8
TAG_ID_LOGICAL_VOLUME_INTEGRITY = 9
# Descriptor tag identifiers used in the file structure.
TAG_ID_FILE_SET = 256
TAG_ID_FILE_ID = 257
TAG_ID_ALLOC_EXTENT = 258
TAG_ID_INDIRECT_ENTRY = 259
TAG_ID_TERMINAL_ENTRY = 260
TAG_ID_FILE_ENTRY = 261
TAG_ID_EXTENDED_ATTR_HEADER = 262
TAG_ID_EXTENDED_FILE_ENTRY = 266
# ICB file type values (directory, regular file, symbolic link).
ICB_FILE_TYPE_DIR = 4
ICB_FILE_TYPE_FILE = 5
ICB_FILE_TYPE_SYMLINK = 12
# Allocation descriptor type, taken from the low 3 bits of the ICB flags.
ICB_DESC_TYPE_SHORT = 0
ICB_DESC_TYPE_LONG = 1
ICB_DESC_TYPE_EXTENDED = 2
ICB_DESC_TYPE_INLINE = 3
# Hard limits that guard directory traversal against malformed images.
MAX_DIR_DEPTH = 256
MAX_ITEMS = 0x100000
class LogicalVolumeDescriptor:
    """
    Holds the location of one logical volume's file set descriptor, as read
    from a logical volume descriptor in the volume descriptor sequence.
    """
    __slots__ = ('block_size', 'fsd_location', 'fsd_length', 'fsd_partition')

    def __init__(
        self,
        block_size: int,
        fsd_location: int,
        fsd_length: int,
        fsd_partition: int,
    ):
        self.block_size, self.fsd_location, self.fsd_length, self.fsd_partition = (
            block_size, fsd_location, fsd_length, fsd_partition)
class UDFRef:
__slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length')
def __init__(self, path: str, date: datetime | None, is_dir: bool = False):
self.path = path
self.date = date
self.extents: list[tuple[int, int]] = []
self.is_dir = is_dir
self.inline_data: bytes | memoryview | None = None
self.file_version: int = 0
self._info_length: int = -1
@property
def total_size(self) -> int:
if self._info_length >= 0:
return self._info_length
if self.inline_data is not None:
return len(self.inline_data)
return sum(s for _, s in self.extents)
def _read_u16(data: bytes | memoryview, offset: int) -> int:
return struct.unpack_from('<H', data, offset)[0]
def _read_u32(data: bytes | memoryview, offset: int) -> int:
return struct.unpack_from('<I', data, offset)[0]
def _read_u64(data: bytes | memoryview, offset: int) -> int:
return struct.unpack_from('<Q', data, offset)[0]
def _verify_tag(data: bytes | memoryview, offset: int = 0) -> int | None:
if offset + 16 > len(data):
return None
tag_id = _read_u16(data, offset)
checksum = 0
for i in range(16):
if i != 4:
checksum += data[offset + i]
checksum &= 0xFF
if checksum != data[offset + 4]:
return None
return tag_id
def _parse_timestamp(data: bytes | memoryview, offset: int) -> datetime | None:
if offset + 12 > len(data):
return None
type_and_tz = _read_u16(data, offset)
year = struct.unpack_from('<h', data, offset + 2)[0]
month = data[offset + 4]
day = data[offset + 5]
hour = data[offset + 6]
minute = data[offset + 7]
second = data[offset + 8]
if month < 1 or month > 12 or day < 1 or day > 31:
return None
tz_raw = type_and_tz & 0x0FFF
if tz_raw & 0x0800:
tz_raw = tz_raw - 0x1000
try:
if -1440 <= tz_raw <= 1440:
tz = timezone(timedelta(minutes=tz_raw))
else:
tz = timezone.utc
return datetime(year, month, day, hour, minute, second, tzinfo=tz)
except (ValueError, OverflowError):
return None
def _parse_short_ad(data: bytes | memoryview, offset: int) -> tuple[int, int]:
length = _read_u32(data, offset) & 0x3FFFFFFF
position = _read_u32(data, offset + 4)
return (position, length)
def _parse_long_ad(data: bytes | memoryview, offset: int) -> tuple[int, int, int]:
length = _read_u32(data, offset) & 0x3FFFFFFF
location = _read_u32(data, offset + 4)
partition = _read_u16(data, offset + 8)
return (location, length, partition)
class UDFArchive:
    """
    Read-only parser for UDF (ECMA-167) file system images.

    Call :meth:`open` with the raw image bytes, then iterate :meth:`entries`
    and obtain each file's payload via :meth:`extract`.
    """

    def __init__(self):
        # Every file and directory discovered, in traversal order.
        self.refs: list[UDFRef] = []
        self._data: memoryview = memoryview(b'')
        self._sector_size: int = DEFAULT_SECTOR_SIZE
        # Maps partition number -> (start sector, length in sectors).
        self._partitions: dict[int, tuple[int, int]] = {}
        self._logical_volumes: list[LogicalVolumeDescriptor] = []

    def open(self, data: bytes | bytearray | memoryview) -> None:
        """Parse *data* as a UDF image, populating ``self.refs`` on success."""
        self._data = memoryview(data)
        if not self._find_anchor():
            return
        self._read_volume_descriptor_sequence()
        self._read_file_sets()

    def _find_anchor(self) -> bool:
        """
        Probe the common sector sizes for an anchor descriptor at sector 256.
        Sets ``self._sector_size`` to the size that matched; returns success.
        """
        for sector_size in (2048, 512, 4096):
            self._sector_size = sector_size
            anchor_pos = ANCHOR_SECTOR * sector_size
            if anchor_pos + 16 > len(self._data):
                continue
            tag_id = _verify_tag(self._data, anchor_pos)
            if tag_id == TAG_ID_ANCHOR:
                return True
        return False

    def _sector_offset(self, sector: int) -> int:
        """Absolute byte offset of *sector* within the image."""
        return sector * self._sector_size

    def _partition_offset(self, partition_num: int, block: int) -> int:
        """
        Absolute byte offset of *block* within the given partition. Unknown
        partitions are treated as starting at sector zero.
        """
        if partition_num in self._partitions:
            start_sector, _ = self._partitions[partition_num]
            return (start_sector + block) * self._sector_size
        return block * self._sector_size

    def _read_sector(self, sector: int) -> memoryview:
        """Return one full sector of image data, or an empty view if out of range."""
        pos = self._sector_offset(sector)
        end = pos + self._sector_size
        if end > len(self._data):
            return memoryview(b'')
        return self._data[pos:end]

    def _read_volume_descriptor_sequence(self) -> None:
        """
        Walk the main volume descriptor sequence referenced by the anchor,
        collecting partition and logical volume descriptors until a
        terminator or an invalid tag is encountered.
        """
        anchor_pos = ANCHOR_SECTOR * self._sector_size
        anchor_data = self._data[anchor_pos:anchor_pos + 512]
        if len(anchor_data) < 32:
            return
        main_extent_len = _read_u32(anchor_data, 16)
        main_extent_loc = _read_u32(anchor_data, 20)
        sector = main_extent_loc
        sectors_count = main_extent_len // self._sector_size
        for _ in range(sectors_count):
            sector_data = self._read_sector(sector)
            if len(sector_data) < 16:
                break
            tag_id = _verify_tag(sector_data)
            if tag_id is None or tag_id == TAG_ID_TERMINATOR:
                break
            if tag_id == TAG_ID_PARTITION:
                self._parse_partition_descriptor(sector_data)
            elif tag_id == TAG_ID_LOGICAL_VOLUME:
                self._parse_logical_volume_descriptor(sector_data)
            sector += 1

    def _parse_partition_descriptor(self, data: bytes | memoryview) -> None:
        """Record a partition's number, starting sector and length."""
        if len(data) < 264:
            return
        partition_number = _read_u16(data, 22)
        start_location = _read_u32(data, 188)
        length = _read_u32(data, 192)
        self._partitions[partition_number] = (start_location, length)

    def _parse_logical_volume_descriptor(self, data: bytes | memoryview) -> None:
        """Record the file set descriptor location of a logical volume."""
        # Fields are read up to and including the u16 at offset 256, so at
        # least 258 bytes are required; a smaller bound would let the reads
        # below raise struct.error on a truncated descriptor.
        if len(data) < 258:
            return
        block_size = _read_u32(data, 212)
        fsd_length = _read_u32(data, 248)
        fsd_location = _read_u32(data, 252)
        fsd_partition = _read_u16(data, 256)
        self._logical_volumes.append(LogicalVolumeDescriptor(
            block_size=block_size,
            fsd_location=fsd_location,
            fsd_length=fsd_length,
            fsd_partition=fsd_partition,
        ))

    def _read_file_sets(self) -> None:
        """Read each logical volume's file set descriptor and walk its root directory."""
        for lv in self._logical_volumes:
            fsd_loc = lv.fsd_location
            fsd_part = lv.fsd_partition
            offset = self._partition_offset(fsd_part, fsd_loc)
            if offset + 512 > len(self._data):
                continue
            fsd_data = self._data[offset:offset + 512]
            tag_id = _verify_tag(fsd_data)
            if tag_id != TAG_ID_FILE_SET:
                continue
            if len(fsd_data) < 410:
                continue
            root_icb_loc = _read_u32(fsd_data, 404)
            root_icb_part = _read_u16(fsd_data, 408)
            # Tracks (partition, location) ICBs already visited; shared across
            # the whole recursion of one file set to break reference cycles.
            visited: set[int] = set()
            self._read_directory(root_icb_loc, root_icb_part, '', visited, 0)

    def _read_directory(
        self,
        icb_loc: int,
        icb_part: int,
        parent_path: str,
        visited: set[int],
        depth: int
    ) -> None:
        """
        Resolve the directory ICB at (*icb_loc*, *icb_part*) and parse its
        stream of file identifier descriptors, recursing into subdirectories.
        Cycle detection (``visited``) and ``MAX_DIR_DEPTH`` / ``MAX_ITEMS``
        bound the traversal on malformed images.
        """
        if depth > MAX_DIR_DEPTH:
            return
        # Pack partition and location into a single int as the visited key.
        key = (icb_part << 32) | icb_loc
        if key in visited:
            return
        visited.add(key)
        dir_data, _ = self._read_file_entry(icb_loc, icb_part)
        if dir_data is None:
            return
        pos = 0
        item_count = 0
        while pos < len(dir_data) and item_count < MAX_ITEMS:
            if pos + 38 > len(dir_data):
                break
            tag_id = _verify_tag(dir_data, pos)
            if tag_id != TAG_ID_FILE_ID:
                break
            fid_len = self._parse_file_identifier(
                dir_data, pos, parent_path, visited, depth)
            if fid_len <= 0:
                break
            pos += fid_len
            item_count += 1

    def _parse_file_identifier(
        self,
        data: bytes | memoryview,
        offset: int,
        parent_path: str,
        visited: set[int],
        depth: int,
    ) -> int:
        """
        Parse one file identifier descriptor inside a directory stream and
        register the corresponding file or directory in ``self.refs``.

        Returns the padded descriptor length so the caller can advance to the
        next descriptor, or -1 when the descriptor is truncated.
        """
        if offset + 38 > len(data):
            return -1
        file_version = _read_u16(data, offset + 16)
        file_char = data[offset + 18]
        icb_loc = _read_u32(data, offset + 24)
        icb_part = _read_u16(data, offset + 28)
        impl_use_len = _read_u16(data, offset + 36)
        name_len = data[offset + 19] if offset + 19 < len(data) else 0
        # Descriptors are padded to a multiple of four bytes.
        total_len = 38 + impl_use_len + name_len
        padding = (4 - (total_len % 4)) % 4
        total_len += padding
        is_parent = bool(file_char & 0x08)
        is_deleted = bool(file_char & 0x04)
        is_dir = bool(file_char & 0x02)
        if is_parent or is_deleted:
            # Skip the parent-directory entry and deleted entries.
            return total_len
        name_start = offset + 38 + impl_use_len
        if name_start + name_len > len(data):
            return total_len
        raw_name = data[name_start:name_start + name_len]
        name = self._decode_name(raw_name)
        if not name:
            return total_len
        full_path = F'{parent_path}/{name}' if parent_path else name
        if is_dir:
            ref = UDFRef(full_path, None, is_dir=True)
            ref.file_version = file_version
            self.refs.append(ref)
            self._read_directory(icb_loc, icb_part, full_path, visited, depth + 1)
        else:
            file_data, file_date, file_size = self._read_file_entry_metadata(icb_loc, icb_part)
            ref = UDFRef(full_path, file_date)
            ref.file_version = file_version
            ref._info_length = file_size
            if file_data is not None:
                # Small files can be stored inline within the file entry.
                ref.inline_data = file_data
            else:
                ref.extents = self._read_file_entry_extents(icb_loc, icb_part)
            self.refs.append(ref)
        return total_len

    def _decode_name(self, raw: bytes | memoryview) -> str:
        """
        Decode a file identifier. The first byte selects the encoding:
        8 for single-byte characters, 16 for UTF-16BE; anything else is
        decoded as latin-1 including the marker byte.
        """
        if not raw:
            return ''
        if raw[0] == 8:
            try:
                return bytes(raw[1:]).decode('utf-8', errors='replace')
            except Exception:
                return bytes(raw[1:]).decode('latin-1')
        elif raw[0] == 16:
            try:
                return bytes(raw[1:]).decode('utf-16-be', errors='replace')
            except Exception:
                return bytes(raw[1:]).decode('latin-1')
        return bytes(raw).decode('latin-1', errors='replace')

    def _read_file_entry(
        self,
        icb_loc: int,
        icb_part: int
    ) -> tuple[bytes | bytearray | memoryview | None, datetime | None]:
        """
        Read a (possibly extended) file entry and return its fully resolved
        content together with its timestamp, or ``(None, None)`` when the
        entry is missing or invalid.
        """
        offset = self._partition_offset(icb_part, icb_loc)
        if offset + 176 > len(self._data):
            return None, None
        entry_data = self._data[offset:offset + self._sector_size]
        tag_id = _verify_tag(entry_data)
        is_extended = (tag_id == TAG_ID_EXTENDED_FILE_ENTRY)
        if tag_id != TAG_ID_FILE_ENTRY and not is_extended:
            return None, None
        if is_extended and len(entry_data) < 212:
            # Extended entries store their EA/AD lengths at offsets 204/208;
            # refuse truncated input instead of raising struct.error below.
            return None, None
        icb_flags = _read_u16(entry_data, 34) if len(entry_data) > 36 else 0
        desc_type = icb_flags & 0x07
        info_length = _read_u64(entry_data, 56)
        if is_extended:
            ea_length = _read_u32(entry_data, 204)
            ad_length = _read_u32(entry_data, 208)
            ad_offset = 212 + ea_length
        else:
            ea_length = _read_u32(entry_data, 168)
            ad_length = _read_u32(entry_data, 172)
            ad_offset = 176 + ea_length
        date = _parse_timestamp(entry_data, 92 if is_extended else 84) if len(entry_data) > 96 else None
        if ad_offset + ad_length > len(entry_data):
            # Clamp the allocation descriptor area to the available data.
            ad_length = max(0, len(entry_data) - ad_offset)
        if desc_type == ICB_DESC_TYPE_INLINE:
            inline = entry_data[ad_offset:ad_offset + ad_length]
            return inline[:info_length], date
        result = self._resolve_allocations(entry_data, ad_offset, ad_length, desc_type, icb_part)
        return result[:info_length], date

    def _read_file_entry_metadata(
        self,
        icb_loc: int,
        icb_part: int
    ) -> tuple[bytes | bytearray | memoryview | None, datetime | None, int]:
        """
        Read a file entry without resolving its extents. Returns a triple of
        ``(inline data or None, timestamp, recorded information length)``;
        inline data is only present for inline-stored files.
        """
        offset = self._partition_offset(icb_part, icb_loc)
        if offset + 176 > len(self._data):
            return None, None, 0
        entry_data = self._data[offset:offset + self._sector_size]
        tag_id = _verify_tag(entry_data)
        is_extended = (tag_id == TAG_ID_EXTENDED_FILE_ENTRY)
        if tag_id != TAG_ID_FILE_ENTRY and not is_extended:
            return None, None, 0
        if is_extended and len(entry_data) < 212:
            # See _read_file_entry: guard the extended-entry field reads.
            return None, None, 0
        icb_flags = _read_u16(entry_data, 34) if len(entry_data) > 36 else 0
        desc_type = icb_flags & 0x07
        info_length = _read_u64(entry_data, 56)
        if is_extended:
            ea_length = _read_u32(entry_data, 204)
            ad_length = _read_u32(entry_data, 208)
            ad_offset = 212 + ea_length
        else:
            ea_length = _read_u32(entry_data, 168)
            ad_length = _read_u32(entry_data, 172)
            ad_offset = 176 + ea_length
        date = _parse_timestamp(entry_data, 92 if is_extended else 84) if len(entry_data) > 96 else None
        if desc_type == ICB_DESC_TYPE_INLINE:
            if ad_offset + ad_length <= len(entry_data):
                return entry_data[ad_offset:ad_offset + ad_length], date, info_length
            return b'', date, info_length
        return None, date, info_length

    def _read_file_entry_extents(
        self,
        icb_loc: int,
        icb_part: int
    ) -> list[tuple[int, int]]:
        """
        Read a file entry and return the list of its data extents as
        ``(absolute byte offset, length)`` pairs; empty on any error.
        """
        offset = self._partition_offset(icb_part, icb_loc)
        if offset + 176 > len(self._data):
            return []
        entry_data = self._data[offset:offset + self._sector_size]
        tag_id = _verify_tag(entry_data)
        is_extended = (tag_id == TAG_ID_EXTENDED_FILE_ENTRY)
        if tag_id != TAG_ID_FILE_ENTRY and not is_extended:
            return []
        if is_extended and len(entry_data) < 212:
            # See _read_file_entry: guard the extended-entry field reads.
            return []
        icb_flags = _read_u16(entry_data, 34) if len(entry_data) > 36 else 0
        desc_type = icb_flags & 0x07
        if is_extended:
            ea_length = _read_u32(entry_data, 204)
            ad_length = _read_u32(entry_data, 208)
            ad_offset = 212 + ea_length
        else:
            ea_length = _read_u32(entry_data, 168)
            ad_length = _read_u32(entry_data, 172)
            ad_offset = 176 + ea_length
        if ad_offset + ad_length > len(entry_data):
            ad_length = max(0, len(entry_data) - ad_offset)
        return self._collect_extents(entry_data, ad_offset, ad_length, desc_type, icb_part)

    def _resolve_allocations(
        self,
        entry_data: bytes | memoryview,
        ad_offset: int,
        ad_length: int,
        desc_type: int,
        partition: int,
    ) -> bytearray:
        """Concatenate the data of all allocation extents into one buffer."""
        extents = self._collect_extents(entry_data, ad_offset, ad_length, desc_type, partition)
        result = bytearray()
        for byte_offset, length in extents:
            end = byte_offset + length
            if end > len(self._data):
                end = len(self._data)
            if byte_offset < len(self._data):
                # Extents running past the end of the image are truncated.
                result.extend(self._data[byte_offset:end])
            else:
                # Extents entirely outside the image are zero-filled.
                result.extend(itertools.repeat(0, length))
        return result

    def _collect_extents(
        self,
        entry_data: bytes | memoryview,
        ad_offset: int,
        ad_length: int,
        desc_type: int,
        partition: int,
    ) -> list[tuple[int, int]]:
        """
        Parse the allocation descriptor area of a file entry into a list of
        ``(absolute byte offset, length)`` pairs. A zero-length descriptor
        terminates each list; unknown descriptor types yield no extents.
        """
        extents: list[tuple[int, int]] = []
        pos = ad_offset
        end_pos = ad_offset + ad_length
        if desc_type == ICB_DESC_TYPE_SHORT:
            while pos + 8 <= end_pos:
                block, length = _parse_short_ad(entry_data, pos)
                if length == 0:
                    break
                byte_off = self._partition_offset(partition, block)
                extents.append((byte_off, length))
                pos += 8
        elif desc_type == ICB_DESC_TYPE_LONG:
            while pos + 16 <= end_pos:
                block, length, part = _parse_long_ad(entry_data, pos)
                if length == 0:
                    break
                byte_off = self._partition_offset(part, block)
                extents.append((byte_off, length))
                pos += 16
        elif desc_type == ICB_DESC_TYPE_EXTENDED:
            while pos + 20 <= end_pos:
                length = _read_u32(entry_data, pos) & 0x3FFFFFFF
                block = _read_u32(entry_data, pos + 12)
                part_ref = _read_u16(entry_data, pos + 16)
                if length == 0:
                    break
                byte_off = self._partition_offset(part_ref, block)
                extents.append((byte_off, length))
                pos += 20
        return extents

    def entries(self) -> Iterator[UDFRef]:
        """Yield all non-directory entries discovered by :meth:`open`."""
        for ref in self.refs:
            if not ref.is_dir:
                yield ref

    def extract(self, ref: UDFRef) -> bytearray:
        """
        Return the payload of *ref*, truncated to the recorded information
        length when one is known. Extents entirely outside the image are
        zero-filled; extents that only partially fit are truncated.
        """
        if ref.inline_data is None:
            result = bytearray()
            for offset, length in ref.extents:
                end = offset + length
                if end > len(self._data):
                    end = len(self._data)
                if offset >= len(self._data):
                    result.extend(itertools.repeat(0, length))
                else:
                    result.extend(self._data[offset:end])
        else:
            result = bytearray(ref.inline_data)
        if (_r := ref._info_length) >= 0 and len(result) > _r:
            del result[_r:]
        return result
Classes
class LogicalVolumeDescriptor (block_size, fsd_location, fsd_length, fsd_partition)-
Expand source code Browse git
class LogicalVolumeDescriptor: __slots__ = ('block_size', 'fsd_location', 'fsd_length', 'fsd_partition') def __init__( self, block_size: int, fsd_location: int, fsd_length: int, fsd_partition: int, ): self.block_size = block_size self.fsd_location = fsd_location self.fsd_length = fsd_length self.fsd_partition = fsd_partitionInstance variables
var block_size-
Expand source code Browse git
class LogicalVolumeDescriptor: __slots__ = ('block_size', 'fsd_location', 'fsd_length', 'fsd_partition') def __init__( self, block_size: int, fsd_location: int, fsd_length: int, fsd_partition: int, ): self.block_size = block_size self.fsd_location = fsd_location self.fsd_length = fsd_length self.fsd_partition = fsd_partition var fsd_length-
Expand source code Browse git
class LogicalVolumeDescriptor: __slots__ = ('block_size', 'fsd_location', 'fsd_length', 'fsd_partition') def __init__( self, block_size: int, fsd_location: int, fsd_length: int, fsd_partition: int, ): self.block_size = block_size self.fsd_location = fsd_location self.fsd_length = fsd_length self.fsd_partition = fsd_partition var fsd_location-
Expand source code Browse git
class LogicalVolumeDescriptor: __slots__ = ('block_size', 'fsd_location', 'fsd_length', 'fsd_partition') def __init__( self, block_size: int, fsd_location: int, fsd_length: int, fsd_partition: int, ): self.block_size = block_size self.fsd_location = fsd_location self.fsd_length = fsd_length self.fsd_partition = fsd_partition var fsd_partition-
Expand source code Browse git
class LogicalVolumeDescriptor: __slots__ = ('block_size', 'fsd_location', 'fsd_length', 'fsd_partition') def __init__( self, block_size: int, fsd_location: int, fsd_length: int, fsd_partition: int, ): self.block_size = block_size self.fsd_location = fsd_location self.fsd_length = fsd_length self.fsd_partition = fsd_partition
class UDFRef (path, date, is_dir=False)-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents)Instance variables
var total_size-
Expand source code Browse git
@property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents) var date-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents) var extents-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents) var file_version-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents) var inline_data-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents) var is_dir-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents) var path-
Expand source code Browse git
class UDFRef: __slots__ = ('path', 'date', 'extents', 'is_dir', 'inline_data', 'file_version', '_info_length') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self.inline_data: bytes | memoryview | None = None self.file_version: int = 0 self._info_length: int = -1 @property def total_size(self) -> int: if self._info_length >= 0: return self._info_length if self.inline_data is not None: return len(self.inline_data) return sum(s for _, s in self.extents)
class UDFArchive-
Expand source code Browse git
class UDFArchive: def __init__(self): self.refs: list[UDFRef] = [] self._data: memoryview = memoryview(b'') self._sector_size: int = DEFAULT_SECTOR_SIZE self._partitions: dict[int, tuple[int, int]] = {} self._logical_volumes: list[LogicalVolumeDescriptor] = [] def open(self, data: bytes | bytearray | memoryview) -> None: self._data = memoryview(data) if not self._find_anchor(): return self._read_volume_descriptor_sequence() self._read_file_sets() def _find_anchor(self) -> bool: for sector_size in (2048, 512, 4096): self._sector_size = sector_size anchor_pos = ANCHOR_SECTOR * sector_size if anchor_pos + 16 > len(self._data): continue tag_id = _verify_tag(self._data, anchor_pos) if tag_id == TAG_ID_ANCHOR: return True return False def _sector_offset(self, sector: int) -> int: return sector * self._sector_size def _partition_offset(self, partition_num: int, block: int) -> int: if partition_num in self._partitions: start_sector, _ = self._partitions[partition_num] return (start_sector + block) * self._sector_size return block * self._sector_size def _read_sector(self, sector: int) -> memoryview: pos = self._sector_offset(sector) end = pos + self._sector_size if end > len(self._data): return memoryview(b'') return self._data[pos:end] def _read_volume_descriptor_sequence(self) -> None: anchor_pos = ANCHOR_SECTOR * self._sector_size anchor_data = self._data[anchor_pos:anchor_pos + 512] if len(anchor_data) < 32: return main_extent_len = _read_u32(anchor_data, 16) main_extent_loc = _read_u32(anchor_data, 20) sector = main_extent_loc sectors_count = main_extent_len // self._sector_size for _ in range(sectors_count): sector_data = self._read_sector(sector) if len(sector_data) < 16: break tag_id = _verify_tag(sector_data) if tag_id is None or tag_id == TAG_ID_TERMINATOR: break if tag_id == TAG_ID_PARTITION: self._parse_partition_descriptor(sector_data) elif tag_id == TAG_ID_LOGICAL_VOLUME: self._parse_logical_volume_descriptor(sector_data) sector += 1 def 
_parse_partition_descriptor(self, data: bytes | memoryview) -> None: if len(data) < 264: return partition_number = _read_u16(data, 22) start_location = _read_u32(data, 188) length = _read_u32(data, 192) self._partitions[partition_number] = (start_location, length) def _parse_logical_volume_descriptor(self, data: bytes | memoryview) -> None: if len(data) < 248: return block_size = _read_u32(data, 212) fsd_length = _read_u32(data, 248) fsd_location = _read_u32(data, 252) fsd_partition = _read_u16(data, 256) self._logical_volumes.append(LogicalVolumeDescriptor( block_size=block_size, fsd_location=fsd_location, fsd_length=fsd_length, fsd_partition=fsd_partition, )) def _read_file_sets(self) -> None: for lv in self._logical_volumes: fsd_loc = lv.fsd_location fsd_part = lv.fsd_partition offset = self._partition_offset(fsd_part, fsd_loc) if offset + 512 > len(self._data): continue fsd_data = self._data[offset:offset + 512] tag_id = _verify_tag(fsd_data) if tag_id != TAG_ID_FILE_SET: continue if len(fsd_data) < 410: continue root_icb_loc = _read_u32(fsd_data, 404) root_icb_part = _read_u16(fsd_data, 408) visited: set[int] = set() self._read_directory(root_icb_loc, root_icb_part, '', visited, 0) def _read_directory( self, icb_loc: int, icb_part: int, parent_path: str, visited: set[int], depth: int ) -> None: if depth > MAX_DIR_DEPTH: return key = (icb_part << 32) | icb_loc if key in visited: return visited.add(key) dir_data, _ = self._read_file_entry(icb_loc, icb_part) if dir_data is None: return pos = 0 item_count = 0 while pos < len(dir_data) and item_count < MAX_ITEMS: if pos + 38 > len(dir_data): break tag_id = _verify_tag(dir_data, pos) if tag_id != TAG_ID_FILE_ID: break fid_len = self._parse_file_identifier( dir_data, pos, parent_path, visited, depth) if fid_len <= 0: break pos += fid_len item_count += 1 def _parse_file_identifier( self, data: bytes | memoryview, offset: int, parent_path: str, visited: set[int], depth: int, ) -> int: if offset + 38 > len(data): return 
-1 file_version = _read_u16(data, offset + 16) file_char = data[offset + 18] icb_loc = _read_u32(data, offset + 24) icb_part = _read_u16(data, offset + 28) impl_use_len = _read_u16(data, offset + 36) name_len = data[offset + 19] if offset + 19 < len(data) else 0 total_len = 38 + impl_use_len + name_len padding = (4 - (total_len % 4)) % 4 total_len += padding is_parent = bool(file_char & 0x08) is_deleted = bool(file_char & 0x04) is_dir = bool(file_char & 0x02) if is_parent or is_deleted: return total_len name_start = offset + 38 + impl_use_len if name_start + name_len > len(data): return total_len raw_name = data[name_start:name_start + name_len] name = self._decode_name(raw_name) if not name: return total_len full_path = F'{parent_path}/{name}' if parent_path else name if is_dir: ref = UDFRef(full_path, None, is_dir=True) ref.file_version = file_version self.refs.append(ref) self._read_directory(icb_loc, icb_part, full_path, visited, depth + 1) else: file_data, file_date, file_size = self._read_file_entry_metadata(icb_loc, icb_part) ref = UDFRef(full_path, file_date) ref.file_version = file_version ref._info_length = file_size if file_data is not None: ref.inline_data = file_data else: extents = self._read_file_entry_extents(icb_loc, icb_part) ref.extents = extents self.refs.append(ref) return total_len def _decode_name(self, raw: bytes | memoryview) -> str: if not raw: return '' if raw[0] == 8: try: return bytes(raw[1:]).decode('utf-8', errors='replace') except Exception: return bytes(raw[1:]).decode('latin-1') elif raw[0] == 16: try: return bytes(raw[1:]).decode('utf-16-be', errors='replace') except Exception: return bytes(raw[1:]).decode('latin-1') return bytes(raw).decode('latin-1', errors='replace') def _read_file_entry( self, icb_loc: int, icb_part: int ) -> tuple[bytes | bytearray | memoryview | None, datetime | None]: offset = self._partition_offset(icb_part, icb_loc) if offset + 176 > len(self._data): return None, None entry_data = self._data[offset:offset 
+ self._sector_size] tag_id = _verify_tag(entry_data) is_extended = (tag_id == TAG_ID_EXTENDED_FILE_ENTRY) if tag_id != TAG_ID_FILE_ENTRY and not is_extended: return None, None icb_flags = _read_u16(entry_data, 34) if len(entry_data) > 36 else 0 desc_type = icb_flags & 0x07 info_length = _read_u64(entry_data, 56) if is_extended: ea_length = _read_u32(entry_data, 204) ad_length = _read_u32(entry_data, 208) ad_offset = 212 + ea_length else: ea_length = _read_u32(entry_data, 168) ad_length = _read_u32(entry_data, 172) ad_offset = 176 + ea_length date = _parse_timestamp(entry_data, 92 if is_extended else 84) if len(entry_data) > 96 else None if ad_offset + ad_length > len(entry_data): ad_length = max(0, len(entry_data) - ad_offset) if desc_type == ICB_DESC_TYPE_INLINE: inline = entry_data[ad_offset:ad_offset + ad_length] return inline[:info_length], date result = self._resolve_allocations(entry_data, ad_offset, ad_length, desc_type, icb_part) return result[:info_length], date def _read_file_entry_metadata( self, icb_loc: int, icb_part: int ) -> tuple[bytes | bytearray | memoryview | None, datetime | None, int]: offset = self._partition_offset(icb_part, icb_loc) if offset + 176 > len(self._data): return None, None, 0 entry_data = self._data[offset:offset + self._sector_size] tag_id = _verify_tag(entry_data) is_extended = (tag_id == TAG_ID_EXTENDED_FILE_ENTRY) if tag_id != TAG_ID_FILE_ENTRY and not is_extended: return None, None, 0 icb_flags = _read_u16(entry_data, 34) if len(entry_data) > 36 else 0 desc_type = icb_flags & 0x07 info_length = _read_u64(entry_data, 56) if is_extended: ea_length = _read_u32(entry_data, 204) ad_length = _read_u32(entry_data, 208) ad_offset = 212 + ea_length else: ea_length = _read_u32(entry_data, 168) ad_length = _read_u32(entry_data, 172) ad_offset = 176 + ea_length date = _parse_timestamp(entry_data, 92 if is_extended else 84) if len(entry_data) > 96 else None if desc_type == ICB_DESC_TYPE_INLINE: if ad_offset + ad_length <= 
len(entry_data): return entry_data[ad_offset:ad_offset + ad_length], date, info_length return b'', date, info_length return None, date, info_length def _read_file_entry_extents( self, icb_loc: int, icb_part: int ) -> list[tuple[int, int]]: offset = self._partition_offset(icb_part, icb_loc) if offset + 176 > len(self._data): return [] entry_data = self._data[offset:offset + self._sector_size] tag_id = _verify_tag(entry_data) is_extended = (tag_id == TAG_ID_EXTENDED_FILE_ENTRY) if tag_id != TAG_ID_FILE_ENTRY and not is_extended: return [] icb_flags = _read_u16(entry_data, 34) if len(entry_data) > 36 else 0 desc_type = icb_flags & 0x07 if is_extended: ea_length = _read_u32(entry_data, 204) ad_length = _read_u32(entry_data, 208) ad_offset = 212 + ea_length else: ea_length = _read_u32(entry_data, 168) ad_length = _read_u32(entry_data, 172) ad_offset = 176 + ea_length if ad_offset + ad_length > len(entry_data): ad_length = max(0, len(entry_data) - ad_offset) return self._collect_extents(entry_data, ad_offset, ad_length, desc_type, icb_part) def _resolve_allocations( self, entry_data: bytes | memoryview, ad_offset: int, ad_length: int, desc_type: int, partition: int, ) -> bytearray: extents = self._collect_extents(entry_data, ad_offset, ad_length, desc_type, partition) result = bytearray() for byte_offset, length in extents: end = byte_offset + length if end > len(self._data): end = len(self._data) if byte_offset < len(self._data): result.extend(self._data[byte_offset:end]) else: result.extend(itertools.repeat(0, length)) return result def _collect_extents( self, entry_data: bytes | memoryview, ad_offset: int, ad_length: int, desc_type: int, partition: int, ) -> list[tuple[int, int]]: extents: list[tuple[int, int]] = [] pos = ad_offset end_pos = ad_offset + ad_length if desc_type == ICB_DESC_TYPE_SHORT: while pos + 8 <= end_pos: block, length = _parse_short_ad(entry_data, pos) if length == 0: break byte_off = self._partition_offset(partition, block) 
extents.append((byte_off, length)) pos += 8 elif desc_type == ICB_DESC_TYPE_LONG: while pos + 16 <= end_pos: block, length, part = _parse_long_ad(entry_data, pos) if length == 0: break byte_off = self._partition_offset(part, block) extents.append((byte_off, length)) pos += 16 elif desc_type == ICB_DESC_TYPE_EXTENDED: while pos + 20 <= end_pos: length = _read_u32(entry_data, pos) & 0x3FFFFFFF block = _read_u32(entry_data, pos + 12) part_ref = _read_u16(entry_data, pos + 16) if length == 0: break byte_off = self._partition_offset(part_ref, block) extents.append((byte_off, length)) pos += 20 return extents def entries(self) -> Iterator[UDFRef]: for ref in self.refs: if not ref.is_dir: yield ref def extract(self, ref: UDFRef) -> bytearray: if ref.inline_data is None: result = bytearray() for offset, length in ref.extents: end = offset + length if end > len(self._data): end = len(self._data) if offset >= len(self._data): result.extend(itertools.repeat(0, length)) else: result.extend(self._data[offset:end]) else: result = bytearray(ref.inline_data) if (_r := ref._info_length) >= 0 and len(result) > _r: del result[_r:] return resultMethods
def open(self, data)-
Expand source code Browse git
def open(self, data: bytes | bytearray | memoryview) -> None: self._data = memoryview(data) if not self._find_anchor(): return self._read_volume_descriptor_sequence() self._read_file_sets() def entries(self)-
Expand source code Browse git
def entries(self) -> Iterator[UDFRef]: for ref in self.refs: if not ref.is_dir: yield ref def extract(self, ref)-
Expand source code Browse git
def extract(self, ref: UDFRef) -> bytearray: if ref.inline_data is None: result = bytearray() for offset, length in ref.extents: end = offset + length if end > len(self._data): end = len(self._data) if offset >= len(self._data): result.extend(itertools.repeat(0, length)) else: result.extend(self._data[offset:end]) else: result = bytearray(ref.inline_data) if (_r := ref._info_length) >= 0 and len(result) > _r: del result[_r:] return result