Module refinery.units.formats.archive.xtiso
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
from refinery.lib.structures import MemoryFile
from refinery.units.formats.archive import Arg, ArchiveUnit
_ISO_FILE_SYSTEMS = ['udf', 'joliet', 'rr', 'iso', 'auto']
class xtiso(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
"""
Extract files from a ISO archive.
"""
def __init__(
self,
*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
path=b'path', date=b'date',
fs: Arg.Choice('-s', metavar='TYPE', choices=_ISO_FILE_SYSTEMS, help=(
'Specify a file system ({choices}) extension to use. The default setting {default} will automatically '
'detect the first of the other available options and use it.')) = 'auto'
):
if fs not in _ISO_FILE_SYSTEMS:
raise ValueError(F'invalid file system {fs}: must be udf, joliet, rr, iso, or auto.')
super().__init__(
*paths,
list=list,
join_path=join_path,
drop_path=drop_path,
fuzzy=fuzzy,
exact=exact,
regex=regex,
path=path,
date=date,
fs=fs
)
@ArchiveUnit.Requires('pycdlib', 'arc', 'default', 'extended')
def _pycdlib():
import pycdlib
import pycdlib.dates
def fixed_parse(self, datestr):
datestr = datestr[:-3] + b'00\0'
return original_parse(self, datestr)
original_parse = pycdlib.dates.VolumeDescriptorDate.parse
pycdlib.dates.VolumeDescriptorDate.parse = fixed_parse
return pycdlib
@staticmethod
def _strip_revision(name: str):
base, split, revision = name.partition(';')
return base if split and revision.isdigit() else name
def unpack(self, data):
if not self.handles(data):
self.log_warn('The data does not look like an ISO file.')
with MemoryFile(data, read_as_bytes=True) as stream:
iso = self._pycdlib.PyCdlib()
iso.open_fp(stream)
fs = self.args.fs
if fs != 'auto':
mkfacade = {
'iso' : iso.get_iso9660_facade,
'udf' : iso.get_udf_facade,
'joliet' : iso.get_joliet_facade,
'rr' : iso.get_rock_ridge_facade,
}
facade = mkfacade[fs]()
elif iso.has_udf():
self.log_info('using format: udf')
facade = iso.get_udf_facade()
elif iso.has_joliet():
self.log_info('using format: joliet')
facade = iso.get_joliet_facade()
elif iso.has_rock_ridge():
self.log_info('using format: rr')
facade = iso.get_rock_ridge_facade()
else:
self.log_info('using format: iso')
facade = iso.get_iso9660_facade()
for root, _, files in facade.walk('/'):
root = root.rstrip('/')
for name in files:
name = name.lstrip('/')
path = F'{root}/{name}'
try:
info = facade.get_record(path)
date = info.date
except Exception:
info = None
date = None
else:
date = datetime.datetime(
date.years_since_1900 + 1900,
date.month,
date.day_of_month,
date.hour,
date.minute,
date.second,
tzinfo=datetime.timezone(datetime.timedelta(minutes=15 * date.gmtoffset))
)
def extract(info=info, path=path):
if info:
buffer = MemoryFile(bytearray(info.data_length))
else:
buffer = MemoryFile(bytearray())
facade.get_file_from_iso_fp(buffer, path)
return buffer.getvalue()
yield self._pack(self._strip_revision(path), date, extract)
@classmethod
def handles(cls, data: bytearray) -> bool:
return any(data[k] == B'CD001' for k in (
slice(0x8001, 0x8006),
slice(0x8801, 0x8806),
slice(0x9001, 0x9006),
))
Classes
class xtiso (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', fs='auto')
-
Extract files from a ISO archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xtiso --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xtiso [| dump {path} ]
Expand source code Browse git
class xtiso(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'): """ Extract files from a ISO archive. """ def __init__( self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', fs: Arg.Choice('-s', metavar='TYPE', choices=_ISO_FILE_SYSTEMS, help=( 'Specify a file system ({choices}) extension to use. The default setting {default} will automatically ' 'detect the first of the other available options and use it.')) = 'auto' ): if fs not in _ISO_FILE_SYSTEMS: raise ValueError(F'invalid file system {fs}: must be udf, joliet, rr, iso, or auto.') super().__init__( *paths, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy, exact=exact, regex=regex, path=path, date=date, fs=fs ) @ArchiveUnit.Requires('pycdlib', 'arc', 'default', 'extended') def _pycdlib(): import pycdlib import pycdlib.dates def fixed_parse(self, datestr): datestr = datestr[:-3] + b'00\0' return original_parse(self, datestr) original_parse = pycdlib.dates.VolumeDescriptorDate.parse pycdlib.dates.VolumeDescriptorDate.parse = fixed_parse return pycdlib @staticmethod def _strip_revision(name: str): base, split, revision = name.partition(';') return base if split and revision.isdigit() else name def unpack(self, data): if not self.handles(data): self.log_warn('The data does not look like an ISO file.') with MemoryFile(data, read_as_bytes=True) as stream: iso = self._pycdlib.PyCdlib() iso.open_fp(stream) fs = self.args.fs if fs != 'auto': mkfacade = { 'iso' : iso.get_iso9660_facade, 'udf' : iso.get_udf_facade, 'joliet' : iso.get_joliet_facade, 'rr' : iso.get_rock_ridge_facade, } facade = mkfacade[fs]() elif iso.has_udf(): self.log_info('using format: udf') facade = iso.get_udf_facade() elif iso.has_joliet(): self.log_info('using format: joliet') facade = iso.get_joliet_facade() elif iso.has_rock_ridge(): self.log_info('using format: rr') facade = iso.get_rock_ridge_facade() else: self.log_info('using format: iso') facade = iso.get_iso9660_facade() for root, _, files in facade.walk('/'): root = root.rstrip('/') for name in files: name = name.lstrip('/') path = F'{root}/{name}' try: info = facade.get_record(path) date = info.date except Exception: info = None date = None else: date = datetime.datetime( date.years_since_1900 + 1900, date.month, date.day_of_month, date.hour, date.minute, date.second, tzinfo=datetime.timezone(datetime.timedelta(minutes=15 * date.gmtoffset)) ) def extract(info=info, path=path): if info: buffer = MemoryFile(bytearray(info.data_length)) else: buffer = MemoryFile(bytearray()) facade.get_file_from_iso_fp(buffer, path) return buffer.getvalue() yield self._pack(self._strip_revision(path), date, extract) @classmethod def handles(cls, data: bytearray) -> bool: return any(data[k] == B'CD001' for k in ( slice(0x8001, 0x8006), slice(0x8801, 0x8806), slice(0x9001, 0x9006), ))
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data): if not self.handles(data): self.log_warn('The data does not look like an ISO file.') with MemoryFile(data, read_as_bytes=True) as stream: iso = self._pycdlib.PyCdlib() iso.open_fp(stream) fs = self.args.fs if fs != 'auto': mkfacade = { 'iso' : iso.get_iso9660_facade, 'udf' : iso.get_udf_facade, 'joliet' : iso.get_joliet_facade, 'rr' : iso.get_rock_ridge_facade, } facade = mkfacade[fs]() elif iso.has_udf(): self.log_info('using format: udf') facade = iso.get_udf_facade() elif iso.has_joliet(): self.log_info('using format: joliet') facade = iso.get_joliet_facade() elif iso.has_rock_ridge(): self.log_info('using format: rr') facade = iso.get_rock_ridge_facade() else: self.log_info('using format: iso') facade = iso.get_iso9660_facade() for root, _, files in facade.walk('/'): root = root.rstrip('/') for name in files: name = name.lstrip('/') path = F'{root}/{name}' try: info = facade.get_record(path) date = info.date except Exception: info = None date = None else: date = datetime.datetime( date.years_since_1900 + 1900, date.month, date.day_of_month, date.hour, date.minute, date.second, tzinfo=datetime.timezone(datetime.timedelta(minutes=15 * date.gmtoffset)) ) def extract(info=info, path=path): if info: buffer = MemoryFile(bytearray(info.data_length)) else: buffer = MemoryFile(bytearray()) facade.get_file_from_iso_fp(buffer, path) return buffer.getvalue() yield self._pack(self._strip_revision(path), date, extract)
Inherited members