Module refinery.units.formats.archive.xtcpio

from __future__ import annotations

from contextlib import suppress

from refinery.lib.dt import date_from_timestamp
from refinery.lib.structures import EOF, Struct, StructReader
from refinery.units.formats.archive import ArchiveUnit


class CPIOEntry(Struct):
    def __init__(self, reader: StructReader):
        def readint(length: int):
            return int(bytes(reader.read(length * 2)), 16)
        self.signature = reader.read(6)
        if self.signature != b'070701':
            raise ValueError('invalid CPIO header signature')
        self.inode = readint(4)
        self.mode = readint(4)
        self.uid = readint(4)
        self.gid = readint(4)
        self.nlinks = readint(4)
        mtime = readint(4)
        self.mtime = date_from_timestamp(mtime)
        self.size = readint(4)
        self.dev = readint(4), readint(4)
        self.rdev = readint(4), readint(4)
        namesize = readint(4)
        self.checksum = readint(4)
        self.name = bytes(reader.read(namesize)).decode('ascii').rstrip('\0')
        reader.byte_align(4)
        self.data = reader.read(self.size)
        reader.byte_align(4)


class xtcpio(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a CPIO archive, a Unix archive format used in RPM packages, Linux initramfs
    images, and firmware updates.
    """
    def unpack(self, data):
        def cpio():
            with suppress(EOF):
                return CPIOEntry(reader)
        reader = StructReader(memoryview(data))
        for entry in iter(cpio, None):
            if entry.name == 'TRAILER!!!':
                break
            yield self._pack(entry.name, entry.mtime, entry.data)

    @classmethod
    def handles(cls, data) -> bool:
        return data[:6] == B'070701'
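
The header layout that CPIOEntry reads can be reproduced with the standard library alone. The following is a minimal sketch, not refinery's implementation: make_entry and iter_entries are hypothetical helper names, and the sketch assumes the archive starts at offset 0 so that the 4-byte alignment can be computed from absolute positions. It builds a small archive with the 070701 signature and walks it the way unpack does, stopping at the TRAILER!!! entry.

```python
MAGIC = b'070701'
HEADER_SIZE = 6 + 13 * 8  # signature plus thirteen 8-digit hex fields


def pad4(n: int) -> int:
    """Number of padding bytes needed to reach the next multiple of 4."""
    return -n % 4


def make_entry(name: str, data: bytes = b'', mtime: int = 0) -> bytes:
    """Serialize one entry (hypothetical helper, for illustration only)."""
    raw_name = name.encode('ascii') + b'\0'
    fields = [
        0,              # inode
        0o100644,       # mode (regular file)
        0, 0,           # uid, gid
        1,              # nlinks
        mtime,          # modification time
        len(data),      # size of the file data
        0, 0,           # dev (major, minor)
        0, 0,           # rdev (major, minor)
        len(raw_name),  # namesize, including the trailing NUL
        0,              # checksum
    ]
    out = MAGIC + b''.join(b'%08X' % f for f in fields) + raw_name
    out += b'\0' * pad4(len(out))
    out += data
    out += b'\0' * pad4(len(out))
    return out


def iter_entries(blob: bytes):
    """Yield (name, data) pairs until the TRAILER!!! entry or end of input."""
    pos = 0
    while pos + HEADER_SIZE <= len(blob):
        if blob[pos:pos + 6] != MAGIC:
            raise ValueError('invalid CPIO header signature')
        start = pos + 6
        fields = [int(blob[start + 8 * i:start + 8 * i + 8], 16) for i in range(13)]
        size, namesize = fields[6], fields[11]
        pos += HEADER_SIZE
        name = blob[pos:pos + namesize].decode('ascii').rstrip('\0')
        pos += namesize
        pos += pad4(pos)
        data = blob[pos:pos + size]
        pos += size
        pos += pad4(pos)
        if name == 'TRAILER!!!':
            break
        yield name, data


archive = make_entry('hello.txt', b'hi!') + make_entry('TRAILER!!!')
entries = list(iter_entries(archive))
```

Here entries holds a single pair for hello.txt; the trailer entry ends the iteration and is not yielded, mirroring the break in unpack.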

Classes

class CPIOEntry (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.
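
The calling convention described above can be illustrated with a small stand-in. This is only a sketch of the idea: MiniReader, MiniStructMeta, MiniStruct, and Foo are hypothetical names, and the metaclass approach is an assumption made for illustration, not a statement about how refinery's Struct is implemented.

```python
import io


class MiniReader(io.BytesIO):
    """Hypothetical stand-in for StructReader."""


class MiniStructMeta(type):
    def __call__(cls, data, *args, **kwargs):
        # Pass an existing reader through unchanged, wrap anything else.
        if isinstance(data, MiniReader):
            reader = data
        else:
            reader = MiniReader(bytes(data))
        # Additional arguments are forwarded to __init__ as given.
        return super().__call__(reader, *args, **kwargs)


class MiniStruct(metaclass=MiniStructMeta):
    def __init__(self, reader, *args, **kwargs):
        pass


class Foo(MiniStruct):
    def __init__(self, reader, bar=0):
        self.magic = reader.read(4)
        self.bar = bar


# Raw bytes are wrapped in a reader before __init__ sees them.
foo = Foo(b'ABCDrest', bar=29)

# A shared reader is passed through, so consecutive structs consume it in turn.
shared = MiniReader(b'ABCDEFGH')
first = Foo(shared)
second = Foo(shared)
```

Passing the same reader to several structs is what lets a loop such as the one in unpack parse one entry after another from a single buffer.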


Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)

class xtcpio (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')

Extract files from a CPIO archive, a Unix archive format used in RPM packages, Linux initramfs images, and firmware updates.

This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".

Positional arguments to xtcpio are patterns that filter the extracted items. Use the -x flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none whose path contains the word "temp":

xtcpio .foo .bar -x temp
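
The effect of inclusion and exclusion patterns can be sketched in plain Python. This sketch assumes, purely for illustration, that a pattern matches when it occurs anywhere in the path, which is looser than a strict extension check; the unit's actual matching is governed by its fuzzy, exact, and regex options, which are not modeled here. The function name select is hypothetical.

```python
from fnmatch import fnmatchcase


def select(paths, include, exclude=()):
    """Yield paths that hit any inclusion pattern and no exclusion pattern."""
    def hit(path, pattern):
        # Assumption: wrap the pattern in wildcards so it matches as a substring.
        return fnmatchcase(path, f'*{pattern}*')

    for path in paths:
        if include and not any(hit(path, p) for p in include):
            continue
        if any(hit(path, p) for p in exclude):
            continue
        yield path


paths = ['a/one.foo', 'b/two.bar', 'temp/three.foo', 'c/four.txt']
kept = list(select(paths, ['.foo', '.bar'], exclude=['temp']))
```

With these inputs, kept contains a/one.foo and b/two.bar: the third path is dropped by the exclusion pattern and the fourth matches no inclusion pattern.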

To view only the paths of all chunks, use the listing switch:

emit data | ... | xtcpio -l

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit data | ... | xtcpio [| dump extracted/{path} ]

The value {path} is a placeholder which is substituted by the virtual path of the extracted item. When using xtcpio to unpack a file on disk, the following pattern can be useful:

ef pack.bin [| xtcpio -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. Here, d2p is a shortcut for dump {path}. It deconflicts the joined paths with the local file system: If pack.bin contains items one.txt and two.txt, the following local file tree would be the result:

pack.bin
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.
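
The {path} substitution and the joined paths from the pack.bin example can be mimicked in a few lines. This is a hedged sketch with hypothetical function names (render_target, join_paths). It always strips the container's extension, whereas the text above says d2p deconflicts against the local file system, so the real behavior may differ when no conflict exists.

```python
from pathlib import PurePosixPath


def render_target(template: str, path: str) -> str:
    """Substitute the virtual path of an item into a dump target template."""
    return template.format(path=path)


def join_paths(container: str, item: str) -> str:
    """Combine a container path with an item path.

    Assumption: the container's extension is dropped so that the resulting
    directory name cannot collide with the container file itself.
    """
    stem = PurePosixPath(container).with_suffix('')
    return str(stem / item)


target = render_target('extracted/{path}', 'etc/passwd')
joined = [join_paths('pack.bin', name) for name in ('one.txt', 'two.txt')]
```

Here target is extracted/etc/passwd, and joined reproduces the pack/one.txt and pack/two.txt entries of the file tree shown above.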



Class variables

var reverse


Methods

def unpack(self, data)
