Module refinery.units.formats.archive

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from functools import wraps
from datetime import datetime

from refinery.units import Executable, Unit, Arg
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.lib.types import ByteStr

if TYPE_CHECKING:
    from typing import ByteString, Callable, Optional, Union, Type, Self


class MultipleArchives(Exception):
    pass


class ArchiveExecutable(Executable):

    def __init__(exe: Union[Self, Type[PathExtractorUnit]], name, bases, nmspc, **kwargs):
        super(ArchiveExecutable, exe).__init__(name, bases, nmspc, **kwargs)

        carver = exe._carver()

        if carver is None:
            return

        unpack = exe.unpack

        @wraps(unpack)
        def __unpack(self: PathExtractorUnit, data: ByteStr):
            carved = data | carver
            try:
                arc1 = next(carved)
            except StopIteration:
                raise ValueError('The input does not contain any archive.')
            try:
                arc2 = next(carved)
            except StopIteration:
                yield from unpack(self, arc1)
                return
            if not self.args.lenient:
                some = 2 + sum(1 for _ in carved)
                text = (
                    F'The input contains {some} archives. Use the {carver.name} unit to extract them individually '
                    R'or set the --lenient/-L option to fuse the archives.')
                raise MultipleArchives(text)
            else:
                archives = [arc1, arc2]
                archives.extend(carved)

            for k, data in enumerate(archives, 1):
                for result in unpack(self, data):
                    result.path = F'archive{k}/{result.path}'
                    yield result

        exe.unpack = __unpack

    def _carver(cls) -> Optional[Unit]:
        return None


class ArchiveUnit(PathExtractorUnit, metaclass=ArchiveExecutable, abstract=True):
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path',
        date: Arg('-D', metavar='NAME',
            help='Name of the meta variable to receive the extracted file date. The default value is "{default}".') = b'date',
        pwd: Arg('-p', help='Optionally specify an extraction password.') = B'',
        **kwargs
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            pwd=pwd,
            date=date,
            **kwargs
        )

    _COMMON_PASSWORDS = [
        'infected',
        'virus',
        'malware',
        'dangerous',
        'flare',
        '1234',
        '123',
        'Infected',
        'infected!',
        'INFECTED',
        'notinfected',
        'unzip-me',
        'password',
    ]

    def _pack(
        self,
        path: str,
        date: Optional[Union[datetime, str]],
        data: Union[ByteString, Callable[[], ByteString]],
        **meta
    ) -> UnpackResult:
        if isinstance(date, datetime):
            date = date.isoformat(' ', 'seconds')
        if isinstance(date, str):
            meta[self.args.date.decode(self.codec)] = date
        return UnpackResult(path, data, **meta)

Sub-modules

refinery.units.formats.archive.xt
refinery.units.formats.archive.xt7z
refinery.units.formats.archive.xtace
refinery.units.formats.archive.xtasar
refinery.units.formats.archive.xtcab
refinery.units.formats.archive.xtcpio
refinery.units.formats.archive.xtgz
refinery.units.formats.archive.xtiso
refinery.units.formats.archive.xtiss
refinery.units.formats.archive.xtmacho
refinery.units.formats.archive.xtmagtape
refinery.units.formats.archive.xtnode
refinery.units.formats.archive.xtnsis
refinery.units.formats.archive.xtnuitka
refinery.units.formats.archive.xtpyi
refinery.units.formats.archive.xttar
refinery.units.formats.archive.xtzip
refinery.units.formats.archive.xtzpaq

Classes

class MultipleArchives (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code Browse git
class MultipleArchives(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ArchiveExecutable (name, bases, nmspc, **kwargs)

This is the metaclass for refinery units. A class which is of this type is required to implement a method run(). If the class is created in the currently executing module, then an instance of the class is automatically created after it is defined and its run() method is invoked.

Expand source code Browse git
class ArchiveExecutable(Executable):

    def __init__(exe: Union[Self, Type[PathExtractorUnit]], name, bases, nmspc, **kwargs):
        super(ArchiveExecutable, exe).__init__(name, bases, nmspc, **kwargs)

        carver = exe._carver()

        if carver is None:
            return

        unpack = exe.unpack

        @wraps(unpack)
        def __unpack(self: PathExtractorUnit, data: ByteStr):
            carved = data | carver
            try:
                arc1 = next(carved)
            except StopIteration:
                raise ValueError('The input does not contain any archive.')
            try:
                arc2 = next(carved)
            except StopIteration:
                yield from unpack(self, arc1)
                return
            if not self.args.lenient:
                some = 2 + sum(1 for _ in carved)
                text = (
                    F'The input contains {some} archives. Use the {carver.name} unit to extract them individually '
                    R'or set the --lenient/-L option to fuse the archives.')
                raise MultipleArchives(text)
            else:
                archives = [arc1, arc2]
                archives.extend(carved)

            for k, data in enumerate(archives, 1):
                for result in unpack(self, data):
                    result.path = F'archive{k}/{result.path}'
                    yield result

        exe.unpack = __unpack

    def _carver(cls) -> Optional[Unit]:
        return None

Ancestors

Inherited members

class ArchiveUnit (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'', **kwargs)
Expand source code Browse git
class ArchiveUnit(PathExtractorUnit, metaclass=ArchiveExecutable, abstract=True):
    def __init__(
        self, *paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path',
        date: Arg('-D', metavar='NAME',
            help='Name of the meta variable to receive the extracted file date. The default value is "{default}".') = b'date',
        pwd: Arg('-p', help='Optionally specify an extraction password.') = B'',
        **kwargs
    ):
        super().__init__(
            *paths,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            path=path,
            pwd=pwd,
            date=date,
            **kwargs
        )

    _COMMON_PASSWORDS = [
        'infected',
        'virus',
        'malware',
        'dangerous',
        'flare',
        '1234',
        '123',
        'Infected',
        'infected!',
        'INFECTED',
        'notinfected',
        'unzip-me',
        'password',
    ]

    def _pack(
        self,
        path: str,
        date: Optional[Union[datetime, str]],
        data: Union[ByteString, Callable[[], ByteString]],
        **meta
    ) -> UnpackResult:
        if isinstance(date, datetime):
            date = date.isoformat(' ', 'seconds')
        if isinstance(date, str):
            meta[self.args.date.decode(self.codec)] = date
        return UnpackResult(path, data, **meta)

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies

Inherited members