Module refinery.units.formats.archive.xtzip

Expand source code Browse git
from __future__ import annotations

import codecs

from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.types import buf
from refinery.lib.zip import (
    InvalidChecksum,
    InvalidPassword,
    PasswordRequired,
    Zip,
    ZipDirEntry,
    ZipEndOfCentralDirectory,
    ZipEndOfCentralDirectory64,
)
from refinery.units import RefineryPartialResult
from refinery.units.formats.archive import ArchiveUnit, MultipleArchives
from refinery.units.formats.pe import get_pe_size


class xtzip(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """
    def unpack(self, data: buf):
        """
        Parse the input as a Zip archive and yield one packed item for every non-directory entry
        of its directory, sorted by name. Before that, one item is yielded for every gap that the
        archive coverage reports. If no password was specified, the entries of `CommonPasswords`
        are tried as well. Unless the lenient option is set, `MultipleArchives` is raised when the
        parsed archive reports a nonzero sub-archive count.
        """
        def trypwd(password: str | None):
            """
            Parse the archive using the given password. Returns None when parsing raises
            `PasswordRequired` or `InvalidPassword`, False when the first non-directory record
            rejects the password, and the parsed archive otherwise.
            """
            try:
                zipf = Zip(view, password)
            except (PasswordRequired, InvalidPassword):
                return None
            for file in zipf.records.values():
                if file.is_dir():
                    continue
                # Only the first non-directory record decides whether the password is accepted.
                if file.is_password_ok(password):
                    break
                return False
            return zipf

        view = memoryview(data)
        password = self.args.pwd
        # Normalize the password argument: empty values become None, binary values are decoded.
        if not password:
            password = None
        elif not isinstance(password, str):
            password = codecs.decode(password, self.codec)
        passwords = [password]
        if not password:
            passwords.extend(self.CommonPasswords)
        for p in passwords:
            if zipf := trypwd(p):
                break
        else:
            # No candidate was accepted; parse with the specified password without catching
            # password errors, so that they reach the caller.
            zipf = Zip(view, password)

        # The assignment expression is parenthesized so that the name is bound to the sub-archive
        # count itself. Without the parentheses, it would be bound to the boolean value of the
        # entire condition and the message below would always report exactly two archives.
        if (some := zipf.sub_archive_count()) and not self.args.lenient:
            text = (
                F'The input contains {some + 1} archives. Use the xtzip unit to extract '
                R'them individually or set the --lenient/-L option to fuse the archives.')
            raise MultipleArchives(text)

        if zipf.password:
            self.log_debug('Using password:', zipf.password)

        if boundary := zipf.coverage.boundary():
            # Pad all offsets to the width of the upper boundary in hexadecimal notation.
            w = len(hex(boundary[1]))
            for start, end in zipf.coverage.gaps():
                self.log_info(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')
                yield self._pack(F'.{start:#0{w}x}.cave', None, view[start:end])

        for entry in sorted(zipf.directory, key=lambda d: d.name):
            # The entry is bound as a default argument so that each closure keeps its own entry
            # rather than the loop variable.
            def xt(entry=entry):
                record = zipf.read(entry)
                try:
                    return record.unpack(zipf.password)
                except InvalidChecksum as ck:
                    # The data attached to the checksum error is forwarded as a partial result.
                    raise RefineryPartialResult('invalid checksum', ck.data) from ck
                except (PasswordRequired, InvalidPassword):
                    if not record.data:
                        raise
                    # The raw record data is forwarded as a partial result.
                    msg = 'invalid password; use -L to extract raw encrypted data'
                    raise RefineryPartialResult(msg, record.data)
            if entry.is_dir():
                continue
            # The callable is passed instead of the data; presumably it is invoked lazily by the
            # framework - TODO confirm against the definition of _pack.
            yield self._pack(entry.name, entry.date, xt)

    @classmethod
    def handles(cls, data):
        """
        Heuristically decide whether the input can be processed by this unit. Returns True when
        one of the checks below succeeds, False when the input is rejected explicitly, and None
        (implicitly) when all debug entries of a PE file were inspected without a match.
        """
        # The input starts with one of two Zip signatures.
        if data[:4] in (
            B'PK\x03\x04',
            B'PK\x07\x08',
        ):
            return True
        # An end of central directory signature is searched for with back2front set.
        # NOTE(review): a match at offset zero is not accepted by this comparison, whereas the
        # checks below use "0 <=" - confirm that this difference is intended.
        for EOCD in (
            ZipEndOfCentralDirectory64,
            ZipEndOfCentralDirectory,
        ):
            if buffer_offset(data, EOCD.Signature, back2front=True) > 0:
                return True
        # All remaining checks only apply to inputs that look like PE files.
        if not is_likely_pe(data):
            return False
        memory = memoryview(data)
        # A directory entry signature within the last 0x400 bytes of the input.
        if 0 <= buffer_offset(memory[-0x400:], ZipDirEntry.Signature):
            return True
        from refinery.lib import lief
        pe = lief.load_pe_fast(data)
        offset = get_pe_size(pe)
        # A Zip signature within the first 0x1000 bytes after the computed PE size.
        if 0 <= buffer_offset(memory[offset:], B'PK\x03\x04') < 0x1000:
            return True
        if not pe.has_debug:
            return False
        # As a last resort, look for a PDB path that contains both marker strings.
        for entry in pe.debug:
            if not isinstance(entry, lief.PE.CodeViewPDB):
                continue
            path = entry.filename
            if not isinstance(path, str):
                path = codecs.decode(path, 'latin1')
            if 'sfxzip32' in path and 'WinRAR' in path:
                return True

Classes

class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')

Extract files from a Zip archive.

This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".

Positional arguments to xtzip are patterns to filter the extracted items. Use the -x flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none that has the word "temp" in its path:

xtzip .foo .bar -x temp

To view only the paths of all chunks, use the listing switch:

emit data | ... | xtzip -l

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit data | ... | xtzip [| dump extracted/{path} ]

The value {path} is a placeholder which is substituted by the virtual path of the extracted item. When using xtzip to unpack a file on disk, the following pattern can be useful:

ef pack.bin [| xtzip -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. Here, d2p is a shortcut for dump {path}. It deconflicts the joined paths with the local file system: If pack.bin contains items one.txt and two.txt, the following local file tree would be the result:

pack.bin
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.

Expand source code Browse git
class xtzip(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """
    def unpack(self, data: buf):
        """
        Parse the input as a Zip archive and yield one packed item for every non-directory entry
        of its directory, sorted by name. Before that, one item is yielded for every gap that the
        archive coverage reports. If no password was specified, the entries of `CommonPasswords`
        are tried as well. Unless the lenient option is set, `MultipleArchives` is raised when the
        parsed archive reports a nonzero sub-archive count.
        """
        def trypwd(password: str | None):
            """
            Parse the archive using the given password. Returns None when parsing raises
            `PasswordRequired` or `InvalidPassword`, False when the first non-directory record
            rejects the password, and the parsed archive otherwise.
            """
            try:
                zipf = Zip(view, password)
            except (PasswordRequired, InvalidPassword):
                return None
            for file in zipf.records.values():
                if file.is_dir():
                    continue
                # Only the first non-directory record decides whether the password is accepted.
                if file.is_password_ok(password):
                    break
                return False
            return zipf

        view = memoryview(data)
        password = self.args.pwd
        # Normalize the password argument: empty values become None, binary values are decoded.
        if not password:
            password = None
        elif not isinstance(password, str):
            password = codecs.decode(password, self.codec)
        passwords = [password]
        if not password:
            passwords.extend(self.CommonPasswords)
        for p in passwords:
            if zipf := trypwd(p):
                break
        else:
            # No candidate was accepted; parse with the specified password without catching
            # password errors, so that they reach the caller.
            zipf = Zip(view, password)

        # The assignment expression is parenthesized so that the name is bound to the sub-archive
        # count itself. Without the parentheses, it would be bound to the boolean value of the
        # entire condition and the message below would always report exactly two archives.
        if (some := zipf.sub_archive_count()) and not self.args.lenient:
            text = (
                F'The input contains {some + 1} archives. Use the xtzip unit to extract '
                R'them individually or set the --lenient/-L option to fuse the archives.')
            raise MultipleArchives(text)

        if zipf.password:
            self.log_debug('Using password:', zipf.password)

        if boundary := zipf.coverage.boundary():
            # Pad all offsets to the width of the upper boundary in hexadecimal notation.
            w = len(hex(boundary[1]))
            for start, end in zipf.coverage.gaps():
                self.log_info(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')
                yield self._pack(F'.{start:#0{w}x}.cave', None, view[start:end])

        for entry in sorted(zipf.directory, key=lambda d: d.name):
            # The entry is bound as a default argument so that each closure keeps its own entry
            # rather than the loop variable.
            def xt(entry=entry):
                record = zipf.read(entry)
                try:
                    return record.unpack(zipf.password)
                except InvalidChecksum as ck:
                    # The data attached to the checksum error is forwarded as a partial result.
                    raise RefineryPartialResult('invalid checksum', ck.data) from ck
                except (PasswordRequired, InvalidPassword):
                    if not record.data:
                        raise
                    # The raw record data is forwarded as a partial result.
                    msg = 'invalid password; use -L to extract raw encrypted data'
                    raise RefineryPartialResult(msg, record.data)
            if entry.is_dir():
                continue
            # The callable is passed instead of the data; presumably it is invoked lazily by the
            # framework - TODO confirm against the definition of _pack.
            yield self._pack(entry.name, entry.date, xt)

    @classmethod
    def handles(cls, data):
        """
        Heuristically decide whether the input can be processed by this unit. Returns True when
        one of the checks below succeeds, False when the input is rejected explicitly, and None
        (implicitly) when all debug entries of a PE file were inspected without a match.
        """
        # The input starts with one of two Zip signatures.
        if data[:4] in (
            B'PK\x03\x04',
            B'PK\x07\x08',
        ):
            return True
        # An end of central directory signature is searched for with back2front set.
        # NOTE(review): a match at offset zero is not accepted by this comparison, whereas the
        # checks below use "0 <=" - confirm that this difference is intended.
        for EOCD in (
            ZipEndOfCentralDirectory64,
            ZipEndOfCentralDirectory,
        ):
            if buffer_offset(data, EOCD.Signature, back2front=True) > 0:
                return True
        # All remaining checks only apply to inputs that look like PE files.
        if not is_likely_pe(data):
            return False
        memory = memoryview(data)
        # A directory entry signature within the last 0x400 bytes of the input.
        if 0 <= buffer_offset(memory[-0x400:], ZipDirEntry.Signature):
            return True
        from refinery.lib import lief
        pe = lief.load_pe_fast(data)
        offset = get_pe_size(pe)
        # A Zip signature within the first 0x1000 bytes after the computed PE size.
        if 0 <= buffer_offset(memory[offset:], B'PK\x03\x04') < 0x1000:
            return True
        if not pe.has_debug:
            return False
        # As a last resort, look for a PDB path that contains both marker strings.
        for entry in pe.debug:
            if not isinstance(entry, lief.PE.CodeViewPDB):
                continue
            path = entry.filename
            if not isinstance(path, str):
                path = codecs.decode(path, 'latin1')
            if 'sfxzip32' in path and 'WinRAR' in path:
                return True

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: buf):
    """
    Parse the input as a Zip archive and yield one packed item for every non-directory entry
    of its directory, sorted by name. Before that, one item is yielded for every gap that the
    archive coverage reports. If no password was specified, the entries of `CommonPasswords`
    are tried as well. Unless the lenient option is set, `MultipleArchives` is raised when the
    parsed archive reports a nonzero sub-archive count.
    """
    def trypwd(password: str | None):
        """
        Parse the archive using the given password. Returns None when parsing raises
        `PasswordRequired` or `InvalidPassword`, False when the first non-directory record
        rejects the password, and the parsed archive otherwise.
        """
        try:
            zipf = Zip(view, password)
        except (PasswordRequired, InvalidPassword):
            return None
        for file in zipf.records.values():
            if file.is_dir():
                continue
            # Only the first non-directory record decides whether the password is accepted.
            if file.is_password_ok(password):
                break
            return False
        return zipf

    view = memoryview(data)
    password = self.args.pwd
    # Normalize the password argument: empty values become None, binary values are decoded.
    if not password:
        password = None
    elif not isinstance(password, str):
        password = codecs.decode(password, self.codec)
    passwords = [password]
    if not password:
        passwords.extend(self.CommonPasswords)
    for p in passwords:
        if zipf := trypwd(p):
            break
    else:
        # No candidate was accepted; parse with the specified password without catching
        # password errors, so that they reach the caller.
        zipf = Zip(view, password)

    # The assignment expression is parenthesized so that the name is bound to the sub-archive
    # count itself. Without the parentheses, it would be bound to the boolean value of the
    # entire condition and the message below would always report exactly two archives.
    if (some := zipf.sub_archive_count()) and not self.args.lenient:
        text = (
            F'The input contains {some + 1} archives. Use the xtzip unit to extract '
            R'them individually or set the --lenient/-L option to fuse the archives.')
        raise MultipleArchives(text)

    if zipf.password:
        self.log_debug('Using password:', zipf.password)

    if boundary := zipf.coverage.boundary():
        # Pad all offsets to the width of the upper boundary in hexadecimal notation.
        w = len(hex(boundary[1]))
        for start, end in zipf.coverage.gaps():
            self.log_info(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')
            yield self._pack(F'.{start:#0{w}x}.cave', None, view[start:end])

    for entry in sorted(zipf.directory, key=lambda d: d.name):
        # The entry is bound as a default argument so that each closure keeps its own entry
        # rather than the loop variable.
        def xt(entry=entry):
            record = zipf.read(entry)
            try:
                return record.unpack(zipf.password)
            except InvalidChecksum as ck:
                # The data attached to the checksum error is forwarded as a partial result.
                raise RefineryPartialResult('invalid checksum', ck.data) from ck
            except (PasswordRequired, InvalidPassword):
                if not record.data:
                    raise
                # The raw record data is forwarded as a partial result.
                msg = 'invalid password; use -L to extract raw encrypted data'
                raise RefineryPartialResult(msg, record.data)
        if entry.is_dir():
            continue
        # The callable is passed instead of the data; presumably it is invoked lazily by the
        # framework - TODO confirm against the definition of _pack.
        yield self._pack(entry.name, entry.date, xt)

Inherited members