Module refinery.units.formats.archive.xt7z

Expand source code Browse git
from __future__ import annotations

import re

from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.un7z import (
    SIGNATURE,
    SzArchive,
    SzCannotUnpack,
    SzCorruptArchive,
    SzUnsupportedMethod,
)
from refinery.units.formats.archive import ArchiveUnit
from refinery.units.formats.pe import get_pe_size


class xt7z(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """

    def unpack(self, data: bytearray):
        """
        Scan the input for every occurrence of the 7zip signature and yield the
        entries of the first candidate that can be opened.

        A candidate that raises `SzCorruptArchive` is skipped and the next
        signature occurrence is tried; any other exception propagates.
        """
        for match in re.finditer(re.escape(SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except SzCorruptArchive:
                continue
            else:
                # One candidate was processed without corruption; stop searching.
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        """
        Open the archive that begins at offset `zp` of `data` and yield one
        packed item per non-directory entry.

        When a password argument was given, it is decoded with the unit's codec
        and used as-is. Otherwise, the empty password and then each entry of
        `CommonPasswords` is tried; a candidate is accepted when the archive
        opens and its first non-directory entry decompresses.

        Raises `ValueError` when the archive uses an unsupported method, when
        opening with the given password reports a corrupt archive, or when no
        default password works.
        """
        mv = memoryview(data)
        # Slicing the view avoids copying the remainder of the buffer.
        chunk = mv[zp:]
        pwd = self.args.pwd

        def try_open(password: str | bytes | None) -> SzArchive:
            return SzArchive(chunk, password=password)

        archive: SzArchive | None = None

        if pwd:
            # Decode once and keep the decoded value, so that opening the
            # archive and the later per-file decompression both receive the
            # exact same password value.
            pwd = pwd.decode(self.codec)
            try:
                archive = try_open(pwd)
            except SzCorruptArchive as E:
                raise ValueError('corrupt archive; the password is likely invalid.') from E
        else:
            def passwords():
                yield None
                yield from self.CommonPasswords
            for pwd in passwords():
                if pwd is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {pwd}')
                try:
                    archive = try_open(pwd)
                    # Decompress only the first real file as a password check.
                    for f in archive.files:
                        if not f.is_dir:
                            f.decompress(password=pwd)
                            break
                except SzUnsupportedMethod as E:
                    raise ValueError(str(E)) from E
                except SzCannotUnpack:
                    continue
                except Exception:
                    # Without any password, a failure is not a password
                    # problem and must surface unchanged.
                    if pwd is None:
                        raise
                    continue
                else:
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        assert archive is not None

        for info in archive.files:
            if info.is_dir:
                continue

            # Bind the current entry and password as defaults so that each
            # deferred callable keeps its own values.
            def extract(f=info, p=pwd):
                return f.decompress(password=p)

            yield self._pack(
                info.name,
                info.mtime or info.ctime,
                extract,
                crc32=info.crc,
                uncompressed=info.size,
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return `True` when `data` starts with the 7zip signature, or when it
        looks like a PE file whose trailing data starts with `;!@Install` and
        contains the 7zip signature. Return `None` in all other cases.
        """
        if data[:6] == SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        # Inspect only what follows the PE image.
        offset = get_pe_size(data)
        memory = memoryview(data)
        memory = memory[offset:]
        # NOTE(review): the trailing arguments 0 and 0x1000 are presumably the
        # search bounds for the signature - confirm against buffer_offset.
        if memory[:10] == B';!@Install' and buffer_offset(memory, SIGNATURE, 0, 0x1000) > 0:
            return True
        return None

Classes

class xt7z (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from a 7zip archive.

This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:

emit something | xt7z --list

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit something | xt7z [| d2p ]

If you are using xt7z to unpack a file on disk, the following pattern can be useful:

ef pack.foo [| xt7z -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. The d2p unit will deconflict these with the local file system. For example, if pack.foo contains items one.txt and two.txt, the following local file tree would be the result:

pack.foo
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.

Expand source code Browse git
class xt7z(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """

    def unpack(self, data: bytearray):
        """
        Scan the input for every occurrence of the 7zip signature and yield the
        entries of the first candidate that can be opened.

        A candidate that raises `SzCorruptArchive` is skipped and the next
        signature occurrence is tried; any other exception propagates.
        """
        for match in re.finditer(re.escape(SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except SzCorruptArchive:
                continue
            else:
                # One candidate was processed without corruption; stop searching.
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        """
        Open the archive that begins at offset `zp` of `data` and yield one
        packed item per non-directory entry.

        When a password argument was given, it is decoded with the unit's codec
        and used as-is. Otherwise, the empty password and then each entry of
        `CommonPasswords` is tried; a candidate is accepted when the archive
        opens and its first non-directory entry decompresses.

        Raises `ValueError` when the archive uses an unsupported method, when
        opening with the given password reports a corrupt archive, or when no
        default password works.
        """
        mv = memoryview(data)
        # Slicing the view avoids copying the remainder of the buffer.
        chunk = mv[zp:]
        pwd = self.args.pwd

        def try_open(password: str | bytes | None) -> SzArchive:
            return SzArchive(chunk, password=password)

        archive: SzArchive | None = None

        if pwd:
            # Decode once and keep the decoded value, so that opening the
            # archive and the later per-file decompression both receive the
            # exact same password value.
            pwd = pwd.decode(self.codec)
            try:
                archive = try_open(pwd)
            except SzCorruptArchive as E:
                raise ValueError('corrupt archive; the password is likely invalid.') from E
        else:
            def passwords():
                yield None
                yield from self.CommonPasswords
            for pwd in passwords():
                if pwd is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {pwd}')
                try:
                    archive = try_open(pwd)
                    # Decompress only the first real file as a password check.
                    for f in archive.files:
                        if not f.is_dir:
                            f.decompress(password=pwd)
                            break
                except SzUnsupportedMethod as E:
                    raise ValueError(str(E)) from E
                except SzCannotUnpack:
                    continue
                except Exception:
                    # Without any password, a failure is not a password
                    # problem and must surface unchanged.
                    if pwd is None:
                        raise
                    continue
                else:
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        assert archive is not None

        for info in archive.files:
            if info.is_dir:
                continue

            # Bind the current entry and password as defaults so that each
            # deferred callable keeps its own values.
            def extract(f=info, p=pwd):
                return f.decompress(password=p)

            yield self._pack(
                info.name,
                info.mtime or info.ctime,
                extract,
                crc32=info.crc,
                uncompressed=info.size,
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return `True` when `data` starts with the 7zip signature, or when it
        looks like a PE file whose trailing data starts with `;!@Install` and
        contains the 7zip signature. Return `None` in all other cases.
        """
        if data[:6] == SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        # Inspect only what follows the PE image.
        offset = get_pe_size(data)
        memory = memoryview(data)
        memory = memory[offset:]
        # NOTE(review): the trailing arguments 0 and 0x1000 are presumably the
        # search bounds for the signature - confirm against buffer_offset.
        if memory[:10] == B';!@Install' and buffer_offset(memory, SIGNATURE, 0, 0x1000) > 0:
            return True
        return None

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Try each occurrence of the 7zip signature in `data` as an archive start
    and yield the entries of the first one that is not reported as corrupt.
    """
    pattern = re.escape(SIGNATURE)
    for hit in re.finditer(pattern, data):
        start = hit.start()
        if start:
            self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
        try:
            yield from self._unpack_from(data, start)
        except SzCorruptArchive:
            # Not a usable archive at this offset; move on to the next hit.
            continue
        # This candidate was processed; later signature hits are ignored.
        return

Inherited members