Module refinery.units.formats.archive.xttar
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import tarfile
import datetime
from refinery.lib.structures import MemoryFile
from refinery.units.formats.archive import ArchiveUnit
class xttar(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Tar archive.
    """
    # NOTE(review): the class docstring above is presumably merged into the
    # generated unit description through the `docs` template, so its text is
    # deliberately left unchanged - confirm before editing it.

    def unpack(self, data: bytearray):
        """
        Yield one packed item for every regular file stored in the archive.

        The input is first handed to `tarfile.open` as-is. If that fails, the
        buffer is scanned for `ustar` markers and every position where a Tar
        header could start (257 bytes before a marker) is tried in order,
        which allows extracting an archive that is embedded in other data.
        The original exception is re-raised when no candidate can be opened.

        Each item is produced via `self._pack` with the member name, its
        modification time, and a callable that reads the member on demand.
        """
        # The member loop stays inside the `with` block so that the stream
        # backing the archive is still open whenever a member is read.
        with MemoryFile(data) as stream:
            try:
                archive = tarfile.open(fileobj=stream)
            except Exception:
                archive = None
                marker = data.find(B'ustar')
                while archive is None and marker >= 0:
                    # The magic field sits at offset 257 of a header block. A
                    # marker at exactly 257 means the header starts at offset
                    # zero, which is the attempt that has just failed.
                    if marker > 257:
                        stream.seek(marker - 257)
                        try:
                            archive = tarfile.open(fileobj=stream)
                        except Exception:
                            # Not a usable header; try the next marker.
                            archive = None
                    marker = data.find(B'ustar', marker + 1)
                if archive is None:
                    # No candidate worked: surface the original failure.
                    raise
            for info in archive.getmembers():
                if not info.isfile():
                    continue
                extractor = archive.extractfile(info)
                if extractor is None:
                    continue
                try:
                    date = datetime.datetime.fromtimestamp(info.mtime)
                except (OverflowError, OSError, ValueError):
                    # A nonsensical timestamp must not abort the extraction
                    # of this and all following members.
                    # NOTE(review): assumes `_pack` accepts None when no date
                    # is available - confirm against ArchiveUnit._pack.
                    date = None
                # Bind the extractor as a default argument so that every
                # callable reads its own member (avoids late binding).
                yield self._pack(info.name, date, lambda e=extractor: e.read())

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        """
        Return whether the given data looks like a Tar archive.

        The check is based on the first `ustar` marker in the data: a marker
        at offset 257 is accepted immediately, any other position is only
        accepted when the marker is followed by one of two known three-byte
        suffixes.
        """
        ustar = data.find(B'ustar')
        if ustar < 0:
            return False
        if ustar == 257:
            # Offset of the magic field when the header starts at offset 0.
            return True
        # NUL followed by "00" (POSIX magic terminator and version), or two
        # spaces followed by NUL (old GNU variant of the magic).
        return data[ustar + 5:ustar + 8] in (B'\x00\x30\x30', B'\x20\x20\x00')
Classes
class xttar (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')
-
Extract files from a Tar archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xttar --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xttar [| dump {path} ]
Expand source code Browse git
class xttar(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Tar archive.
    """
    # NOTE(review): the class docstring above is presumably merged into the
    # generated unit description through the `docs` template, so its text is
    # deliberately left unchanged - confirm before editing it.

    def unpack(self, data: bytearray):
        """
        Yield one packed item for every regular file stored in the archive.

        The input is first handed to `tarfile.open` as-is. If that fails, the
        buffer is scanned for `ustar` markers and every position where a Tar
        header could start (257 bytes before a marker) is tried in order,
        which allows extracting an archive that is embedded in other data.
        The original exception is re-raised when no candidate can be opened.

        Each item is produced via `self._pack` with the member name, its
        modification time, and a callable that reads the member on demand.
        """
        # The member loop stays inside the `with` block so that the stream
        # backing the archive is still open whenever a member is read.
        with MemoryFile(data) as stream:
            try:
                archive = tarfile.open(fileobj=stream)
            except Exception:
                archive = None
                marker = data.find(B'ustar')
                while archive is None and marker >= 0:
                    # The magic field sits at offset 257 of a header block. A
                    # marker at exactly 257 means the header starts at offset
                    # zero, which is the attempt that has just failed.
                    if marker > 257:
                        stream.seek(marker - 257)
                        try:
                            archive = tarfile.open(fileobj=stream)
                        except Exception:
                            # Not a usable header; try the next marker.
                            archive = None
                    marker = data.find(B'ustar', marker + 1)
                if archive is None:
                    # No candidate worked: surface the original failure.
                    raise
            for info in archive.getmembers():
                if not info.isfile():
                    continue
                extractor = archive.extractfile(info)
                if extractor is None:
                    continue
                try:
                    date = datetime.datetime.fromtimestamp(info.mtime)
                except (OverflowError, OSError, ValueError):
                    # A nonsensical timestamp must not abort the extraction
                    # of this and all following members.
                    # NOTE(review): assumes `_pack` accepts None when no date
                    # is available - confirm against ArchiveUnit._pack.
                    date = None
                # Bind the extractor as a default argument so that every
                # callable reads its own member (avoids late binding).
                yield self._pack(info.name, date, lambda e=extractor: e.read())

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        """
        Return whether the given data looks like a Tar archive.

        The check is based on the first `ustar` marker in the data: a marker
        at offset 257 is accepted immediately, any other position is only
        accepted when the marker is followed by one of two known three-byte
        suffixes.
        """
        ustar = data.find(B'ustar')
        if ustar < 0:
            return False
        if ustar == 257:
            # Offset of the magic field when the header starts at offset 0.
            return True
        # NUL followed by "00" (POSIX magic terminator and version), or two
        # spaces followed by NUL (old GNU variant of the magic).
        return data[ustar + 5:ustar + 8] in (B'\x00\x30\x30', B'\x20\x20\x00')
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Yield one packed item for every regular file stored in the archive.

    The input is first handed to `tarfile.open` as-is. If that fails, the
    buffer is scanned for `ustar` markers and every position where a Tar
    header could start (257 bytes before a marker) is tried in order, which
    allows extracting an archive that is embedded in other data. The original
    exception is re-raised when no candidate can be opened.

    Each item is produced via `self._pack` with the member name, its
    modification time, and a callable that reads the member on demand.
    """
    # The member loop stays inside the `with` block so that the stream
    # backing the archive is still open whenever a member is read.
    with MemoryFile(data) as stream:
        try:
            archive = tarfile.open(fileobj=stream)
        except Exception:
            archive = None
            marker = data.find(B'ustar')
            while archive is None and marker >= 0:
                # The magic field sits at offset 257 of a header block. A
                # marker at exactly 257 means the header starts at offset
                # zero, which is the attempt that has just failed.
                if marker > 257:
                    stream.seek(marker - 257)
                    try:
                        archive = tarfile.open(fileobj=stream)
                    except Exception:
                        # Not a usable header; try the next marker.
                        archive = None
                marker = data.find(B'ustar', marker + 1)
            if archive is None:
                # No candidate worked: surface the original failure.
                raise
        for info in archive.getmembers():
            if not info.isfile():
                continue
            extractor = archive.extractfile(info)
            if extractor is None:
                continue
            try:
                date = datetime.datetime.fromtimestamp(info.mtime)
            except (OverflowError, OSError, ValueError):
                # A nonsensical timestamp must not abort the extraction of
                # this and all following members.
                # NOTE(review): assumes `_pack` accepts None when no date is
                # available - confirm against ArchiveUnit._pack.
                date = None
            # Bind the extractor as a default argument so that every callable
            # reads its own member (avoids late binding).
            yield self._pack(info.name, date, lambda e=extractor: e.read())
Inherited members