Module refinery.units.formats.archive
Expand source code Browse git
from __future__ import annotations
from functools import wraps
from datetime import datetime
from refinery.units import Executable, Unit, Arg
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.lib.types import buf, Callable
class MultipleArchives(Exception):
    """
    Raised when the input contains more than one archive and the unit was not asked to process
    all of them leniently.
    """
class ArchiveExecutable(Executable):
    """
    Metaclass for archive units. If the class being created provides a carver through `_carver`,
    its `unpack` method is wrapped so that the input is piped through the carver first and each
    carved archive is unpacked separately.
    """

    def __init__(cls, name, bases, nmspc, **kwargs):
        """
        Initialize the class and install the carving wrapper around `unpack` when `_carver`
        returns something other than `None`.

        Raises `TypeError` when a carver is provided by a class that is not derived from
        `PathExtractorUnit`.
        """
        super(ArchiveExecutable, cls).__init__(name, bases, nmspc, **kwargs)
        carver = cls._carver()
        if carver is None:
            # No carver: the unpack method of the class is left untouched.
            return
        if not issubclass(cls, PathExtractorUnit):
            raise TypeError(
                F'The class {name} provides a carver but is not derived from PathExtractorUnit.')
        unpack = cls.unpack

        @wraps(unpack)
        def __unpack(self, data: buf):
            carved = data | carver
            try:
                arc1 = next(carved)
            except StopIteration:
                # Carving produced no result: fall back to unpacking the input as it is.
                arc1 = data
            try:
                arc2 = next(carved)
            except StopIteration:
                # There is at most one archive; unpack it without altering any paths.
                yield from unpack(self, arc1)
                return
            if not self.args.lenient:
                # Consume the remaining carved items only to report the total count.
                some = 2 + sum(1 for _ in carved)
                text = (
                    F'The input contains {some} archives. Use the {carver.name} unit to extract them individually '
                    R'or set the --lenient/-L option to fuse the archives.')
                raise MultipleArchives(text)
            archives = [arc1, arc2]
            archives.extend(carved)
            # A dedicated loop variable is used here so that the data parameter is not rebound.
            for k, archive in enumerate(archives, 1):
                for result in unpack(self, archive):
                    # Prefix each path with a 1-based archive index to tell the archives apart.
                    result.path = F'archive{k}/{result.path}'
                    yield result

        setattr(cls, 'unpack', __unpack)

    def _carver(cls) -> Unit | None:
        """
        Return the unit used to carve archives from the input, or `None` (the default) to leave
        the `unpack` method of the class unwrapped.
        """
        return None
class ArchiveUnit(PathExtractorUnit, metaclass=ArchiveExecutable, abstract=True):
    # Abstract base class for path extractor units that work on archives. It adds the arguments
    # for the date variable name and the extraction password, keeps a list of commonly used
    # passwords, and provides a helper for building unpack results.

    def __init__(
        self,
        *paths,
        list=False,
        join_path=False,
        drop_path=False,
        fuzzy=0,
        exact=False,
        regex=False,
        path=b'path',
        date: Arg(
            '-D',
            metavar='NAME',
            help='Name of the meta variable to receive the extracted file date. The default value is "{default}".',
        ) = b'date',
        pwd: Arg('-p', help='Optionally specify an extraction password.') = B'',
        **kwargs
    ):
        # Every argument is forwarded to the parent class unchanged.
        super().__init__(
            *paths, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy,
            exact=exact, regex=regex, path=path, pwd=pwd, date=date, **kwargs)

    # Passwords that are commonly used for archives; the order of the entries is preserved.
    _COMMON_PASSWORDS = [
        'infected', 'virus', 'malware', 'dangerous', 'flare',
        '1234', '123',
        'Infected', 'infected!', 'INFECTED', 'notinfected',
        'unzip-me', 'password',
    ]

    def _pack(self, path: str, date: datetime | str | None, data: buf | Callable[[], buf], **meta) -> UnpackResult:
        """
        Build an `UnpackResult` from the given path, data, and metadata. A `datetime` date is
        converted to an ISO formatted string with a space separator and second precision. When
        the date is a string (given or converted), it is stored in the metadata under the
        decoded name of the `date` argument; any other date value is ignored.
        """
        timestamp = date.isoformat(' ', 'seconds') if isinstance(date, datetime) else date
        if isinstance(timestamp, str):
            variable = self.args.date.decode(self.codec)
            meta[variable] = timestamp
        return UnpackResult(path, data, **meta)
