Module refinery.units.formats.archive

Expand source code Browse git
from __future__ import annotations

from functools import wraps
from datetime import datetime

from refinery.units import Executable, Unit, Arg
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.lib.types import buf, Callable


class MultipleArchives(Exception):
    """
    Raised when more than one archive is carved from the input and the unit is not running in
    lenient mode.
    """


class ArchiveExecutable(Executable):
    """
    Metaclass for archive units. If a class created by this metaclass provides a carver unit via
    `_carver`, its `unpack` method is wrapped so that the input is first piped through that carver
    and every carved archive is unpacked on its own.
    """

    def __init__(cls, name, bases, nmspc, **kwargs):
        """
        Complete class creation. When `_carver` returns a unit, `cls.unpack` is replaced by a
        wrapper that carves archives out of the input before delegating to the original method.
        A `TypeError` is raised if a carver is present but the class does not derive from
        `PathExtractorUnit`.
        """
        super().__init__(name, bases, nmspc, **kwargs)

        carver = cls._carver()
        if carver is None:
            # Without a carver the unpack method is left untouched.
            return
        if not issubclass(cls, PathExtractorUnit):
            raise TypeError

        inner = cls.unpack
        # Private marker to tell "iterator exhausted" apart from any carved value.
        missing = object()

        @wraps(inner)
        def carving_unpack(self, data: buf):
            stream = data | carver

            first = next(stream, missing)
            if first is missing:
                # Nothing was carved: treat the whole input as the one archive.
                first = data

            second = next(stream, missing)
            if second is missing:
                # Exactly one archive: results are passed through with unchanged paths.
                yield from inner(self, first)
                return

            if not self.args.lenient:
                # Drain the carver only to report the total number of archives.
                total = 2
                for _ in stream:
                    total += 1
                raise MultipleArchives(
                    F'The input contains {total} archives. Use the {carver.name} unit to extract them individually '
                    R'or set the --lenient/-L option to fuse the archives.')

            # Lenient mode: collect all archives up front, then prefix each result path with
            # the one-based index of the archive it came from.
            chunks = [first, second, *stream]
            for index, chunk in enumerate(chunks, 1):
                for item in inner(self, chunk):
                    item.path = F'archive{index}/{item.path}'
                    yield item

        cls.unpack = carving_unpack

    def _carver(cls) -> Unit | None:
        """
        Return the unit used to carve archives out of the input, or `None` to disable carving.
        This default implementation disables carving.
        """
        return None


class ArchiveUnit(PathExtractorUnit, metaclass=ArchiveExecutable, abstract=True):
    # Abstract base for archive units. On top of the path extractor options that are passed
    # through unchanged, it declares the `date` and `pwd` arguments, carries a list of candidate
    # passwords, and offers `_pack` to build results that carry a date meta variable.

    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path',
        date: Arg('-D', metavar='NAME',
            help='Name of the meta variable to receive the extracted file date. The default value is "{default}".') = b'date',
        pwd: Arg('-p', help='Optionally specify an extraction password.') = B'',
        **kwargs
    ):
        # Every option is forwarded to the parent initializer by keyword.
        super().__init__(
            *paths, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy,
            exact=exact, regex=regex, path=path, pwd=pwd, date=date, **kwargs)

    # Candidate passwords; how they are used is decided outside this class (presumably by
    # subclasses - not visible here).
    _COMMON_PASSWORDS = [
        'infected', 'virus', 'malware', 'dangerous', 'flare',
        '1234', '123',
        'Infected', 'infected!', 'INFECTED', 'notinfected',
        'unzip-me', 'password',
    ]

    def _pack(
        self, path: str, date: datetime | str | None, data: buf | Callable[[], buf], **meta
    ) -> UnpackResult:
        """
        Build an `UnpackResult` for `path` and `data` with the given meta variables. A `datetime`
        value for `date` is converted to ISO format with a space separator and second precision.
        If the date is then a string, it is stored in the meta variable named by the `date`
        argument of the unit; any other value for `date` is ignored.
        """
        timestamp = date.isoformat(' ', 'seconds') if isinstance(date, datetime) else date
        if isinstance(timestamp, str):
            # The variable name is stored as bytes and is decoded with the unit's codec.
            key = self.args.date.decode(self.codec)
            meta[key] = timestamp
        return UnpackResult(path, data, **meta)

Sub-modules

refinery.units.formats.archive.innopwd
refinery.units.formats.archive.xt
refinery.units.formats.archive.xt7z
refinery.units.formats.archive.xtace
refinery.units.formats.archive.xtasar
refinery.units.formats.archive.xtcab
refinery.units.formats.archive.xtchm
refinery.units.formats.archive.xtcpio
refinery.units.formats.archive.xtgz
refinery.units.formats.archive.xtinno
refinery.units.formats.archive.xtiso
refinery.units.formats.archive.xtiss
refinery.units.formats.archive.xtmacho
refinery.units.formats.archive.xtmagtape
refinery.units.formats.archive.xtnode
refinery.units.formats.archive.xtnsis
refinery.units.formats.archive.xtnuitka
refinery.units.formats.archive.xtpyi
refinery.units.formats.archive.xtsim
refinery.units.formats.archive.xtsql
refinery.units.formats.archive.xttar
refinery.units.formats.archive.xtzip
refinery.units.formats.archive.xtzpaq

