Module refinery.units.formats.archive.xtgz

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from gzip import GzipFile, FEXTRA, FNAME
from datetime import datetime
from pathlib import Path

from refinery.units.formats.archive import ArchiveUnit
from refinery.lib.structures import Struct, StructReader, StreamDetour
from refinery.lib.meta import metavars


class GzipHeader(Struct):
    """
    Parsed header fields of a gzip stream together with its decompressed payload.

    After construction the instance exposes `magic`, `method`, `flags`, `mtime`, the optional
    `extra` field and `name` (both `None` when the corresponding flag bit is not set), and
    `data`, which holds whatever `gzip.GzipFile` decompresses from the same reader.
    """
    MAGIC = B'\x1F\x8B'

    def __init__(self, reader: StructReader):
        # The decompressor is bound to the reader first; it is only asked to read once the
        # header fields below have been parsed.
        gz = GzipFile(fileobj=reader)
        # NOTE(review): StreamDetour(reader, 0) presumably moves the cursor to offset 0 for
        # the duration of the block and restores it afterwards - confirm against
        # refinery.lib.structures.
        with StreamDetour(reader, 0):
            self.magic = reader.read(2)
            # Compression method, flag byte, modification time; the two trailing pad bytes
            # of the format skip the remaining fixed header bytes (XFL and OS in RFC 1952).
            fixed = reader.read_struct('BBIxx')
            self.method, self.flags, self.mtime = fixed
            self.extra = None
            self.name = None
            if self.flags & FEXTRA:
                # The extra field is prefixed with its own 16-bit length.
                size = reader.u16()
                self.extra = reader.read(size)
            if self.flags & FNAME:
                # The original file name is stored as a zero-terminated string.
                raw = reader.read_c_string()
                self.name = raw.decode('latin1')
        self.data = gz.read()


class xtgz(ArchiveUnit):
    """
    Extract a file from a GZip archive.
    """
    def unpack(self, data: bytearray):
        # Yields exactly one packed item: the decompressed payload, labelled with a path and
        # an optional modification date taken from the gzip header.
        header = GzipHeader(data)
        path = header.name
        stamp = header.mtime
        # A zero mtime is treated as "no date"; anything else is converted to a datetime.
        date = datetime.fromtimestamp(stamp) if stamp else None
        if path is None:
            # The header carries no file name; derive one from the 'path' meta variable of
            # the input when it exists, and fall back to a fixed name otherwise.
            try:
                path = Path(metavars(data)['path'])
            except KeyError:
                path = 'ungz'
            else:
                # NOTE(review): this emits the bare path at warning level - possibly a
                # leftover debugging aid; confirm whether it is intentional.
                self.log_warn(path)
                suffix = path.suffix
                # Strip a trailing ".gz"; for any other suffix, append ".ungz" to it instead.
                replacement = '' if suffix.lower() == '.gz' else F'{suffix}.ungz'
                path = path.with_suffix(replacement).as_posix()
        yield self._pack(path, date, header.data)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        # True when the input begins with the two-byte gzip magic number.
        return data.startswith(B'\x1F\x8B')

Classes

class GzipHeader (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class GzipHeader(Struct):
    """
    Parsed header fields of a gzip stream together with its decompressed payload.

    After construction the instance exposes `magic`, `method`, `flags`, `mtime`, the optional
    `extra` field and `name` (both `None` when the corresponding flag bit is not set), and
    `data`, which holds whatever `gzip.GzipFile` decompresses from the same reader.
    """
    MAGIC = B'\x1F\x8B'

    def __init__(self, reader: StructReader):
        # The decompressor is bound to the reader first; it is only asked to read once the
        # header fields below have been parsed.
        gz = GzipFile(fileobj=reader)
        # NOTE(review): StreamDetour(reader, 0) presumably moves the cursor to offset 0 for
        # the duration of the block and restores it afterwards - confirm against
        # refinery.lib.structures.
        with StreamDetour(reader, 0):
            self.magic = reader.read(2)
            # Compression method, flag byte, modification time; the two trailing pad bytes
            # of the format skip the remaining fixed header bytes (XFL and OS in RFC 1952).
            fixed = reader.read_struct('BBIxx')
            self.method, self.flags, self.mtime = fixed
            self.extra = None
            self.name = None
            if self.flags & FEXTRA:
                # The extra field is prefixed with its own 16-bit length.
                size = reader.u16()
                self.extra = reader.read(size)
            if self.flags & FNAME:
                # The original file name is stored as a zero-terminated string.
                raw = reader.read_c_string()
                self.name = raw.decode('latin1')
        self.data = gz.read()

Ancestors

Class variables

var MAGIC
class xtgz (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract a file from a GZip archive.

Expand source code Browse git
class xtgz(ArchiveUnit):
    """
    Extract a file from a GZip archive.
    """
    def unpack(self, data: bytearray):
        # Yields exactly one packed item: the decompressed payload, labelled with a path and
        # an optional modification date taken from the gzip header.
        header = GzipHeader(data)
        path = header.name
        stamp = header.mtime
        # A zero mtime is treated as "no date"; anything else is converted to a datetime.
        date = datetime.fromtimestamp(stamp) if stamp else None
        if path is None:
            # The header carries no file name; derive one from the 'path' meta variable of
            # the input when it exists, and fall back to a fixed name otherwise.
            try:
                path = Path(metavars(data)['path'])
            except KeyError:
                path = 'ungz'
            else:
                # NOTE(review): this emits the bare path at warning level - possibly a
                # leftover debugging aid; confirm whether it is intentional.
                self.log_warn(path)
                suffix = path.suffix
                # Strip a trailing ".gz"; for any other suffix, append ".ungz" to it instead.
                replacement = '' if suffix.lower() == '.gz' else F'{suffix}.ungz'
                path = path.with_suffix(replacement).as_posix()
        yield self._pack(path, date, header.data)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        # True when the input begins with the two-byte gzip magic number.
        return data.startswith(B'\x1F\x8B')

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    # Yields exactly one packed item: the decompressed payload, labelled with a path and an
    # optional modification date taken from the gzip header.
    header = GzipHeader(data)
    path = header.name
    stamp = header.mtime
    # A zero mtime is treated as "no date"; anything else is converted to a datetime.
    date = datetime.fromtimestamp(stamp) if stamp else None
    if path is None:
        # The header carries no file name; derive one from the 'path' meta variable of the
        # input when it exists, and fall back to a fixed name otherwise.
        try:
            path = Path(metavars(data)['path'])
        except KeyError:
            path = 'ungz'
        else:
            # NOTE(review): this emits the bare path at warning level - possibly a leftover
            # debugging aid; confirm whether it is intentional.
            self.log_warn(path)
            suffix = path.suffix
            # Strip a trailing ".gz"; for any other suffix, append ".ungz" to it instead.
            replacement = '' if suffix.lower() == '.gz' else F'{suffix}.ungz'
            path = path.with_suffix(replacement).as_posix()
    yield self._pack(path, date, header.data)

Inherited members