Sub-modules
refinery.units.formats.archive.innopwd
refinery.units.formats.archive.xt
refinery.units.formats.archive.xt7z
refinery.units.formats.archive.xtace
refinery.units.formats.archive.xtasar
refinery.units.formats.archive.xtcab
refinery.units.formats.archive.xtchm
refinery.units.formats.archive.xtcpio
refinery.units.formats.archive.xtgz
refinery.units.formats.archive.xtinno
refinery.units.formats.archive.xtiso
refinery.units.formats.archive.xtiss
refinery.units.formats.archive.xtmacho
refinery.units.formats.archive.xtmagtape
refinery.units.formats.archive.xtnode
refinery.units.formats.archive.xtnsis
refinery.units.formats.archive.xtnuitka
refinery.units.formats.archive.xtpyi
refinery.units.formats.archive.xtsim
refinery.units.formats.archive.xtsql
refinery.units.formats.archive.xttar
refinery.units.formats.archive.xtzip
refinery.units.formats.archive.xtzpaq
-
This code was ported directly from unzpaq.cpp; it is not very Pythonic and has inherited a somewhat convoluted structure from the source. Cleaning it …
Classes
class MultipleArchives (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code Browse git
class MultipleArchives(Exception):
    """
    Raised when the input contains more than one archive and the unit was not asked to process
    all of them leniently.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class ArchiveExecutable (name, bases, nmspc, **kwargs)
-
This is the metaclass for refinery units. A class which is of this type is required to implement a method `run()`. If the class is created in the currently executing module, then an instance of the class is automatically created after it is defined and its `run()` method is invoked.
Expand source code Browse git
class ArchiveExecutable(Executable):
    """
    Metaclass for archive units. If the class being created provides a carver through `_carver`,
    its `unpack` method is wrapped so that the input is piped through the carver first and each
    carved archive is unpacked separately.
    """

    def __init__(cls, name, bases, nmspc, **kwargs):
        """
        Initialize the class and install the carving wrapper around `unpack` when `_carver`
        returns something other than `None`.

        Raises `TypeError` when a carver is provided by a class that is not derived from
        `PathExtractorUnit`.
        """
        super(ArchiveExecutable, cls).__init__(name, bases, nmspc, **kwargs)
        carver = cls._carver()
        if carver is None:
            # No carver: the unpack method of the class is left untouched.
            return
        if not issubclass(cls, PathExtractorUnit):
            raise TypeError(
                F'The class {name} provides a carver but is not derived from PathExtractorUnit.')
        unpack = cls.unpack

        @wraps(unpack)
        def __unpack(self, data: buf):
            carved = data | carver
            try:
                arc1 = next(carved)
            except StopIteration:
                # Carving produced no result: fall back to unpacking the input as it is.
                arc1 = data
            try:
                arc2 = next(carved)
            except StopIteration:
                # There is at most one archive; unpack it without altering any paths.
                yield from unpack(self, arc1)
                return
            if not self.args.lenient:
                # Consume the remaining carved items only to report the total count.
                some = 2 + sum(1 for _ in carved)
                text = (
                    F'The input contains {some} archives. Use the {carver.name} unit to extract them individually '
                    R'or set the --lenient/-L option to fuse the archives.')
                raise MultipleArchives(text)
            archives = [arc1, arc2]
            archives.extend(carved)
            # A dedicated loop variable is used here so that the data parameter is not rebound.
            for k, archive in enumerate(archives, 1):
                for result in unpack(self, archive):
                    # Prefix each path with a 1-based archive index to tell the archives apart.
                    result.path = F'archive{k}/{result.path}'
                    yield result

        setattr(cls, 'unpack', __unpack)

    def _carver(cls) -> Unit | None:
        """
        Return the unit used to carve archives from the input, or `None` (the default) to leave
        the `unpack` method of the class unwrapped.
        """
        return None
Ancestors
- Executable
- abc.ABCMeta
- builtins.type
Inherited members
class ArchiveUnit (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'', **kwargs)
-
Expand source code Browse git
class ArchiveUnit(PathExtractorUnit, metaclass=ArchiveExecutable, abstract=True):
    # Abstract base class for path extractor units that work on archives. It adds the arguments
    # for the date variable name and the extraction password, keeps a list of commonly used
    # passwords, and provides a helper for building unpack results.

    def __init__(
        self,
        *paths,
        list=False,
        join_path=False,
        drop_path=False,
        fuzzy=0,
        exact=False,
        regex=False,
        path=b'path',
        date: Arg(
            '-D',
            metavar='NAME',
            help='Name of the meta variable to receive the extracted file date. The default value is "{default}".',
        ) = b'date',
        pwd: Arg('-p', help='Optionally specify an extraction password.') = B'',
        **kwargs
    ):
        # Every argument is forwarded to the parent class unchanged.
        super().__init__(
            *paths, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy,
            exact=exact, regex=regex, path=path, pwd=pwd, date=date, **kwargs)

    # Passwords that are commonly used for archives; the order of the entries is preserved.
    _COMMON_PASSWORDS = [
        'infected', 'virus', 'malware', 'dangerous', 'flare',
        '1234', '123',
        'Infected', 'infected!', 'INFECTED', 'notinfected',
        'unzip-me', 'password',
    ]

    def _pack(self, path: str, date: datetime | str | None, data: buf | Callable[[], buf], **meta) -> UnpackResult:
        """
        Build an `UnpackResult` from the given path, data, and metadata. A `datetime` date is
        converted to an ISO formatted string with a space separator and second precision. When
        the date is a string (given or converted), it is stored in the metadata under the
        decoded name of the `date` argument; any other date value is ignored.
        """
        timestamp = date.isoformat(' ', 'seconds') if isinstance(date, datetime) else date
        if isinstance(timestamp, str):
            variable = self.args.date.decode(self.codec)
            meta[variable] = timestamp
        return UnpackResult(path, data, **meta)
Ancestors
Subclasses
- xt
- xt7z
- xtace
- xtasar
- xtcab
- xtcpio
- xtgz
- xtinno
- xtiso
- xtiss
- xtmacho
- xtnode
- xtnsis
- xtpyi
- xtsim
- xttar
- xtzip
- xtzpaq
- xtpdf
- pyc
Class variables
var required_dependencies
var optional_dependencies
var console
Inherited members