Module refinery.units.formats.archive.xt7z

Expand source code Browse git
from __future__ import annotations

import re

from typing import TYPE_CHECKING

from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.structures import MemoryFile
from refinery.units.formats.archive import ArchiveUnit
from refinery.units.formats.pe import get_pe_size

if TYPE_CHECKING:
    from py7zr import SevenZipFile


_SIGNATURE = B'7z\xBC\xAF\x27\x1C'


class _IOFactory:
    def __init__(self):
        self.buffer = None

    def create(self, _):
        if self.buffer is not None:
            raise RuntimeError('IO factory was unexpectedly called twice.')
        self.buffer = MemoryFile()
        return self.buffer


class xt7z(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """
    @ArchiveUnit.Requires('py7zr', ['arc', 'default', 'extended'])
    def _py7zr():
        import py7zr
        import py7zr.exceptions
        return py7zr

    def unpack(self, data: bytearray):
        for match in re.finditer(re.escape(_SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except self._py7zr.Bad7zFile:
                continue
            else:
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        def mk7z(**keywords):
            return self._py7zr.SevenZipFile(MemoryFile(mv[zp:]), **keywords)

        pwd = self.args.pwd
        mv = memoryview(data)
        archive = None

        def test(archive: SevenZipFile):
            if self.args.list:
                archive.list()
                return False
            return archive.testzip()

        if pwd:
            try:
                archive = mk7z(password=pwd.decode(self.codec))
            except self._py7zr.Bad7zFile:
                raise ValueError('corrupt archive; the password is likely invalid.')
        else:
            def passwords():
                yield None
                yield from self.CommonPasswords
            for pwd in passwords():
                if pwd is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {pwd}')
                try:
                    archive = mk7z(password=pwd)
                    problem = test(archive)
                except self._py7zr.PasswordRequired:
                    problem = True
                except self._py7zr.UnsupportedCompressionMethodError as E:
                    raise ValueError(E.message)
                except self._py7zr.exceptions.InternalError:
                    # ignore internal errors during testzip
                    break
                except SystemError:
                    problem = True
                except Exception:
                    if pwd is None:
                        raise
                    problem = True
                if not problem:
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        assert archive is not None
        has_read_method = hasattr(archive, 'read')

        for info in archive.list():
            if has_read_method:
                def extract(archive: SevenZipFile = archive, name: str = info.filename):
                    archive.reset()
                    io = archive.read([name])
                    io = io[name]
                    io.seek(0)
                    return io.read()
            else:
                def extract(archive: SevenZipFile = archive, name: str = info.filename):
                    io = _IOFactory()
                    archive.reset()
                    archive.extract(None, [name], factory=io)
                    return io.buffer.getvalue()

            if info.is_directory:
                continue

            yield self._pack(
                info.filename,
                info.creationtime,
                extract,
                crc32=info.crc32,
                uncompressed=info.uncompressed
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        if data[:6] == _SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        offset = get_pe_size(data)
        memory = memoryview(data)
        memory = memory[offset:]
        if memory[:10] == B';!@Install' and buffer_offset(memory, _SIGNATURE, 0, 0x1000) > 0:
            return True

Classes

class xt7z (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from a 7zip archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:

emit something | xt7z --list

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit something | xt7z [| d2p ]

If you using xt7z to unpack a file on disk, the following pattern can be useful:

ef pack.foo [| xt7z -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. The d2p unit will deconflict these with the local file system. For example, if pack.foo contains items one.txt and two.txt, the following local file tree would be the result:

pack.foo
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.

Expand source code Browse git
class xt7z(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """
    @ArchiveUnit.Requires('py7zr', ['arc', 'default', 'extended'])
    def _py7zr():
        import py7zr
        import py7zr.exceptions
        return py7zr

    def unpack(self, data: bytearray):
        for match in re.finditer(re.escape(_SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except self._py7zr.Bad7zFile:
                continue
            else:
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        def mk7z(**keywords):
            return self._py7zr.SevenZipFile(MemoryFile(mv[zp:]), **keywords)

        pwd = self.args.pwd
        mv = memoryview(data)
        archive = None

        def test(archive: SevenZipFile):
            if self.args.list:
                archive.list()
                return False
            return archive.testzip()

        if pwd:
            try:
                archive = mk7z(password=pwd.decode(self.codec))
            except self._py7zr.Bad7zFile:
                raise ValueError('corrupt archive; the password is likely invalid.')
        else:
            def passwords():
                yield None
                yield from self.CommonPasswords
            for pwd in passwords():
                if pwd is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {pwd}')
                try:
                    archive = mk7z(password=pwd)
                    problem = test(archive)
                except self._py7zr.PasswordRequired:
                    problem = True
                except self._py7zr.UnsupportedCompressionMethodError as E:
                    raise ValueError(E.message)
                except self._py7zr.exceptions.InternalError:
                    # ignore internal errors during testzip
                    break
                except SystemError:
                    problem = True
                except Exception:
                    if pwd is None:
                        raise
                    problem = True
                if not problem:
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        assert archive is not None
        has_read_method = hasattr(archive, 'read')

        for info in archive.list():
            if has_read_method:
                def extract(archive: SevenZipFile = archive, name: str = info.filename):
                    archive.reset()
                    io = archive.read([name])
                    io = io[name]
                    io.seek(0)
                    return io.read()
            else:
                def extract(archive: SevenZipFile = archive, name: str = info.filename):
                    io = _IOFactory()
                    archive.reset()
                    archive.extract(None, [name], factory=io)
                    return io.buffer.getvalue()

            if info.is_directory:
                continue

            yield self._pack(
                info.filename,
                info.creationtime,
                extract,
                crc32=info.crc32,
                uncompressed=info.uncompressed
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        if data[:6] == _SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        offset = get_pe_size(data)
        memory = memoryview(data)
        memory = memory[offset:]
        if memory[:10] == B';!@Install' and buffer_offset(memory, _SIGNATURE, 0, 0x1000) > 0:
            return True

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    for match in re.finditer(re.escape(_SIGNATURE), data):
        start = match.start()
        if start != 0:
            self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
        try:
            yield from self._unpack_from(data, start)
        except self._py7zr.Bad7zFile:
            continue
        else:
            break

Inherited members