Module refinery.units.formats.archive.xt7z

Expand source code Browse git
from __future__ import annotations

import re

from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.un7z import (
    SIGNATURE,
    SzArchive,
    SzCannotUnpack,
    SzCorruptArchive,
    SzUnsupportedMethod,
)
from refinery.units.formats.archive import ArchiveUnit
from refinery.units.formats.pe import get_pe_size


class xt7z(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """

    def unpack(self, data: bytearray):
        """
        Search `data` for every occurrence of the 7z `SIGNATURE` and attempt extraction starting at
        each match, in order. A candidate for which `_unpack_from` raises `SzCorruptArchive` is
        skipped; the search ends after the first candidate that completes without that error.
        """
        for match in re.finditer(re.escape(SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except SzCorruptArchive:
                # This signature match did not parse as an archive; try the next one.
                continue
            else:
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        """
        Open the archive that starts at offset `zp` of `data` and yield one item, built with
        `_pack`, for every entry of the archive that is not a directory.

        If a password was passed to the unit, it is decoded with the unit's codec and that same
        decoded value is used both to open the archive and to decompress its entries. Otherwise,
        the archive is first tried without a password and then with each entry of
        `CommonPasswords`; every candidate is probed by decompressing the first non-directory
        entry.

        Raises `ValueError` when the explicit password yields `SzCorruptArchive`, when an
        unsupported method is reported during password probing, or when no candidate password
        works.
        """
        chunk = memoryview(data)[zp:]
        pwd = self.args.pwd
        password: str | bytes | None = None
        archive: SzArchive | None = None

        if pwd:
            # Decode once so that opening and extraction use the exact same password value; the
            # previous code opened with the decoded string but extracted with the raw bytes.
            password = pwd.decode(self.codec)
            try:
                archive = SzArchive(chunk, password=password)
            except SzCorruptArchive as E:
                raise ValueError('corrupt archive; the password is likely invalid.') from E
        else:
            # No password given: try without one first, then fall back to the common passwords.
            for password in [None, *self.CommonPasswords]:
                if password is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {password}')
                try:
                    archive = SzArchive(chunk, password=password)
                    # Decompressing the first regular entry serves as the password check.
                    for f in archive.files:
                        if not f.is_dir:
                            f.decompress(password=password)
                            break
                except SzUnsupportedMethod as E:
                    # No other password can help here; report the problem right away.
                    raise ValueError(str(E)) from E
                except SzCannotUnpack:
                    continue
                except Exception:
                    # Without any password, an unexpected error is not a password problem and is
                    # passed on to the caller unchanged (this includes SzCorruptArchive).
                    if password is None:
                        raise
                    continue
                else:
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        assert archive is not None

        for info in archive.files:
            if info.is_dir:
                continue

            # The entry and the password are bound as default arguments so that each callable
            # keeps its own values instead of sharing the loop variable.
            def extract(f=info, p=password):
                return f.decompress(password=p)

            yield self._pack(
                info.name,
                info.mtime or info.ctime,
                extract,
                crc32=info.crc,
                uncompressed=info.size,
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return `True` if `data` starts with the 7z `SIGNATURE`, or if it is likely a PE file whose
        trailing data (after the offset given by `get_pe_size`) begins with `;!@Install` and
        contains the signature at a positive offset within the searched range. Return `None` in
        all other cases.
        """
        if data[:6] == SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        offset = get_pe_size(data)
        memory = memoryview(data)[offset:]
        # NOTE(review): ';!@Install' is presumably the configuration marker of a 7-Zip
        # self-extracting installer; confirm against the SFX format.
        if memory[:10] == B';!@Install' and buffer_offset(memory, SIGNATURE, 0, 0x1000) > 0:
            return True
        return None

Classes

class xt7z (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')

Extract files from a 7zip archive.

This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".

Positional arguments to xt7z are patterns to filter the extracted items. Use the -x flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none whose path contains the word "temp":

xt7z .foo .bar -x temp

To view only the paths of all chunks, use the listing switch:

emit data | ... | xt7z -l

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit data | ... | xt7z [| dump extracted/{path} ]

The value {path} is a placeholder which is substituted by the virtual path of the extracted item. When using xt7z to unpack a file on disk, the following pattern can be useful:

ef pack.bin [| xt7z -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. Here, d2p is a shortcut for dump {path}. It deconflicts the joined paths with the local file system: If pack.bin contains items one.txt and two.txt, the following local file tree would be the result:

pack.bin
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to leave the path metadata untouched, neither creating nor altering it, which is useful in cases where path metadata from a previous unit should be preserved.

Expand source code Browse git
class xt7z(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """

    def unpack(self, data: bytearray):
        """
        Search `data` for every occurrence of the 7z `SIGNATURE` and attempt extraction starting at
        each match, in order. A candidate for which `_unpack_from` raises `SzCorruptArchive` is
        skipped; the search ends after the first candidate that completes without that error.
        """
        for match in re.finditer(re.escape(SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except SzCorruptArchive:
                # This signature match did not parse as an archive; try the next one.
                continue
            else:
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        """
        Open the archive that starts at offset `zp` of `data` and yield one item, built with
        `_pack`, for every entry of the archive that is not a directory.

        If a password was passed to the unit, it is decoded with the unit's codec and that same
        decoded value is used both to open the archive and to decompress its entries. Otherwise,
        the archive is first tried without a password and then with each entry of
        `CommonPasswords`; every candidate is probed by decompressing the first non-directory
        entry.

        Raises `ValueError` when the explicit password yields `SzCorruptArchive`, when an
        unsupported method is reported during password probing, or when no candidate password
        works.
        """
        chunk = memoryview(data)[zp:]
        pwd = self.args.pwd
        password: str | bytes | None = None
        archive: SzArchive | None = None

        if pwd:
            # Decode once so that opening and extraction use the exact same password value; the
            # previous code opened with the decoded string but extracted with the raw bytes.
            password = pwd.decode(self.codec)
            try:
                archive = SzArchive(chunk, password=password)
            except SzCorruptArchive as E:
                raise ValueError('corrupt archive; the password is likely invalid.') from E
        else:
            # No password given: try without one first, then fall back to the common passwords.
            for password in [None, *self.CommonPasswords]:
                if password is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {password}')
                try:
                    archive = SzArchive(chunk, password=password)
                    # Decompressing the first regular entry serves as the password check.
                    for f in archive.files:
                        if not f.is_dir:
                            f.decompress(password=password)
                            break
                except SzUnsupportedMethod as E:
                    # No other password can help here; report the problem right away.
                    raise ValueError(str(E)) from E
                except SzCannotUnpack:
                    continue
                except Exception:
                    # Without any password, an unexpected error is not a password problem and is
                    # passed on to the caller unchanged (this includes SzCorruptArchive).
                    if password is None:
                        raise
                    continue
                else:
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        assert archive is not None

        for info in archive.files:
            if info.is_dir:
                continue

            # The entry and the password are bound as default arguments so that each callable
            # keeps its own values instead of sharing the loop variable.
            def extract(f=info, p=password):
                return f.decompress(password=p)

            yield self._pack(
                info.name,
                info.mtime or info.ctime,
                extract,
                crc32=info.crc,
                uncompressed=info.size,
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return `True` if `data` starts with the 7z `SIGNATURE`, or if it is likely a PE file whose
        trailing data (after the offset given by `get_pe_size`) begins with `;!@Install` and
        contains the signature at a positive offset within the searched range. Return `None` in
        all other cases.
        """
        if data[:6] == SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        offset = get_pe_size(data)
        memory = memoryview(data)[offset:]
        # NOTE(review): ';!@Install' is presumably the configuration marker of a 7-Zip
        # self-extracting installer; confirm against the SFX format.
        if memory[:10] == B';!@Install' and buffer_offset(memory, SIGNATURE, 0, 0x1000) > 0:
            return True
        return None

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Walk over all occurrences of the 7z `SIGNATURE` in `data` and delegate to `_unpack_from` for
    each of them in turn. Candidates that raise `SzCorruptArchive` are skipped; once a candidate
    has been processed without that error, no further matches are considered.
    """
    signature_pattern = re.escape(SIGNATURE)
    for hit in re.finditer(signature_pattern, data):
        offset = hit.start()
        if offset:
            # Only a header that is not at the very beginning of the input is worth a log entry.
            self.log_info(F'found a header at offset 0x{offset:X}, trying to extract from there.')
        try:
            yield from self._unpack_from(data, offset)
        except SzCorruptArchive:
            # This candidate could not be parsed; move on to the next signature match.
            continue
        # The candidate was processed without a corruption error, so the search ends here.
        return

Inherited members