This code was ported directly from unzpaq.cpp; it is not very Pythonic and has inherited a somewhat convoluted structure from the source. Cleaning it …

Classes

class MultipleArchives (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class MultipleArchives(Exception):
    """
    Raised when more than one archive is carved from the input and the unit is not running in
    lenient mode.
    """

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ArchiveExecutable (name, bases, nmspc, **kwargs)

This is the metaclass for refinery units. A class of this type must implement a run() method. If such a class is created in the currently executing module, an instance of it is automatically created after the class is defined, and its run() method is invoked.

Expand source code Browse git
class ArchiveExecutable(Executable):
    """
    Metaclass for archive units. If a class created by this metaclass provides a carver unit via
    `_carver`, its `unpack` method is wrapped so that the input is first piped through that carver
    and every carved archive is unpacked on its own.
    """

    def __init__(cls, name, bases, nmspc, **kwargs):
        """
        Complete class creation. When `_carver` returns a unit, `cls.unpack` is replaced by a
        wrapper that carves archives out of the input before delegating to the original method.
        A `TypeError` is raised if a carver is present but the class does not derive from
        `PathExtractorUnit`.
        """
        super().__init__(name, bases, nmspc, **kwargs)

        carver = cls._carver()
        if carver is None:
            # Without a carver the unpack method is left untouched.
            return
        if not issubclass(cls, PathExtractorUnit):
            raise TypeError

        inner = cls.unpack
        # Private marker to tell "iterator exhausted" apart from any carved value.
        missing = object()

        @wraps(inner)
        def carving_unpack(self, data: buf):
            stream = data | carver

            first = next(stream, missing)
            if first is missing:
                # Nothing was carved: treat the whole input as the one archive.
                first = data

            second = next(stream, missing)
            if second is missing:
                # Exactly one archive: results are passed through with unchanged paths.
                yield from inner(self, first)
                return

            if not self.args.lenient:
                # Drain the carver only to report the total number of archives.
                total = 2
                for _ in stream:
                    total += 1
                raise MultipleArchives(
                    F'The input contains {total} archives. Use the {carver.name} unit to extract them individually '
                    R'or set the --lenient/-L option to fuse the archives.')

            # Lenient mode: collect all archives up front, then prefix each result path with
            # the one-based index of the archive it came from.
            chunks = [first, second, *stream]
            for index, chunk in enumerate(chunks, 1):
                for item in inner(self, chunk):
                    item.path = F'archive{index}/{item.path}'
                    yield item

        cls.unpack = carving_unpack

    def _carver(cls) -> Unit | None:
        """
        Return the unit used to carve archives out of the input, or `None` to disable carving.
        This default implementation disables carving.
        """
        return None

Ancestors

Inherited members

class ArchiveUnit (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'', **kwargs)
Expand source code Browse git
class ArchiveUnit(PathExtractorUnit, metaclass=ArchiveExecutable, abstract=True):
    # Abstract base for archive units. On top of the path extractor options that are passed
    # through unchanged, it declares the `date` and `pwd` arguments, carries a list of candidate
    # passwords, and offers `_pack` to build results that carry a date meta variable.

    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path',
        date: Arg('-D', metavar='NAME',
            help='Name of the meta variable to receive the extracted file date. The default value is "{default}".') = b'date',
        pwd: Arg('-p', help='Optionally specify an extraction password.') = B'',
        **kwargs
    ):
        # Every option is forwarded to the parent initializer by keyword.
        super().__init__(
            *paths, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy,
            exact=exact, regex=regex, path=path, pwd=pwd, date=date, **kwargs)

    # Candidate passwords; how they are used is decided outside this class (presumably by
    # subclasses - not visible here).
    _COMMON_PASSWORDS = [
        'infected', 'virus', 'malware', 'dangerous', 'flare',
        '1234', '123',
        'Infected', 'infected!', 'INFECTED', 'notinfected',
        'unzip-me', 'password',
    ]

    def _pack(
        self, path: str, date: datetime | str | None, data: buf | Callable[[], buf], **meta
    ) -> UnpackResult:
        """
        Build an `UnpackResult` for `path` and `data` with the given meta variables. A `datetime`
        value for `date` is converted to ISO format with a space separator and second precision.
        If the date is then a string, it is stored in the meta variable named by the `date`
        argument of the unit; any other value for `date` is ignored.
        """
        timestamp = date.isoformat(' ', 'seconds') if isinstance(date, datetime) else date
        if isinstance(timestamp, str):
            # The variable name is stored as bytes and is decoded with the unit's codec.
            key = self.args.date.decode(self.codec)
            meta[key] = timestamp
        return UnpackResult(path, data, **meta)

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies
var console

Inherited members