Module refinery.units.formats.archive.xtcpio
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from contextlib import suppress
from refinery.lib.structures import EOF, Struct, StructReader
from refinery.lib.tools import date_from_timestamp
from refinery.units.formats.archive import ArchiveUnit
class CPIOEntry(Struct):
    """
    One member of a CPIO archive in the ASCII "new" header format.

    The constructor consumes the fixed-size header, the file name and the file
    contents from the given reader, leaving the reader positioned at the start
    of the next member. Raises a `ValueError` if the member does not start with
    a supported signature, or if a header field is not valid hexadecimal text.
    """

    def __init__(self, reader: StructReader):
        def readint(length: int):
            # Header fields are fixed-width hexadecimal ASCII text: two
            # characters for every byte of the encoded integer.
            return int(bytes(reader.read(length * 2)), 16)

        self.signature = reader.read(6)
        # 070701 is the new ASCII format; 070702 is its CRC variant, which has
        # the exact same header layout and only differs in that the checksum
        # field is populated.
        if bytes(self.signature) not in (b'070701', b'070702'):
            raise ValueError('invalid CPIO header signature')
        self.inode = readint(4)
        self.mode = readint(4)
        self.uid = readint(4)
        self.gid = readint(4)
        self.nlinks = readint(4)
        mtime = readint(4)
        self.mtime = date_from_timestamp(mtime)
        self.size = readint(4)
        # Both device identifiers are stored as two consecutive header fields.
        self.dev = readint(4), readint(4)
        self.rdev = readint(4), readint(4)
        namesize = readint(4)
        self.checksum = readint(4)
        # File names are raw bytes and frequently UTF-8 encoded. Decoding them
        # strictly as ASCII made a single non-ASCII name abort the extraction
        # of the whole archive. Pure ASCII names decode exactly as before;
        # undecodable bytes are kept visible as backslash escapes.
        name = bytes(reader.read(namesize)).decode('utf-8', 'backslashreplace')
        # The stored name is NUL-terminated; drop the terminator.
        self.name = name.rstrip('\0')
        # Name and data are each followed by padding up to a multiple of four
        # bytes (presumably what byte_align skips; verify against StructReader).
        reader.byte_align(4)
        self.data = reader.read(self.size)
        reader.byte_align(4)
class xtcpio(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a CPIO archive.
    """

    def unpack(self, data):
        """
        Yield one packed item (name, modification time, contents) for every
        member of the archive, stopping at the trailer member or when the
        input is exhausted.
        """
        stream = StructReader(memoryview(data))
        while True:
            try:
                member = CPIOEntry(stream)
            except EOF:
                # Running out of input while parsing a member ends the
                # extraction quietly instead of raising.
                return
            if member.name == 'TRAILER!!!':
                # The trailer member marks the end of the archive.
                return
            yield self._pack(member.name, member.mtime, member.data)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        """
        Check whether the input looks like a CPIO archive. The result is False
        when the data starts with none of the known signatures, True when it
        starts with one and also contains the trailer marker, and None when
        only the signature matches.
        """
        if not data.startswith((B'\x71\xC7', B'\xC7\x71', B'0707')):
            return False
        return True if B'TRAILER!!' in data else None
Classes
class CPIOEntry (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class CPIOEntry(Struct):
    """
    One member of a CPIO archive in the ASCII "new" header format.

    The constructor consumes the fixed-size header, the file name and the file
    contents from the given reader, leaving the reader positioned at the start
    of the next member. Raises a `ValueError` if the member does not start with
    a supported signature, or if a header field is not valid hexadecimal text.
    """

    def __init__(self, reader: StructReader):
        def readint(length: int):
            # Header fields are fixed-width hexadecimal ASCII text: two
            # characters for every byte of the encoded integer.
            return int(bytes(reader.read(length * 2)), 16)

        self.signature = reader.read(6)
        # 070701 is the new ASCII format; 070702 is its CRC variant, which has
        # the exact same header layout and only differs in that the checksum
        # field is populated.
        if bytes(self.signature) not in (b'070701', b'070702'):
            raise ValueError('invalid CPIO header signature')
        self.inode = readint(4)
        self.mode = readint(4)
        self.uid = readint(4)
        self.gid = readint(4)
        self.nlinks = readint(4)
        mtime = readint(4)
        self.mtime = date_from_timestamp(mtime)
        self.size = readint(4)
        # Both device identifiers are stored as two consecutive header fields.
        self.dev = readint(4), readint(4)
        self.rdev = readint(4), readint(4)
        namesize = readint(4)
        self.checksum = readint(4)
        # File names are raw bytes and frequently UTF-8 encoded. Decoding them
        # strictly as ASCII made a single non-ASCII name abort the extraction
        # of the whole archive. Pure ASCII names decode exactly as before;
        # undecodable bytes are kept visible as backslash escapes.
        name = bytes(reader.read(namesize)).decode('utf-8', 'backslashreplace')
        # The stored name is NUL-terminated; drop the terminator.
        self.name = name.rstrip('\0')
        # Name and data are each followed by padding up to a multiple of four
        # bytes (presumably what byte_align skips; verify against StructReader).
        reader.byte_align(4)
        self.data = reader.read(self.size)
        reader.byte_align(4)
Ancestors
class xtcpio (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')
-
Extract files from a CPIO archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xtcpio --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xtcpio [| dump {path} ]
Expand source code Browse git
class xtcpio(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a CPIO archive.
    """

    def unpack(self, data):
        """
        Yield one packed item (name, modification time, contents) for every
        member of the archive, stopping at the trailer member or when the
        input is exhausted.
        """
        stream = StructReader(memoryview(data))
        while True:
            try:
                member = CPIOEntry(stream)
            except EOF:
                # Running out of input while parsing a member ends the
                # extraction quietly instead of raising.
                return
            if member.name == 'TRAILER!!!':
                # The trailer member marks the end of the archive.
                return
            yield self._pack(member.name, member.mtime, member.data)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        """
        Check whether the input looks like a CPIO archive. The result is False
        when the data starts with none of the known signatures, True when it
        starts with one and also contains the trailer marker, and None when
        only the signature matches.
        """
        if not data.startswith((B'\x71\xC7', B'\xC7\x71', B'0707')):
            return False
        return True if B'TRAILER!!' in data else None
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data):
    """
    Yield one packed item (name, modification time, contents) for every
    member of the archive, stopping at the trailer member or when the
    input is exhausted.
    """
    stream = StructReader(memoryview(data))
    while True:
        try:
            member = CPIOEntry(stream)
        except EOF:
            # Running out of input while parsing a member ends the
            # extraction quietly instead of raising.
            return
        if member.name == 'TRAILER!!!':
            # The trailer member marks the end of the archive.
            return
        yield self._pack(member.name, member.mtime, member.data)
Inherited members