Module refinery.units.formats.archive.xtzip

Expand source code Browse git
from __future__ import annotations

import codecs

from refinery.lib import lief
from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.types import buf
from refinery.lib.zip import InvalidPassword, PasswordRequired, Zip, ZipDirEntry
from refinery.units import RefineryPartialResult
from refinery.units.formats.archive import ArchiveUnit
from refinery.units.formats.pe import get_pe_size
from refinery.units.pattern.carve_zip import carve_zip


class xtzip(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """
    @classmethod
    def _carver(cls):
        # The carving unit that belongs to this archive format.
        return carve_zip

    def unpack(self, data: buf):
        """
        Parse `data` as a Zip archive and yield one packed item for every entry of its directory
        that is not a folder. The password is taken from the `pwd` argument; if that is empty, the
        values in `_COMMON_PASSWORDS` are tried as well. Every gap reported by the coverage data of
        the archive is logged as a warning. Extraction itself is deferred: each packed item carries
        a callable that reads and unpacks its entry when invoked.
        """
        def attempt(candidate: str | None):
            # Yields None when the archive cannot be opened with this candidate, False when the
            # first non-directory record rejects it, and the parsed archive in all other cases.
            try:
                parsed = Zip(data, candidate)
            except (PasswordRequired, InvalidPassword):
                return None
            # Only the first record that is not a directory is consulted.
            probe = next((r for r in parsed.records.values() if not r.is_dir()), None)
            if probe is not None and not probe.is_password_ok(candidate):
                return False
            return parsed

        # Normalize the user-supplied password to either None or a string.
        password = self.args.pwd or None
        if password is not None and not isinstance(password, str):
            password = codecs.decode(password, self.codec)

        # Without an explicit password, the common passwords are attempted after None.
        candidates = [password] if password else [password, *self._COMMON_PASSWORDS]

        # Take the first candidate that produces a usable archive; if none does, parse with the
        # original password and let any resulting error propagate to the caller.
        archive = next((z for z in map(attempt, candidates) if z), None)
        if archive is None:
            archive = Zip(data, password)

        if archive.password:
            self.log_debug('Using password:', archive.password)

        boundary = archive.coverage.boundary()
        if boundary:
            # Pad all offsets to the width of the upper boundary in hexadecimal notation.
            w = len(hex(boundary[1]))
            for start, end in archive.coverage.gaps():
                self.log_warn(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')

        for entry in archive.directory:
            if entry.is_dir():
                continue

            # The default argument pins the current entry to this particular closure.
            def extract(entry=entry):
                record = archive.read(entry)
                try:
                    return record.unpack(archive.password)
                except InvalidPassword:
                    if record.data:
                        # Hand the raw record data to the caller as a partial result.
                        msg = 'invalid password; use -L to extract raw encrypted data'
                        raise RefineryPartialResult(msg, record.data)
                    raise

            yield self._pack(entry.name, entry.date, extract)

    @classmethod
    def handles(cls, data):
        """
        Decide whether `data` should be processed by this unit. The result is `True` when the data
        starts with one of two Zip signatures, or when it is likely a PE file that shows one of the
        Zip indicators checked below. It is `False` when the data is not a likely PE file, or when
        it is one without indicators and without debug information. It is `None` when the debug
        information was inspected but contained no matching PDB path.
        """
        if data[:4] in (B'PK\x03\x04', B'PK\x07\x08'):
            return True
        if not is_likely_pe(data):
            return False

        view = memoryview(data)

        # Indicator 1: a Zip directory entry signature within the final 0x400 bytes.
        # NOTE(review): this relies on buffer_offset returning a negative value on a miss.
        if buffer_offset(view[-0x400:], ZipDirEntry.Signature) >= 0:
            return True

        # Indicator 2: a PK\x03\x04 signature less than 0x1000 bytes after the PE size.
        image = lief.load_pe_fast(data)
        trailing = view[get_pe_size(image):]
        if 0 <= buffer_offset(trailing, B'PK\x03\x04') < 0x1000:
            return True

        if not image.has_debug:
            return False

        # Indicator 3: a CodeView PDB path that mentions both WinRAR and sfxzip32.
        for debug_entry in image.debug:
            if isinstance(debug_entry, lief.PE.CodeViewPDB):
                pdb_path = debug_entry.filename
                if not isinstance(pdb_path, str):
                    pdb_path = codecs.decode(pdb_path, 'latin1')
                if 'WinRAR' in pdb_path and 'sfxzip32' in pdb_path:
                    return True
        return None

Classes

class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from a Zip archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:

emit something | xtzip --list

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit something | xtzip [| dump {path} ]
Expand source code Browse git
class xtzip(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """
    @classmethod
    def _carver(cls):
        # The carving unit that belongs to this archive format.
        return carve_zip

    def unpack(self, data: buf):
        """
        Parse `data` as a Zip archive and yield one packed item for every entry of its directory
        that is not a folder. The password is taken from the `pwd` argument; if that is empty, the
        values in `_COMMON_PASSWORDS` are tried as well. Every gap reported by the coverage data of
        the archive is logged as a warning. Extraction itself is deferred: each packed item carries
        a callable that reads and unpacks its entry when invoked.
        """
        def attempt(candidate: str | None):
            # Yields None when the archive cannot be opened with this candidate, False when the
            # first non-directory record rejects it, and the parsed archive in all other cases.
            try:
                parsed = Zip(data, candidate)
            except (PasswordRequired, InvalidPassword):
                return None
            # Only the first record that is not a directory is consulted.
            probe = next((r for r in parsed.records.values() if not r.is_dir()), None)
            if probe is not None and not probe.is_password_ok(candidate):
                return False
            return parsed

        # Normalize the user-supplied password to either None or a string.
        password = self.args.pwd or None
        if password is not None and not isinstance(password, str):
            password = codecs.decode(password, self.codec)

        # Without an explicit password, the common passwords are attempted after None.
        candidates = [password] if password else [password, *self._COMMON_PASSWORDS]

        # Take the first candidate that produces a usable archive; if none does, parse with the
        # original password and let any resulting error propagate to the caller.
        archive = next((z for z in map(attempt, candidates) if z), None)
        if archive is None:
            archive = Zip(data, password)

        if archive.password:
            self.log_debug('Using password:', archive.password)

        boundary = archive.coverage.boundary()
        if boundary:
            # Pad all offsets to the width of the upper boundary in hexadecimal notation.
            w = len(hex(boundary[1]))
            for start, end in archive.coverage.gaps():
                self.log_warn(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')

        for entry in archive.directory:
            if entry.is_dir():
                continue

            # The default argument pins the current entry to this particular closure.
            def extract(entry=entry):
                record = archive.read(entry)
                try:
                    return record.unpack(archive.password)
                except InvalidPassword:
                    if record.data:
                        # Hand the raw record data to the caller as a partial result.
                        msg = 'invalid password; use -L to extract raw encrypted data'
                        raise RefineryPartialResult(msg, record.data)
                    raise

            yield self._pack(entry.name, entry.date, extract)

    @classmethod
    def handles(cls, data):
        """
        Decide whether `data` should be processed by this unit. The result is `True` when the data
        starts with one of two Zip signatures, or when it is likely a PE file that shows one of the
        Zip indicators checked below. It is `False` when the data is not a likely PE file, or when
        it is one without indicators and without debug information. It is `None` when the debug
        information was inspected but contained no matching PDB path.
        """
        if data[:4] in (B'PK\x03\x04', B'PK\x07\x08'):
            return True
        if not is_likely_pe(data):
            return False

        view = memoryview(data)

        # Indicator 1: a Zip directory entry signature within the final 0x400 bytes.
        # NOTE(review): this relies on buffer_offset returning a negative value on a miss.
        if buffer_offset(view[-0x400:], ZipDirEntry.Signature) >= 0:
            return True

        # Indicator 2: a PK\x03\x04 signature less than 0x1000 bytes after the PE size.
        image = lief.load_pe_fast(data)
        trailing = view[get_pe_size(image):]
        if 0 <= buffer_offset(trailing, B'PK\x03\x04') < 0x1000:
            return True

        if not image.has_debug:
            return False

        # Indicator 3: a CodeView PDB path that mentions both WinRAR and sfxzip32.
        for debug_entry in image.debug:
            if isinstance(debug_entry, lief.PE.CodeViewPDB):
                pdb_path = debug_entry.filename
                if not isinstance(pdb_path, str):
                    pdb_path = codecs.decode(pdb_path, 'latin1')
                if 'WinRAR' in pdb_path and 'sfxzip32' in pdb_path:
                    return True
        return None

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies
var console
var reverse

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: buf):
    """
    Parse `data` as a Zip archive and yield one packed item for every entry of its directory
    that is not a folder. The password is taken from the `pwd` argument; if that is empty, the
    values in `_COMMON_PASSWORDS` are tried as well. Every gap reported by the coverage data of
    the archive is logged as a warning. Extraction itself is deferred: each packed item carries
    a callable that reads and unpacks its entry when invoked.
    """
    def attempt(candidate: str | None):
        # Yields None when the archive cannot be opened with this candidate, False when the
        # first non-directory record rejects it, and the parsed archive in all other cases.
        try:
            parsed = Zip(data, candidate)
        except (PasswordRequired, InvalidPassword):
            return None
        # Only the first record that is not a directory is consulted.
        probe = next((r for r in parsed.records.values() if not r.is_dir()), None)
        if probe is not None and not probe.is_password_ok(candidate):
            return False
        return parsed

    # Normalize the user-supplied password to either None or a string.
    password = self.args.pwd or None
    if password is not None and not isinstance(password, str):
        password = codecs.decode(password, self.codec)

    # Without an explicit password, the common passwords are attempted after None.
    candidates = [password] if password else [password, *self._COMMON_PASSWORDS]

    # Take the first candidate that produces a usable archive; if none does, parse with the
    # original password and let any resulting error propagate to the caller.
    archive = next((z for z in map(attempt, candidates) if z), None)
    if archive is None:
        archive = Zip(data, password)

    if archive.password:
        self.log_debug('Using password:', archive.password)

    boundary = archive.coverage.boundary()
    if boundary:
        # Pad all offsets to the width of the upper boundary in hexadecimal notation.
        w = len(hex(boundary[1]))
        for start, end in archive.coverage.gaps():
            self.log_warn(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')

    for entry in archive.directory:
        if entry.is_dir():
            continue

        # The default argument pins the current entry to this particular closure.
        def extract(entry=entry):
            record = archive.read(entry)
            try:
                return record.unpack(archive.password)
            except InvalidPassword:
                if record.data:
                    # Hand the raw record data to the caller as a partial result.
                    msg = 'invalid password; use -L to extract raw encrypted data'
                    raise RefineryPartialResult(msg, record.data)
                raise

        yield self._pack(entry.name, entry.date, extract)

Inherited members