Module refinery.units.formats.archive.xtcab
Expand source code Browse git
from __future__ import annotations
from refinery.lib.cab import CabDisk, Cabinet, CabSequenceMismatch
from refinery.units import Chunk
from refinery.units.formats.archive import ArchiveUnit
class xtcab(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from CAB (cabinet) archives. Multi-volume archives can be extracted if all
    required disks are present as chunks within the current frame.
    """
    def unpack(self, data: Chunk):
        """
        Unpack all files from the cabinet attached to this chunk. The `filter` method stores
        a (possibly multi-disk) `Cabinet` in `data.temp`; when the unit runs without framing,
        `temp` is unset and the chunk itself is treated as a single-disk cabinet.
        """
        if (arc := data.temp) is None:
            arc = Cabinet()
            arc.append(memoryview(data))
        try:
            arc.check()
        except CabSequenceMismatch as ce:
            # Some disks of a multi-volume set appear to be missing or out of order;
            # log and proceed best-effort, extracting whatever is available.
            self.log_info(str(ce))
        arc.process()
        one = len(arc.files) == 1
        self.log_info(F'processing CAB with {len(arc)} disk{"s" * (1 - one)}')
        for cab_id, files in arc.files.items():
            for file in files:
                path = file.name
                if not one:
                    # Prefix with the cabinet id to keep paths from different
                    # cabinets in the archive from colliding.
                    path = F'CAB{cab_id:04X}/{path}'
                yield self._pack(path, file.timestamp, lambda f=file: f.decompress())

    def filter(self, chunks):
        """
        Group consecutive CAB disks in the frame into `Cabinet` objects. Only the first disk
        of each cabinet is forwarded as a chunk; the assembled `Cabinet` rides along in its
        `temp` attribute for `unpack` to consume. Non-CAB chunks pass through untouched.
        """
        box = None
        cab = Cabinet()
        for chunk in chunks:
            if not self.handles(chunk):
                yield chunk
                continue
            if box is None:
                box = chunk
                box.temp = cab
            if cab.needs_more_disks():
                cab.append(memoryview(chunk))
            else:
                # The current cabinet is complete: emit its carrier chunk and start a new
                # cabinet with this chunk as its first disk. The append below is required;
                # without it this chunk would be lost, since unpack does not append data
                # when temp is already populated.
                yield box
                box = chunk
                cab = box.temp = Cabinet()
                cab.append(memoryview(chunk))
        if box:
            yield box

    @classmethod
    def handles(cls, data):
        # A CAB disk begins with the magic signature CabDisk.MAGIC.
        return data[:4] == CabDisk.MAGIC
Classes
class xtcab (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')-
Extract files from CAB (cabinet) archives. Multi-volume archives can be extracted if all required disks are present as chunks within the current frame.
This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xtcab --list

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xtcab [| d2p ]

If you are using xtcab to unpack a file on disk, the following pattern can be useful:
ef pack.foo [| xtcab -j | d2p ]

The unit `ef` is also a path extractor. By specifying `-j` (or `--join`), the paths of extracted items are combined. The `d2p` unit will deconflict these with the local file system. For example, if `pack.foo` contains items `one.txt` and `two.txt`, the following local file tree would be the result:

pack.foo
pack/one.txt
pack/two.txt

Finally, the
`-d` (or `--drop`) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.
class xtcab(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'): """ Extract files from CAB (cabinet) archives. Multi-volume archives can be extracted if all required disks are present as chunks within the current frame. """ def unpack(self, data: Chunk): if (arc := data.temp) is None: arc = Cabinet() arc.append(memoryview(data)) try: arc.check() except CabSequenceMismatch as ce: self.log_info(str(ce)) arc.process() one = len(arc.files) == 1 self.log_info(F'processing CAB with {len(arc)} disk{"s" * (1 - one)}') for id, files in arc.files.items(): for file in files: path = file.name if not one: path = F'CAB{id:04X}/{path}' yield self._pack(path, file.timestamp, lambda f=file: f.decompress()) def filter(self, chunks): box = None cab = Cabinet() for chunk in chunks: if not self.handles(chunk): yield chunk continue if box is None: box = chunk box.temp = cab if cab.needs_more_disks(): cab.append(memoryview(chunk)) else: yield box box = chunk cab = box.temp = Cabinet() if box: yield box @classmethod def handles(cls, data): return data[:4] == CabDisk.MAGICAncestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data: Chunk): if (arc := data.temp) is None: arc = Cabinet() arc.append(memoryview(data)) try: arc.check() except CabSequenceMismatch as ce: self.log_info(str(ce)) arc.process() one = len(arc.files) == 1 self.log_info(F'processing CAB with {len(arc)} disk{"s" * (1 - one)}') for id, files in arc.files.items(): for file in files: path = file.name if not one: path = F'CAB{id:04X}/{path}' yield self._pack(path, file.timestamp, lambda f=file: f.decompress())
Inherited members
ArchiveUnit:CustomJoinBehaviourCustomPathSeparatorFilterEverythingRequiresactassemblecodecconsolefilterfinishhandlesis_quietis_reversibleisattylabelledleniencylog_alwayslog_debuglog_detachlog_faillog_infolog_levellog_warnloggernamenozzleoptional_dependenciesprocessreadread1required_dependenciesresetrunsourcesuperinit