Module refinery.units.formats.archive.xtiso

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime

from refinery.lib.structures import MemoryFile
from refinery.units.formats.archive import Arg, ArchiveUnit

_ISO_FILE_SYSTEMS = ['udf', 'joliet', 'rr', 'iso', 'auto']


class xtiso(ArchiveUnit):
    """
    Extract files from a ISO archive.
    """
    def __init__(
        self,
        *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        fs: Arg.Choice('-s', metavar='TYPE', choices=_ISO_FILE_SYSTEMS, help=(
            'Specify a file system ({choices}) extension to use. The default setting {default} will automatically '
            'detect the first of the other available options and use it.')) = 'auto'
    ):
        if fs not in _ISO_FILE_SYSTEMS:
            raise ValueError(F'invalid file system {fs}: must be udf, joliet, rr, iso, or auto.')
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            fs=fs
        )

    @ArchiveUnit.Requires('pycdlib', 'arc', 'default', 'extended')
    def _pycdlib():
        import pycdlib
        import pycdlib.dates

        def fixed_parse(self, datestr):
            datestr = datestr[:-3] + b'00\0'
            return original_parse(self, datestr)

        original_parse = pycdlib.dates.VolumeDescriptorDate.parse
        pycdlib.dates.VolumeDescriptorDate.parse = fixed_parse
        return pycdlib

    @staticmethod
    def _strip_revision(name: str):
        base, split, revision = name.partition(';')
        return base if split and revision.isdigit() else name

    def unpack(self, data):
        if not self.handles(data):
            self.log_warn('The data does not look like an ISO file.')
        with MemoryFile(data, read_as_bytes=True) as stream:
            iso = self._pycdlib.PyCdlib()
            iso.open_fp(stream)
            fs = self.args.fs
            if fs != 'auto':
                mkfacade = {
                    'iso'    : iso.get_iso9660_facade,
                    'udf'    : iso.get_udf_facade,
                    'joliet' : iso.get_joliet_facade,
                    'rr'     : iso.get_rock_ridge_facade,
                }
                facade = mkfacade[fs]()
            elif iso.has_udf():
                self.log_info('using format: udf')
                facade = iso.get_udf_facade()
            elif iso.has_joliet():
                self.log_info('using format: joliet')
                facade = iso.get_joliet_facade()
            elif iso.has_rock_ridge():
                self.log_info('using format: rr')
                facade = iso.get_rock_ridge_facade()
            else:
                self.log_info('using format: iso')
                facade = iso.get_iso9660_facade()

            for root, _, files in facade.walk('/'):
                root = root.rstrip('/')
                for name in files:
                    name = name.lstrip('/')
                    path = F'{root}/{name}'
                    try:
                        info = facade.get_record(path)
                        date = info.date
                    except Exception:
                        info = None
                        date = None
                    else:
                        date = datetime.datetime(
                            date.years_since_1900 + 1900,
                            date.month,
                            date.day_of_month,
                            date.hour,
                            date.minute,
                            date.second,
                            tzinfo=datetime.timezone(datetime.timedelta(minutes=15 * date.gmtoffset))
                        )

                    def extract(info=info, path=path):
                        if info:
                            buffer = MemoryFile(bytearray(info.data_length))
                        else:
                            buffer = MemoryFile(bytearray())
                        facade.get_file_from_iso_fp(buffer, path)
                        return buffer.getvalue()

                    yield self._pack(self._strip_revision(path), date, extract)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        return any(data[k] == B'CD001' for k in (
            slice(0x8001, 0x8006),
            slice(0x8801, 0x8806),
            slice(0x9001, 0x9006),
        ))

Classes

