Module refinery.units.formats.archive.xtcab

Expand source code Browse git
from __future__ import annotations

from refinery.lib.cab import CabDisk, Cabinet, CabSequenceMismatch
from refinery.units import Chunk
from refinery.units.formats.archive import ArchiveUnit


class xtcab(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from CAB (cabinet) archives.

    Multi-volume archives can be extracted if all required disks are present as chunks
    within the current frame.
    """
    def unpack(self, data: Chunk):
        """
        Yield one packed item for every file in the cabinet that belongs to `data`.

        When `filter` has attached a cabinet to the chunk via `data.temp`, that cabinet
        is used; otherwise a new cabinet is built from `data` alone. A sequence mismatch
        reported by the cabinet check is logged and extraction continues regardless.
        If the cabinet contains more than one file set, each path is prefixed with
        `CAB<id>/` (4 hex digits) to keep the sets apart.
        """
        if (arc := data.temp) is None:
            arc = Cabinet()
            arc.append(memoryview(data))
        try:
            arc.check()
        except CabSequenceMismatch as ce:
            # Best effort: report the inconsistency but still try to extract.
            self.log_info(str(ce))
        arc.process()
        one = len(arc.files) == 1
        disks = len(arc)
        # Pluralize based on the number that is actually printed.
        self.log_info(F'processing CAB with {disks} disk{"s" * (disks != 1)}')
        for cab_id, files in arc.files.items():
            for file in files:
                path = file.name
                if not one:
                    path = F'CAB{cab_id:04X}/{path}'
                # Bind the current file as a default argument so that the deferred
                # decompression does not see a later loop value.
                yield self._pack(path, file.timestamp, lambda f=file: f.decompress())

    def filter(self, chunks):
        """
        Group CAB disks from the frame into cabinets before they are unpacked.

        Chunks that are not recognized by `handles` are passed through immediately.
        Recognized chunks are appended to a cabinet which is attached as `temp` to the
        first chunk of the group. That first chunk is emitted once the cabinet reports
        that it needs no more disks and another CAB chunk arrives, or when the input is
        exhausted.
        """
        box = None
        cab = Cabinet()
        for chunk in chunks:
            if not self.handles(chunk):
                yield chunk
                continue
            if box is not None and not cab.needs_more_disks():
                # The current cabinet is complete; emit it and start a new group.
                yield box
                box = None
                cab = Cabinet()
            if box is None:
                box = chunk
                box.temp = cab
            # Every CAB chunk goes into the current cabinet, including the one that
            # opens a new group.
            cab.append(memoryview(chunk))
        if box is not None:
            yield box

    @classmethod
    def handles(cls, data):
        """
        Return whether `data` begins with the 4-byte CAB disk magic.
        """
        return data[:4] == CabDisk.MAGIC

Classes

class xtcab (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')

Extract files from CAB (cabinet) archives.

Multi-volume archives can be extracted if all required disks are present as chunks within the current frame.

This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".

Positional arguments to xtcab are patterns to filter the extracted items. Use the -x flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none that has the word "temp" in its path:

xtcab .foo .bar -x temp

To view only the paths of all chunks, use the listing switch:

emit data | ... | xtcab -l

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit data | ... | xtcab [| dump extracted/{path} ]

The value {path} is a placeholder which is substituted by the virtual path of the extracted item. When using xtcab to unpack a file on disk, the following pattern can be useful:

ef pack.bin [| xtcab -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. Here, d2p is a shortcut for dump {path}. It deconflicts the joined paths with the local file system: If pack.bin contains items one.txt and two.txt, the following local file tree would be the result:

pack.bin
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.

Expand source code Browse git
class xtcab(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from CAB (cabinet) archives.

    Multi-volume archives can be extracted if all required disks are present as chunks
    within the current frame.
    """
    def unpack(self, data: Chunk):
        """
        Yield one packed item for every file in the cabinet that belongs to `data`.

        When `filter` has attached a cabinet to the chunk via `data.temp`, that cabinet
        is used; otherwise a new cabinet is built from `data` alone. A sequence mismatch
        reported by the cabinet check is logged and extraction continues regardless.
        If the cabinet contains more than one file set, each path is prefixed with
        `CAB<id>/` (4 hex digits) to keep the sets apart.
        """
        if (arc := data.temp) is None:
            arc = Cabinet()
            arc.append(memoryview(data))
        try:
            arc.check()
        except CabSequenceMismatch as ce:
            # Best effort: report the inconsistency but still try to extract.
            self.log_info(str(ce))
        arc.process()
        one = len(arc.files) == 1
        disks = len(arc)
        # Pluralize based on the number that is actually printed.
        self.log_info(F'processing CAB with {disks} disk{"s" * (disks != 1)}')
        for cab_id, files in arc.files.items():
            for file in files:
                path = file.name
                if not one:
                    path = F'CAB{cab_id:04X}/{path}'
                # Bind the current file as a default argument so that the deferred
                # decompression does not see a later loop value.
                yield self._pack(path, file.timestamp, lambda f=file: f.decompress())

    def filter(self, chunks):
        """
        Group CAB disks from the frame into cabinets before they are unpacked.

        Chunks that are not recognized by `handles` are passed through immediately.
        Recognized chunks are appended to a cabinet which is attached as `temp` to the
        first chunk of the group. That first chunk is emitted once the cabinet reports
        that it needs no more disks and another CAB chunk arrives, or when the input is
        exhausted.
        """
        box = None
        cab = Cabinet()
        for chunk in chunks:
            if not self.handles(chunk):
                yield chunk
                continue
            if box is not None and not cab.needs_more_disks():
                # The current cabinet is complete; emit it and start a new group.
                yield box
                box = None
                cab = Cabinet()
            if box is None:
                box = chunk
                box.temp = cab
            # Every CAB chunk goes into the current cabinet, including the one that
            # opens a new group.
            cab.append(memoryview(chunk))
        if box is not None:
            yield box

    @classmethod
    def handles(cls, data):
        """
        Return whether `data` begins with the 4-byte CAB disk magic.
        """
        return data[:4] == CabDisk.MAGIC

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: Chunk):
    """
    Yield one packed item for every file in the cabinet that belongs to `data`.

    When a cabinet is attached to the chunk via `data.temp`, that cabinet is used;
    otherwise a new cabinet is built from `data` alone. A sequence mismatch reported
    by the cabinet check is logged and extraction continues regardless. If the
    cabinet contains more than one file set, each path is prefixed with `CAB<id>/`
    (4 hex digits) to keep the sets apart.
    """
    if (arc := data.temp) is None:
        arc = Cabinet()
        arc.append(memoryview(data))
    try:
        arc.check()
    except CabSequenceMismatch as ce:
        # Best effort: report the inconsistency but still try to extract.
        self.log_info(str(ce))
    arc.process()
    one = len(arc.files) == 1
    disks = len(arc)
    # Pluralize based on the number that is actually printed.
    self.log_info(F'processing CAB with {disks} disk{"s" * (disks != 1)}')
    for cab_id, files in arc.files.items():
        for file in files:
            path = file.name
            if not one:
                path = F'CAB{cab_id:04X}/{path}'
            # Bind the current file as a default argument so that the deferred
            # decompression does not see a later loop value.
            yield self._pack(path, file.timestamp, lambda f=file: f.decompress())

Inherited members