Module refinery.lib.iso.iso9660
ISO 9660 filesystem parser ported from 7zip's Archive/Iso/ implementation.
Expand source code Browse git
"""
ISO 9660 filesystem parser ported from 7zip's Archive/Iso/ implementation.
"""
from __future__ import annotations
import itertools
import struct
from datetime import datetime, timedelta, timezone
from refinery.lib.id import buffer_offset
from refinery.lib.iso import FileSystemType
BLOCK_SIZE = 2048
START_POS = 0x8000
VD_TYPE_BOOT = 0
VD_TYPE_PRIMARY = 1
VD_TYPE_SUPPLEMENTARY = 2
VD_TYPE_PARTITION = 3
VD_TYPE_TERMINATOR = 255
FILE_FLAG_DIR = 0x02
FILE_FLAG_NON_FINAL_EXTENT = 0x80
SUSP_TAG_SP = b'SP'
SUSP_TAG_CE = b'CE'
SUSP_TAG_NM = b'NM'
SUSP_TAG_PX = b'PX'
SUSP_TAG_SL = b'SL'
SUSP_TAG_RR = b'RR'
MAX_DIR_DEPTH = 256
MAX_DIR_RECORDS = 0x100000
def _read_both_endian_u32(data: bytes | memoryview, offset: int) -> int:
return struct.unpack_from('<I', data, offset)[0]
def _read_both_endian_u16(data: bytes | memoryview, offset: int) -> int:
return struct.unpack_from('<H', data, offset)[0]
def _parse_recording_datetime(data: bytes | memoryview) -> datetime | None:
if len(data) < 7:
return None
year = data[0] + 1900
month = data[1]
day = data[2]
hour = data[3]
minute = data[4]
second = data[5]
gmt_offset = struct.unpack_from('b', data, 6)[0]
if month < 1 or month > 12:
return None
if day < 1 or day > 31:
return None
try:
tz = timezone(timedelta(minutes=15 * gmt_offset))
return datetime(year, month, day, hour, minute, second, tzinfo=tz)
except (ValueError, OverflowError):
return None
class DirRecord:
__slots__ = (
'extent_location',
'data_length',
'datetime',
'file_flags',
'file_id',
'system_use',
)
def __init__(self):
self.extent_location: int = 0
self.data_length: int = 0
self.datetime: datetime | None = None
self.file_flags: int = 0
self.file_id: bytes | memoryview = b''
self.system_use: bytes | memoryview = b''
@property
def is_dir(self) -> bool:
return bool(self.file_flags & FILE_FLAG_DIR)
@property
def is_non_final_extent(self) -> bool:
return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT)
@property
def is_system_item(self) -> bool:
return self.file_id == b'\x00' or self.file_id == b'\x01'
class VolumeDescriptor:
__slots__ = (
'vd_type',
'system_id',
'volume_id',
'volume_space_size',
'volume_set_size',
'volume_sequence_number',
'logical_block_size',
'path_table_size',
'root_dir_record',
'escape_sequence',
'is_supplementary',
)
def __init__(self):
self.vd_type: int = 0
self.system_id: bytes | memoryview = b''
self.volume_id: bytes | memoryview = b''
self.volume_space_size: int = 0
self.volume_set_size: int = 0
self.volume_sequence_number: int = 0
self.logical_block_size: int = 0
self.path_table_size: int = 0
self.root_dir_record: DirRecord | None = None
self.escape_sequence: bytes | memoryview = b''
self.is_supplementary: bool = False
@property
def is_joliet(self) -> bool:
es = self.escape_sequence[:3]
return es == b'%/@' or es == b'%/C' or es == b'%/E'
class ISORef:
__slots__ = ('path', 'date', 'extents', 'is_dir', '_inline')
def __init__(self, path: str, date: datetime | None, is_dir: bool = False):
self.path = path
self.date = date
self.extents: list[tuple[int, int]] = []
self.is_dir = is_dir
self._inline: bytes | None = None
@property
def total_size(self) -> int:
if self._inline is not None:
return len(self._inline)
return sum(s for _, s in self.extents)
def _find_susp_entry(system_use: bytes | memoryview, tag: bytes, offset: int = 0) -> bytes | memoryview | None:
pos = offset
while pos + 4 <= len(system_use):
entry_tag = system_use[pos:pos + 2]
entry_len = system_use[pos + 2]
if entry_len < 4:
break
if pos + entry_len > len(system_use):
break
if entry_tag == tag:
return system_use[pos:pos + entry_len]
pos += entry_len
return None
def _get_susp_ce(system_use: bytes | memoryview, susp_offset: int = 0) -> tuple[int, int, int] | None:
entry = _find_susp_entry(system_use, SUSP_TAG_CE, susp_offset)
if entry is None or len(entry) < 28:
return None
block = _read_both_endian_u32(entry, 4)
offset = _read_both_endian_u32(entry, 12)
length = _read_both_endian_u32(entry, 20)
return (block, offset, length)
def _check_susp(root_record: DirRecord) -> int:
su = root_record.system_use
for offset in (0, 14):
sp = _find_susp_entry(su, SUSP_TAG_SP, offset)
if sp and len(sp) >= 7 and sp[4] == 0xBE and sp[5] == 0xEF:
skip_len = sp[6]
return offset + skip_len
return -1
def _get_name_rr(
system_use: bytes | memoryview,
susp_offset: int,
data: memoryview
) -> str | None:
parts: list[bytes] = []
su = system_use
su_off = susp_offset
while True:
nm = _find_susp_entry(su, SUSP_TAG_NM, su_off)
if nm is None or len(nm) < 5:
break
flags = nm[4]
name_part = nm[5:]
parts.append(name_part)
if not (flags & 0x01):
break
su_off_next = buffer_offset(su, nm, su_off)
if su_off_next < 0:
su_off_next = len(su)
else:
su_off_next = len(nm) + su_off_next
ce = _get_susp_ce(su, su_off)
if ce:
block, off, length = ce
ce_pos = block * BLOCK_SIZE + off
if ce_pos + length <= len(data):
su = data[ce_pos:ce_pos + length]
su_off = 0
continue
su_off = su_off_next
if not parts:
return None
return b''.join(parts).decode('utf-8', errors='replace')
def _get_symlink_rr(
system_use: bytes | memoryview,
susp_offset: int,
data: memoryview
) -> str | None:
entry = _find_susp_entry(system_use, SUSP_TAG_SL, susp_offset)
if entry is None or len(entry) < 5:
return None
return None
def _has_rr(system_use: bytes | memoryview, susp_offset: int) -> bool:
entry = _find_susp_entry(system_use, SUSP_TAG_RR, susp_offset)
if entry is not None:
return True
entry = _find_susp_entry(system_use, SUSP_TAG_PX, susp_offset)
return entry is not None
def _parse_fat_image(
data: memoryview | bytes,
) -> list[tuple[str, datetime | None, bytes]] | None:
"""Parse a FAT12/FAT16 floppy or hard-disk image embedded in an El Torito
boot entry and return a list of *(path, date, content)* triples. Returns
``None`` when the image does not look like a valid FAT volume.
"""
if len(data) < 512:
return None
bps = struct.unpack_from('<H', data, 11)[0]
if bps not in (512, 1024, 2048, 4096):
return None
spc = data[13]
if spc == 0 or spc & (spc - 1):
return None
reserved = struct.unpack_from('<H', data, 14)[0]
num_fats = data[16]
root_entries = struct.unpack_from('<H', data, 17)[0]
total_sectors = struct.unpack_from('<H', data, 19)[0]
if total_sectors == 0:
total_sectors = struct.unpack_from('<I', data, 32)[0]
spf = struct.unpack_from('<H', data, 22)[0]
if num_fats == 0 or spf == 0 or root_entries == 0:
return None
fat_offset = reserved * bps
root_dir_offset = fat_offset + num_fats * spf * bps
root_dir_size = root_entries * 32
data_offset = root_dir_offset + root_dir_size
if data_offset > len(data):
return None
root_dir_sectors = (root_entries * 32 + bps - 1) // bps
total_data_sectors = total_sectors - (reserved + num_fats * spf + root_dir_sectors)
total_clusters = total_data_sectors // spc
is_fat16 = total_clusters >= 4085
cluster_bytes = spc * bps
fat = data[fat_offset:fat_offset + spf * bps]
def next_cluster(c: int) -> int:
if is_fat16:
off = c * 2
if off + 2 > len(fat):
return 0xFFFF
return struct.unpack_from('<H', fat, off)[0]
off = c * 3 // 2
if off + 2 > len(fat):
return 0xFFF
val = struct.unpack_from('<H', fat, off)[0]
return val & 0xFFF if c % 2 == 0 else val >> 4
end_mark = 0xFFF8 if is_fat16 else 0xFF8
def extract_chain(start: int, size: int) -> bytes:
result = bytearray()
remaining = size
c = start
while c >= 2 and c < end_mark and remaining > 0:
cluster_off = data_offset + (c - 2) * cluster_bytes
chunk = min(cluster_bytes, remaining)
if cluster_off + chunk > len(data):
break
result.extend(data[cluster_off:cluster_off + chunk])
remaining -= chunk
c = next_cluster(c)
return bytes(result[:size])
def parse_datetime(entry: bytes | memoryview) -> datetime | None:
time_val = struct.unpack_from('<H', entry, 22)[0]
date_val = struct.unpack_from('<H', entry, 24)[0]
if date_val == 0:
return None
try:
return datetime(
((date_val >> 9) & 0x7F) + 1980,
(date_val >> 5) & 0xF,
date_val & 0x1F,
(time_val >> 11) & 0x1F,
(time_val >> 5) & 0x3F,
(time_val & 0x1F) * 2,
)
except (ValueError, OverflowError):
return None
files: list[tuple[str, datetime | None, bytes]] = []
def walk_directory(
dir_data: bytes | memoryview,
prefix: str,
depth: int = 0,
) -> None:
if depth > 16:
return
for i in range(0, len(dir_data) - 31, 32):
entry = dir_data[i:i + 32]
if entry[0] == 0:
break
if entry[0] == 0xE5:
continue
attr = entry[11]
if attr == 0x0F or attr & 0x08:
continue
raw_name = bytes(entry[0:8]).rstrip(b' ')
raw_ext = bytes(entry[8:11]).rstrip(b' ')
try:
fname = raw_name.decode('ascii')
if raw_ext:
fname += '.' + raw_ext.decode('ascii')
except (UnicodeDecodeError, ValueError):
continue
fname = fname.lower()
cluster = struct.unpack_from('<H', entry, 26)[0]
size = struct.unpack_from('<I', entry, 28)[0]
dt = parse_datetime(entry)
full_path = F'{prefix}/{fname}' if prefix else fname
if attr & 0x10:
if fname in ('.', '..'):
continue
sub = extract_chain(cluster, cluster_bytes * 256)
walk_directory(sub, full_path, depth + 1)
else:
files.append((full_path, dt, extract_chain(cluster, size)))
walk_directory(data[root_dir_offset:root_dir_offset + root_dir_size], '')
return files or None
class ISO9660Archive:
def __init__(self):
self.refs: list[ISORef] = []
self._data: memoryview = memoryview(b'')
self._block_size: int = BLOCK_SIZE
self.filesystem_type: FileSystemType = FileSystemType.ISO
self._has_rr: bool = False
self._has_joliet: bool = False
self._primary_vd: VolumeDescriptor | None = None
self._joliet_vd: VolumeDescriptor | None = None
self._boot_record_found: bool = False
self._boot_catalog_location: int = 0
def open(self, data: bytes | bytearray | memoryview) -> None:
self._data = memoryview(data)
volume_descriptors: list[VolumeDescriptor] = []
pos = START_POS
while pos + BLOCK_SIZE <= len(self._data):
block = self._data[pos:pos + BLOCK_SIZE]
if len(block) < 7:
break
vd_type = block[0]
magic = block[1:6]
if magic != b'CD001':
break
if vd_type == VD_TYPE_TERMINATOR:
pos += BLOCK_SIZE
break
elif vd_type == VD_TYPE_BOOT:
self._parse_boot_record(block)
pos += BLOCK_SIZE
continue
elif vd_type == VD_TYPE_PRIMARY or vd_type == VD_TYPE_SUPPLEMENTARY:
vd = self._parse_volume_descriptor(block, vd_type)
if vd is not None:
volume_descriptors.append(vd)
pos += BLOCK_SIZE
primary: VolumeDescriptor | None = None
joliet: VolumeDescriptor | None = None
for vd in volume_descriptors:
if vd.is_supplementary and vd.is_joliet:
joliet = vd
elif not vd.is_supplementary:
if primary is None:
primary = vd
else:
primary = vd
self._primary_vd = primary
self._joliet_vd = joliet
self._has_joliet = joliet is not None
if joliet:
self._read_directory_tree(joliet, is_joliet=True)
elif primary:
self._read_directory_tree(primary, is_joliet=False)
if self._boot_record_found:
self._extract_boot_images()
def _parse_boot_record(self, block: bytes | memoryview) -> None:
boot_system_id = bytes(block[7:39])
if boot_system_id.rstrip(b'\x00').rstrip(b'\x20') == b'EL TORITO SPECIFICATION':
self._boot_record_found = True
self._boot_catalog_location = struct.unpack_from('<I', block, 71)[0]
def _parse_volume_descriptor(
self,
block: bytes | memoryview,
vd_type: int
) -> VolumeDescriptor | None:
vd = VolumeDescriptor()
vd.vd_type = vd_type
vd.is_supplementary = (vd_type == VD_TYPE_SUPPLEMENTARY)
vd.system_id = block[8:40]
vd.volume_id = block[40:72]
vd.volume_space_size = _read_both_endian_u32(block, 80)
vd.escape_sequence = block[88:120]
if not vd.is_supplementary:
vd.escape_sequence = b''
vd.volume_set_size = _read_both_endian_u16(block, 120)
vd.volume_sequence_number = _read_both_endian_u16(block, 124)
vd.logical_block_size = _read_both_endian_u16(block, 128)
vd.path_table_size = _read_both_endian_u32(block, 132)
root_record_data = block[156:190]
if len(root_record_data) >= 34:
vd.root_dir_record = self._parse_dir_record(root_record_data)
return vd
def _parse_dir_record(self, data: bytes | memoryview) -> DirRecord | None:
if len(data) < 34:
return None
rec = DirRecord()
record_len = data[0]
if record_len < 34:
record_len = 34
rec.extent_location = _read_both_endian_u32(data, 2)
rec.data_length = _read_both_endian_u32(data, 10)
rec.datetime = _parse_recording_datetime(data[18:25])
rec.file_flags = data[25]
file_id_len = data[32]
if 33 + file_id_len > len(data):
file_id_len = len(data) - 33
rec.file_id = data[33:33 + file_id_len]
su_start = 33 + file_id_len
if file_id_len % 2 == 0:
su_start += 1
if su_start < len(data):
rec.system_use = data[su_start:min(len(data), record_len)]
return rec
def _read_directory_tree(
self,
vd: VolumeDescriptor,
is_joliet: bool
) -> None:
root = vd.root_dir_record
if root is None:
return
susp_offset = -1
has_rr = False
if not is_joliet:
actual_root = self._read_root_dot_entry(root)
susp_root = actual_root or root
susp_offset = _check_susp(susp_root)
if susp_offset >= 0:
has_rr = _has_rr(susp_root.system_use, susp_offset)
self._has_rr = has_rr
if is_joliet:
self.filesystem_type = FileSystemType.JOLIET
elif has_rr:
self.filesystem_type = FileSystemType.RR
else:
self.filesystem_type = FileSystemType.ISO
self.refs.clear()
visited: set[int] = set()
self._walk_directory(
root.extent_location,
root.data_length,
'',
is_joliet,
has_rr,
susp_offset,
visited,
0,
)
def _walk_directory(
self,
extent_loc: int,
extent_size: int,
parent_path: str,
is_joliet: bool,
has_rr: bool,
susp_offset: int,
visited: set[int],
depth: int,
) -> None:
if depth > MAX_DIR_DEPTH:
return
if extent_loc in visited:
return
visited.add(extent_loc)
dir_pos = extent_loc * self._block_size
dir_end = dir_pos + extent_size
if dir_pos >= len(self._data) or dir_end > len(self._data):
dir_end = min(dir_end, len(self._data))
pos = dir_pos
record_count = 0
pending_ref: ISORef | None = None
while pos < dir_end and record_count < MAX_DIR_RECORDS:
if pos >= len(self._data):
break
record_len = self._data[pos]
if record_len == 0:
next_block = ((pos - dir_pos) // self._block_size + 1) * self._block_size + dir_pos
if next_block >= dir_end:
break
pos = next_block
continue
if record_len < 34:
pos += 1
continue
end = pos + record_len
if end > len(self._data):
break
record_data = self._data[pos:end]
rec = self._parse_dir_record(record_data)
pos = end
record_count += 1
if rec is None:
continue
if rec.is_system_item:
continue
name = self._decode_name(rec, is_joliet, has_rr, susp_offset)
if not name:
continue
full_path = F'{parent_path}/{name}' if parent_path else name
if rec.is_non_final_extent and not rec.is_dir:
if pending_ref is None or pending_ref.path != full_path:
pending_ref = ISORef(full_path, rec.datetime)
self.refs.append(pending_ref)
pending_ref.extents.append((rec.extent_location, rec.data_length))
continue
elif pending_ref is not None and pending_ref.path == full_path:
pending_ref.extents.append((rec.extent_location, rec.data_length))
pending_ref = None
continue
else:
pending_ref = None
if rec.is_dir:
ref = ISORef(full_path, rec.datetime, is_dir=True)
self.refs.append(ref)
self._walk_directory(
rec.extent_location,
rec.data_length,
full_path,
is_joliet,
has_rr,
susp_offset,
visited,
depth + 1,
)
else:
ref = ISORef(full_path, rec.datetime)
ref.extents.append((rec.extent_location, rec.data_length))
self.refs.append(ref)
def _decode_name(
self,
rec: DirRecord,
is_joliet: bool,
has_rr: bool,
susp_offset: int,
) -> str:
if has_rr and susp_offset >= 0:
rr_name = _get_name_rr(rec.system_use, susp_offset, self._data)
if rr_name:
return rr_name
if is_joliet:
try:
name = bytes(rec.file_id).decode('utf-16-be')
except (UnicodeDecodeError, ValueError):
name = bytes(rec.file_id).decode('latin-1')
else:
name = bytes(rec.file_id).decode('latin-1')
name = self._strip_revision(name)
if name.endswith('.'):
name = name[:-1]
return name
@staticmethod
def _strip_revision(name: str) -> str:
base, sep, revision = name.partition(';')
if sep and revision.isdigit():
return base
return name
def select_filesystem(self, fs: FileSystemType) -> None:
if fs is self.filesystem_type:
return
if fs is FileSystemType.JOLIET and self._joliet_vd:
self._read_directory_tree(self._joliet_vd, is_joliet=True)
elif fs is FileSystemType.RR and self._primary_vd:
self._read_directory_tree(self._primary_vd, is_joliet=False)
if not self._has_rr:
self.filesystem_type = FileSystemType.ISO
elif fs is FileSystemType.ISO and self._primary_vd:
self._has_rr = False
self._read_directory_tree_plain(self._primary_vd)
def _read_directory_tree_plain(self, vd: VolumeDescriptor) -> None:
root = vd.root_dir_record
if root is None:
return
self.filesystem_type = FileSystemType.ISO
self.refs.clear()
visited: set[int] = set()
self._walk_directory(
root.extent_location,
root.data_length,
'',
False,
False,
-1,
visited,
0,
)
def _read_root_dot_entry(self, root: DirRecord) -> DirRecord | None:
"""Read the actual '.' entry from the root directory data on disk.
The root directory record stored in the Volume Descriptor is a fixed
34-byte copy that lacks the system use area. The real first entry in
the root directory sector may be much larger and contain SUSP/RR data.
"""
dir_pos = root.extent_location * self._block_size
if dir_pos >= len(self._data):
return None
rec_len = self._data[dir_pos]
if rec_len < 34:
return None
end = dir_pos + rec_len
if end > len(self._data):
return None
return self._parse_dir_record(self._data[dir_pos:end])
def _extract_boot_images(self) -> None:
from refinery.lib.iso.eltorito import BootCatalog
catalog_pos = self._boot_catalog_location * self._block_size
if catalog_pos + 2048 > len(self._data):
return
catalog_data = bytes(self._data[catalog_pos:catalog_pos + 2048])
catalog = BootCatalog.parse(catalog_data)
if catalog is None:
return
existing_extents: set[int] = set()
for ref in self.refs:
for block, _ in ref.extents:
existing_extents.add(block)
for entry in catalog.entries:
if not entry.is_bootable:
continue
if entry.load_rba in existing_extents:
continue
img_pos = entry.load_rba * self._block_size
img_size = entry.image_size
if img_pos >= len(self._data):
continue
if img_pos + img_size > len(self._data):
img_size = len(self._data) - img_pos
if img_size <= 0:
continue
fat_files = _parse_fat_image(self._data[img_pos:img_pos + img_size])
if fat_files:
for path, date, file_data in fat_files:
ref = ISORef(path, date)
ref._inline = file_data
self.refs.append(ref)
else:
ref = ISORef(entry.name, None)
ref.extents.append((entry.load_rba, img_size))
self.refs.append(ref)
def entries(self):
for ref in self.refs:
if not ref.is_dir:
yield ref
def extract(self, ref: ISORef) -> bytearray:
if ref._inline is not None:
return bytearray(ref._inline)
if not ref.extents:
return bytearray()
result = bytearray()
for block, size in ref.extents:
start = block * self._block_size
end = start + size
if end > len(self._data):
end = len(self._data)
if start >= len(self._data):
result.extend(itertools.repeat(0, size))
else:
result.extend(self._data[start:end])
return result
Classes
class DirRecord-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01'Instance variables
var is_dir-
Expand source code Browse git
@property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) var is_non_final_extent-
Expand source code Browse git
@property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) var is_system_item-
Expand source code Browse git
@property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01' var data_length-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01' var datetime-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01' var extent_location-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01' var file_flags-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01' var file_id-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01' var system_use-
Expand source code Browse git
class DirRecord: __slots__ = ( 'extent_location', 'data_length', 'datetime', 'file_flags', 'file_id', 'system_use', ) def __init__(self): self.extent_location: int = 0 self.data_length: int = 0 self.datetime: datetime | None = None self.file_flags: int = 0 self.file_id: bytes | memoryview = b'' self.system_use: bytes | memoryview = b'' @property def is_dir(self) -> bool: return bool(self.file_flags & FILE_FLAG_DIR) @property def is_non_final_extent(self) -> bool: return bool(self.file_flags & FILE_FLAG_NON_FINAL_EXTENT) @property def is_system_item(self) -> bool: return self.file_id == b'\x00' or self.file_id == b'\x01'
class VolumeDescriptor-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E'Instance variables
var is_joliet-
Expand source code Browse git
@property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var escape_sequence-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var is_supplementary-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var logical_block_size-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var path_table_size-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var root_dir_record-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var system_id-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var vd_type-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var volume_id-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var volume_sequence_number-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var volume_set_size-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E' var volume_space_size-
Expand source code Browse git
class VolumeDescriptor: __slots__ = ( 'vd_type', 'system_id', 'volume_id', 'volume_space_size', 'volume_set_size', 'volume_sequence_number', 'logical_block_size', 'path_table_size', 'root_dir_record', 'escape_sequence', 'is_supplementary', ) def __init__(self): self.vd_type: int = 0 self.system_id: bytes | memoryview = b'' self.volume_id: bytes | memoryview = b'' self.volume_space_size: int = 0 self.volume_set_size: int = 0 self.volume_sequence_number: int = 0 self.logical_block_size: int = 0 self.path_table_size: int = 0 self.root_dir_record: DirRecord | None = None self.escape_sequence: bytes | memoryview = b'' self.is_supplementary: bool = False @property def is_joliet(self) -> bool: es = self.escape_sequence[:3] return es == b'%/@' or es == b'%/C' or es == b'%/E'
class ISORef (path, date, is_dir=False)-
Expand source code Browse git
class ISORef: __slots__ = ('path', 'date', 'extents', 'is_dir', '_inline') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self._inline: bytes | None = None @property def total_size(self) -> int: if self._inline is not None: return len(self._inline) return sum(s for _, s in self.extents)Instance variables
var total_size-
Expand source code Browse git
@property def total_size(self) -> int: if self._inline is not None: return len(self._inline) return sum(s for _, s in self.extents) var date-
Expand source code Browse git
class ISORef: __slots__ = ('path', 'date', 'extents', 'is_dir', '_inline') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self._inline: bytes | None = None @property def total_size(self) -> int: if self._inline is not None: return len(self._inline) return sum(s for _, s in self.extents) var extents-
Expand source code Browse git
class ISORef: __slots__ = ('path', 'date', 'extents', 'is_dir', '_inline') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self._inline: bytes | None = None @property def total_size(self) -> int: if self._inline is not None: return len(self._inline) return sum(s for _, s in self.extents) var is_dir-
Expand source code Browse git
class ISORef: __slots__ = ('path', 'date', 'extents', 'is_dir', '_inline') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self._inline: bytes | None = None @property def total_size(self) -> int: if self._inline is not None: return len(self._inline) return sum(s for _, s in self.extents) var path-
Expand source code Browse git
class ISORef: __slots__ = ('path', 'date', 'extents', 'is_dir', '_inline') def __init__(self, path: str, date: datetime | None, is_dir: bool = False): self.path = path self.date = date self.extents: list[tuple[int, int]] = [] self.is_dir = is_dir self._inline: bytes | None = None @property def total_size(self) -> int: if self._inline is not None: return len(self._inline) return sum(s for _, s in self.extents)
class ISO9660Archive-
Expand source code Browse git
class ISO9660Archive: def __init__(self): self.refs: list[ISORef] = [] self._data: memoryview = memoryview(b'') self._block_size: int = BLOCK_SIZE self.filesystem_type: FileSystemType = FileSystemType.ISO self._has_rr: bool = False self._has_joliet: bool = False self._primary_vd: VolumeDescriptor | None = None self._joliet_vd: VolumeDescriptor | None = None self._boot_record_found: bool = False self._boot_catalog_location: int = 0 def open(self, data: bytes | bytearray | memoryview) -> None: self._data = memoryview(data) volume_descriptors: list[VolumeDescriptor] = [] pos = START_POS while pos + BLOCK_SIZE <= len(self._data): block = self._data[pos:pos + BLOCK_SIZE] if len(block) < 7: break vd_type = block[0] magic = block[1:6] if magic != b'CD001': break if vd_type == VD_TYPE_TERMINATOR: pos += BLOCK_SIZE break elif vd_type == VD_TYPE_BOOT: self._parse_boot_record(block) pos += BLOCK_SIZE continue elif vd_type == VD_TYPE_PRIMARY or vd_type == VD_TYPE_SUPPLEMENTARY: vd = self._parse_volume_descriptor(block, vd_type) if vd is not None: volume_descriptors.append(vd) pos += BLOCK_SIZE primary: VolumeDescriptor | None = None joliet: VolumeDescriptor | None = None for vd in volume_descriptors: if vd.is_supplementary and vd.is_joliet: joliet = vd elif not vd.is_supplementary: if primary is None: primary = vd else: primary = vd self._primary_vd = primary self._joliet_vd = joliet self._has_joliet = joliet is not None if joliet: self._read_directory_tree(joliet, is_joliet=True) elif primary: self._read_directory_tree(primary, is_joliet=False) if self._boot_record_found: self._extract_boot_images() def _parse_boot_record(self, block: bytes | memoryview) -> None: boot_system_id = bytes(block[7:39]) if boot_system_id.rstrip(b'\x00').rstrip(b'\x20') == b'EL TORITO SPECIFICATION': self._boot_record_found = True self._boot_catalog_location = struct.unpack_from('<I', block, 71)[0] def _parse_volume_descriptor( self, block: bytes | memoryview, vd_type: int ) -> VolumeDescriptor | None: vd = VolumeDescriptor() vd.vd_type = vd_type vd.is_supplementary = (vd_type == VD_TYPE_SUPPLEMENTARY) vd.system_id = block[8:40] vd.volume_id = block[40:72] vd.volume_space_size = _read_both_endian_u32(block, 80) vd.escape_sequence = block[88:120] if not vd.is_supplementary: vd.escape_sequence = b'' vd.volume_set_size = _read_both_endian_u16(block, 120) vd.volume_sequence_number = _read_both_endian_u16(block, 124) vd.logical_block_size = _read_both_endian_u16(block, 128) vd.path_table_size = _read_both_endian_u32(block, 132) root_record_data = block[156:190] if len(root_record_data) >= 34: vd.root_dir_record = self._parse_dir_record(root_record_data) return vd def _parse_dir_record(self, data: bytes | memoryview) -> DirRecord | None: if len(data) < 34: return None rec = DirRecord() record_len = data[0] if record_len < 34: record_len = 34 rec.extent_location = _read_both_endian_u32(data, 2) rec.data_length = _read_both_endian_u32(data, 10) rec.datetime = _parse_recording_datetime(data[18:25]) rec.file_flags = data[25] file_id_len = data[32] if 33 + file_id_len > len(data): file_id_len = len(data) - 33 rec.file_id = data[33:33 + file_id_len] su_start = 33 + file_id_len if file_id_len % 2 == 0: su_start += 1 if su_start < len(data): rec.system_use = data[su_start:min(len(data), record_len)] return rec def _read_directory_tree( self, vd: VolumeDescriptor, is_joliet: bool ) -> None: root = vd.root_dir_record if root is None: return susp_offset = -1 has_rr = False if not is_joliet: actual_root = self._read_root_dot_entry(root) susp_root = actual_root or root susp_offset = _check_susp(susp_root) if susp_offset >= 0: has_rr = _has_rr(susp_root.system_use, susp_offset) self._has_rr = has_rr if is_joliet: self.filesystem_type = FileSystemType.JOLIET elif has_rr: self.filesystem_type = FileSystemType.RR else: self.filesystem_type = FileSystemType.ISO self.refs.clear() visited: set[int] = set() self._walk_directory( root.extent_location, root.data_length, '', is_joliet, has_rr, susp_offset, visited, 0, ) def _walk_directory( self, extent_loc: int, extent_size: int, parent_path: str, is_joliet: bool, has_rr: bool, susp_offset: int, visited: set[int], depth: int, ) -> None: if depth > MAX_DIR_DEPTH: return if extent_loc in visited: return visited.add(extent_loc) dir_pos = extent_loc * self._block_size dir_end = dir_pos + extent_size if dir_pos >= len(self._data) or dir_end > len(self._data): dir_end = min(dir_end, len(self._data)) pos = dir_pos record_count = 0 pending_ref: ISORef | None = None while pos < dir_end and record_count < MAX_DIR_RECORDS: if pos >= len(self._data): break record_len = self._data[pos] if record_len == 0: next_block = ((pos - dir_pos) // self._block_size + 1) * self._block_size + dir_pos if next_block >= dir_end: break pos = next_block continue if record_len < 34: pos += 1 continue end = pos + record_len if end > len(self._data): break record_data = self._data[pos:end] rec = self._parse_dir_record(record_data) pos = end record_count += 1 if rec is None: continue if rec.is_system_item: continue name = self._decode_name(rec, is_joliet, has_rr, susp_offset) if not name: continue full_path = F'{parent_path}/{name}' if parent_path else name if rec.is_non_final_extent and not rec.is_dir: if pending_ref is None or pending_ref.path != full_path: pending_ref = ISORef(full_path, rec.datetime) self.refs.append(pending_ref) pending_ref.extents.append((rec.extent_location, rec.data_length)) continue elif pending_ref is not None and pending_ref.path == full_path: pending_ref.extents.append((rec.extent_location, rec.data_length)) pending_ref = None continue else: pending_ref = None if rec.is_dir: ref = ISORef(full_path, rec.datetime, is_dir=True) self.refs.append(ref) self._walk_directory( rec.extent_location, rec.data_length, full_path, is_joliet, has_rr, susp_offset, visited, depth + 1, ) else: ref = ISORef(full_path, rec.datetime) ref.extents.append((rec.extent_location, rec.data_length)) self.refs.append(ref) def _decode_name( self, rec: DirRecord, is_joliet: bool, has_rr: bool, susp_offset: int, ) -> str: if has_rr and susp_offset >= 0: rr_name = _get_name_rr(rec.system_use, susp_offset, self._data) if rr_name: return rr_name if is_joliet: try: name = bytes(rec.file_id).decode('utf-16-be') except (UnicodeDecodeError, ValueError): name = bytes(rec.file_id).decode('latin-1') else: name = bytes(rec.file_id).decode('latin-1') name = self._strip_revision(name) if name.endswith('.'): name = name[:-1] return name @staticmethod def _strip_revision(name: str) -> str: base, sep, revision = name.partition(';') if sep and revision.isdigit(): return base return name def select_filesystem(self, fs: FileSystemType) -> None: if fs is self.filesystem_type: return if fs is FileSystemType.JOLIET and self._joliet_vd: self._read_directory_tree(self._joliet_vd, is_joliet=True) elif fs is FileSystemType.RR and self._primary_vd: self._read_directory_tree(self._primary_vd, is_joliet=False) if not self._has_rr: self.filesystem_type = FileSystemType.ISO elif fs is FileSystemType.ISO and self._primary_vd: self._has_rr = False self._read_directory_tree_plain(self._primary_vd) def _read_directory_tree_plain(self, vd: VolumeDescriptor) -> None: root = vd.root_dir_record if root is None: return self.filesystem_type = FileSystemType.ISO self.refs.clear() visited: set[int] = set() self._walk_directory( root.extent_location, root.data_length, '', False, False, -1, visited, 0, ) def _read_root_dot_entry(self, root: DirRecord) -> DirRecord | None: """Read the actual '.' entry from the root directory data on disk. The root directory record stored in the Volume Descriptor is a fixed 34-byte copy that lacks the system use area. The real first entry in the root directory sector may be much larger and contain SUSP/RR data. """ dir_pos = root.extent_location * self._block_size if dir_pos >= len(self._data): return None rec_len = self._data[dir_pos] if rec_len < 34: return None end = dir_pos + rec_len if end > len(self._data): return None return self._parse_dir_record(self._data[dir_pos:end]) def _extract_boot_images(self) -> None: from refinery.lib.iso.eltorito import BootCatalog catalog_pos = self._boot_catalog_location * self._block_size if catalog_pos + 2048 > len(self._data): return catalog_data = bytes(self._data[catalog_pos:catalog_pos + 2048]) catalog = BootCatalog.parse(catalog_data) if catalog is None: return existing_extents: set[int] = set() for ref in self.refs: for block, _ in ref.extents: existing_extents.add(block) for entry in catalog.entries: if not entry.is_bootable: continue if entry.load_rba in existing_extents: continue img_pos = entry.load_rba * self._block_size img_size = entry.image_size if img_pos >= len(self._data): continue if img_pos + img_size > len(self._data): img_size = len(self._data) - img_pos if img_size <= 0: continue fat_files = _parse_fat_image(self._data[img_pos:img_pos + img_size]) if fat_files: for path, date, file_data in fat_files: ref = ISORef(path, date) ref._inline = file_data self.refs.append(ref) else: ref = ISORef(entry.name, None) ref.extents.append((entry.load_rba, img_size)) self.refs.append(ref) def entries(self): for ref in self.refs: if not ref.is_dir: yield ref def extract(self, ref: ISORef) -> bytearray: if ref._inline is not None: return bytearray(ref._inline) if not ref.extents: return bytearray() result = bytearray() for block, size in ref.extents: start = block * self._block_size end = start + size if end > len(self._data): end = len(self._data) if start >= len(self._data): result.extend(itertools.repeat(0, size)) else: result.extend(self._data[start:end]) return resultMethods
def open(self, data)-
Expand source code Browse git
def open(self, data: bytes | bytearray | memoryview) -> None: self._data = memoryview(data) volume_descriptors: list[VolumeDescriptor] = [] pos = START_POS while pos + BLOCK_SIZE <= len(self._data): block = self._data[pos:pos + BLOCK_SIZE] if len(block) < 7: break vd_type = block[0] magic = block[1:6] if magic != b'CD001': break if vd_type == VD_TYPE_TERMINATOR: pos += BLOCK_SIZE break elif vd_type == VD_TYPE_BOOT: self._parse_boot_record(block) pos += BLOCK_SIZE continue elif vd_type == VD_TYPE_PRIMARY or vd_type == VD_TYPE_SUPPLEMENTARY: vd = self._parse_volume_descriptor(block, vd_type) if vd is not None: volume_descriptors.append(vd) pos += BLOCK_SIZE primary: VolumeDescriptor | None = None joliet: VolumeDescriptor | None = None for vd in volume_descriptors: if vd.is_supplementary and vd.is_joliet: joliet = vd elif not vd.is_supplementary: if primary is None: primary = vd else: primary = vd self._primary_vd = primary self._joliet_vd = joliet self._has_joliet = joliet is not None if joliet: self._read_directory_tree(joliet, is_joliet=True) elif primary: self._read_directory_tree(primary, is_joliet=False) if self._boot_record_found: self._extract_boot_images() def select_filesystem(self, fs)-
Expand source code Browse git
def select_filesystem(self, fs: FileSystemType) -> None: if fs is self.filesystem_type: return if fs is FileSystemType.JOLIET and self._joliet_vd: self._read_directory_tree(self._joliet_vd, is_joliet=True) elif fs is FileSystemType.RR and self._primary_vd: self._read_directory_tree(self._primary_vd, is_joliet=False) if not self._has_rr: self.filesystem_type = FileSystemType.ISO elif fs is FileSystemType.ISO and self._primary_vd: self._has_rr = False self._read_directory_tree_plain(self._primary_vd) def entries(self)-
Expand source code Browse git
def entries(self): for ref in self.refs: if not ref.is_dir: yield ref def extract(self, ref)-
Expand source code Browse git
def extract(self, ref: ISORef) -> bytearray: if ref._inline is not None: return bytearray(ref._inline) if not ref.extents: return bytearray() result = bytearray() for block, size in ref.extents: start = block * self._block_size end = start + size if end > len(self._data): end = len(self._data) if start >= len(self._data): result.extend(itertools.repeat(0, size)) else: result.extend(self._data[start:end]) return result