class xtiso (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', fs='auto')

Extract files from a ISO archive.

Expand source code Browse git
class xtiso(ArchiveUnit):
    """
    Extract files from a ISO archive.
    """
    def __init__(
        self,
        *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
        fs: Arg.Choice('-s', metavar='TYPE', choices=_ISO_FILE_SYSTEMS, help=(
            'Specify a file system ({choices}) extension to use. The default setting {default} will automatically '
            'detect the first of the other available options and use it.')) = 'auto'
    ):
        if fs not in _ISO_FILE_SYSTEMS:
            raise ValueError(F'invalid file system {fs}: must be udf, joliet, rr, iso, or auto.')
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            date=date,
            fs=fs
        )

    @ArchiveUnit.Requires('pycdlib', 'arc', 'default', 'extended')
    def _pycdlib():
        import pycdlib
        import pycdlib.dates

        def fixed_parse(self, datestr):
            datestr = datestr[:-3] + b'00\0'
            return original_parse(self, datestr)

        original_parse = pycdlib.dates.VolumeDescriptorDate.parse
        pycdlib.dates.VolumeDescriptorDate.parse = fixed_parse
        return pycdlib

    @staticmethod
    def _strip_revision(name: str):
        base, split, revision = name.partition(';')
        return base if split and revision.isdigit() else name

    def unpack(self, data):
        if not self.handles(data):
            self.log_warn('The data does not look like an ISO file.')
        with MemoryFile(data, read_as_bytes=True) as stream:
            iso = self._pycdlib.PyCdlib()
            iso.open_fp(stream)
            fs = self.args.fs
            if fs != 'auto':
                mkfacade = {
                    'iso'    : iso.get_iso9660_facade,
                    'udf'    : iso.get_udf_facade,
                    'joliet' : iso.get_joliet_facade,
                    'rr'     : iso.get_rock_ridge_facade,
                }
                facade = mkfacade[fs]()
            elif iso.has_udf():
                self.log_info('using format: udf')
                facade = iso.get_udf_facade()
            elif iso.has_joliet():
                self.log_info('using format: joliet')
                facade = iso.get_joliet_facade()
            elif iso.has_rock_ridge():
                self.log_info('using format: rr')
                facade = iso.get_rock_ridge_facade()
            else:
                self.log_info('using format: iso')
                facade = iso.get_iso9660_facade()

            for root, _, files in facade.walk('/'):
                root = root.rstrip('/')
                for name in files:
                    name = name.lstrip('/')
                    path = F'{root}/{name}'
                    try:
                        info = facade.get_record(path)
                        date = info.date
                    except Exception:
                        info = None
                        date = None
                    else:
                        date = datetime.datetime(
                            date.years_since_1900 + 1900,
                            date.month,
                            date.day_of_month,
                            date.hour,
                            date.minute,
                            date.second,
                            tzinfo=datetime.timezone(datetime.timedelta(minutes=15 * date.gmtoffset))
                        )

                    def extract(info=info, path=path):
                        if info:
                            buffer = MemoryFile(bytearray(info.data_length))
                        else:
                            buffer = MemoryFile(bytearray())
                        facade.get_file_from_iso_fp(buffer, path)
                        return buffer.getvalue()

                    yield self._pack(self._strip_revision(path), date, extract)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        return any(data[k] == B'CD001' for k in (
            slice(0x8001, 0x8006),
            slice(0x8801, 0x8806),
            slice(0x9001, 0x9006),
        ))

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    if not self.handles(data):
        self.log_warn('The data does not look like an ISO file.')
    with MemoryFile(data, read_as_bytes=True) as stream:
        iso = self._pycdlib.PyCdlib()
        iso.open_fp(stream)
        fs = self.args.fs
        if fs != 'auto':
            mkfacade = {
                'iso'    : iso.get_iso9660_facade,
                'udf'    : iso.get_udf_facade,
                'joliet' : iso.get_joliet_facade,
                'rr'     : iso.get_rock_ridge_facade,
            }
            facade = mkfacade[fs]()
        elif iso.has_udf():
            self.log_info('using format: udf')
            facade = iso.get_udf_facade()
        elif iso.has_joliet():
            self.log_info('using format: joliet')
            facade = iso.get_joliet_facade()
        elif iso.has_rock_ridge():
            self.log_info('using format: rr')
            facade = iso.get_rock_ridge_facade()
        else:
            self.log_info('using format: iso')
            facade = iso.get_iso9660_facade()

        for root, _, files in facade.walk('/'):
            root = root.rstrip('/')
            for name in files:
                name = name.lstrip('/')
                path = F'{root}/{name}'
                try:
                    info = facade.get_record(path)
                    date = info.date
                except Exception:
                    info = None
                    date = None
                else:
                    date = datetime.datetime(
                        date.years_since_1900 + 1900,
                        date.month,
                        date.day_of_month,
                        date.hour,
                        date.minute,
                        date.second,
                        tzinfo=datetime.timezone(datetime.timedelta(minutes=15 * date.gmtoffset))
                    )

                def extract(info=info, path=path):
                    if info:
                        buffer = MemoryFile(bytearray(info.data_length))
                    else:
                        buffer = MemoryFile(bytearray())
                    facade.get_file_from_iso_fp(buffer, path)
                    return buffer.getvalue()

                yield self._pack(self._strip_revision(path), date, extract)

Inherited members