Module refinery.units.formats.archive.xttar

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import tarfile
import datetime

from refinery.lib.structures import MemoryFile
from refinery.units.formats.archive import ArchiveUnit


class xttar(ArchiveUnit):
    """
    Extract files from a Tar archive.
    """
    def unpack(self, data: bytearray):
        with MemoryFile(data) as stream:
            try:
                archive = tarfile.open(fileobj=stream)
            except Exception:
                ustar = data.find(B'ustar')
                if ustar < 257:
                    raise
                stream.seek(ustar - 257)
                archive = tarfile.open(fileobj=stream)
        for info in archive.getmembers():
            if not info.isfile():
                continue
            extractor = archive.extractfile(info)
            if extractor is None:
                continue
            date = datetime.datetime.fromtimestamp(info.mtime)
            yield self._pack(info.name, date, lambda e=extractor: e.read())

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        ustar = data.find(B'ustar')
        if ustar >= 0:
            return ustar == 257 or data[ustar:ustar + 3] in (B'\x00\x30\x30', B'\x20\x20\x00')
        return False

Classes

class xttar (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from a Tar archive.

Expand source code Browse git
class xttar(ArchiveUnit):
    """
    Extract files from a Tar archive.
    """
    def unpack(self, data: bytearray):
        with MemoryFile(data) as stream:
            try:
                archive = tarfile.open(fileobj=stream)
            except Exception:
                ustar = data.find(B'ustar')
                if ustar < 257:
                    raise
                stream.seek(ustar - 257)
                archive = tarfile.open(fileobj=stream)
        for info in archive.getmembers():
            if not info.isfile():
                continue
            extractor = archive.extractfile(info)
            if extractor is None:
                continue
            date = datetime.datetime.fromtimestamp(info.mtime)
            yield self._pack(info.name, date, lambda e=extractor: e.read())

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        ustar = data.find(B'ustar')
        if ustar >= 0:
            return ustar == 257 or data[ustar:ustar + 3] in (B'\x00\x30\x30', B'\x20\x20\x00')
        return False

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    with MemoryFile(data) as stream:
        try:
            archive = tarfile.open(fileobj=stream)
        except Exception:
            ustar = data.find(B'ustar')
            if ustar < 257:
                raise
            stream.seek(ustar - 257)
            archive = tarfile.open(fileobj=stream)
    for info in archive.getmembers():
        if not info.isfile():
            continue
        extractor = archive.extractfile(info)
        if extractor is None:
            continue
        date = datetime.datetime.fromtimestamp(info.mtime)
        yield self._pack(info.name, date, lambda e=extractor: e.read())

Inherited members