Module refinery.units.formats.archive.xtzip

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional

from datetime import datetime

from refinery.units.formats.archive import ArchiveUnit
from refinery.lib.structures import MemoryFile
from refinery.units.pattern.carve_zip import ZipEndOfCentralDirectory, carve_zip

ZIP_FILENAME_UTF8_FLAG = 0x800


class xtzip(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """
    @ArchiveUnit.Requires('chardet', 'default', 'extended')
    def _chardet():
        import chardet
        return chardet

    @ArchiveUnit.Requires('pyzipper', 'arc', 'default', 'extended')
    def _pyzipper():
        import pyzipper
        return pyzipper

    @classmethod
    def _carver(cls):
        return carve_zip

    def unpack(self, data: bytearray):
        from zipfile import ZipInfo, ZipFile, BadZipFile

        def password_invalid(password: Optional[bytes]):
            nonlocal archive, fallback
            if password:
                archive.setpassword(password)
            try:
                archive.testzip()
                files = (t for t in archive.infolist() if t.filename and not t.is_dir())
                files = sorted(files, key=lambda info: info.file_size)
                for info in files:
                    self.log_debug('testing password against:', info.filename)
                    try:
                        with archive.open(info.filename, "r") as test:
                            while test.read(1024):
                                pass
                    except BadZipFile:
                        continue
                    else:
                        break
            except NotImplementedError:
                if fallback:
                    raise
                self.log_debug('compression method unsupported, switching to pyzipper')
                archive = self._pyzipper.AESZipFile(MemoryFile(data))
                fallback = True
                return password_invalid(password)
            except RuntimeError as E:
                if 'password' not in str(E):
                    raise
                return True
            else:
                if password:
                    self.log_debug('using password:', password)
                return False

        password = bytes(self.args.pwd)
        fallback = False
        archive = ZipFile(MemoryFile(data))
        passwords = [password]

        if not password:
            passwords.extend(p.encode(self.codec) for p in self._COMMON_PASSWORDS)
        for p in passwords:
            if not password_invalid(p):
                break
        else:
            raise RuntimeError('Archive is password-protected.')

        for info in archive.infolist():
            def xt(archive: ZipFile = archive, info: ZipInfo = info):
                try:
                    return archive.read(info.filename)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    if not password:
                        raise RuntimeError('archive is password-protected')
                    else:
                        raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E
            if info.filename:
                if info.is_dir():
                    continue

            # courtesy of https://stackoverflow.com/a/37773438/9130824
            filename = info.filename
            if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
                filename_bytes = filename.encode('437')
                try:
                    guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
                except ImportError:
                    guessed_encoding = None
                guessed_encoding = guessed_encoding or 'cp1252'
                filename = filename_bytes.decode(guessed_encoding, 'replace')

            try:
                date = datetime(*info.date_time)
            except Exception as e:
                self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
                date = None

            yield self._pack(filename, date, xt)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        return data.rfind(ZipEndOfCentralDirectory.SIGNATURE) > 0

Classes

class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from a Zip archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:

emit something | xtzip --list

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit something | xtzip [| dump {path} ]
Expand source code Browse git
class xtzip(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """
    @ArchiveUnit.Requires('chardet', 'default', 'extended')
    def _chardet():
        import chardet
        return chardet

    @ArchiveUnit.Requires('pyzipper', 'arc', 'default', 'extended')
    def _pyzipper():
        import pyzipper
        return pyzipper

    @classmethod
    def _carver(cls):
        return carve_zip

    def unpack(self, data: bytearray):
        from zipfile import ZipInfo, ZipFile, BadZipFile

        def password_invalid(password: Optional[bytes]):
            nonlocal archive, fallback
            if password:
                archive.setpassword(password)
            try:
                archive.testzip()
                files = (t for t in archive.infolist() if t.filename and not t.is_dir())
                files = sorted(files, key=lambda info: info.file_size)
                for info in files:
                    self.log_debug('testing password against:', info.filename)
                    try:
                        with archive.open(info.filename, "r") as test:
                            while test.read(1024):
                                pass
                    except BadZipFile:
                        continue
                    else:
                        break
            except NotImplementedError:
                if fallback:
                    raise
                self.log_debug('compression method unsupported, switching to pyzipper')
                archive = self._pyzipper.AESZipFile(MemoryFile(data))
                fallback = True
                return password_invalid(password)
            except RuntimeError as E:
                if 'password' not in str(E):
                    raise
                return True
            else:
                if password:
                    self.log_debug('using password:', password)
                return False

        password = bytes(self.args.pwd)
        fallback = False
        archive = ZipFile(MemoryFile(data))
        passwords = [password]

        if not password:
            passwords.extend(p.encode(self.codec) for p in self._COMMON_PASSWORDS)
        for p in passwords:
            if not password_invalid(p):
                break
        else:
            raise RuntimeError('Archive is password-protected.')

        for info in archive.infolist():
            def xt(archive: ZipFile = archive, info: ZipInfo = info):
                try:
                    return archive.read(info.filename)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    if not password:
                        raise RuntimeError('archive is password-protected')
                    else:
                        raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E
            if info.filename:
                if info.is_dir():
                    continue

            # courtesy of https://stackoverflow.com/a/37773438/9130824
            filename = info.filename
            if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
                filename_bytes = filename.encode('437')
                try:
                    guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
                except ImportError:
                    guessed_encoding = None
                guessed_encoding = guessed_encoding or 'cp1252'
                filename = filename_bytes.decode(guessed_encoding, 'replace')

            try:
                date = datetime(*info.date_time)
            except Exception as e:
                self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
                date = None

            yield self._pack(filename, date, xt)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        return data.rfind(ZipEndOfCentralDirectory.SIGNATURE) > 0

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    from zipfile import ZipInfo, ZipFile, BadZipFile

    def password_invalid(password: Optional[bytes]):
        nonlocal archive, fallback
        if password:
            archive.setpassword(password)
        try:
            archive.testzip()
            files = (t for t in archive.infolist() if t.filename and not t.is_dir())
            files = sorted(files, key=lambda info: info.file_size)
            for info in files:
                self.log_debug('testing password against:', info.filename)
                try:
                    with archive.open(info.filename, "r") as test:
                        while test.read(1024):
                            pass
                except BadZipFile:
                    continue
                else:
                    break
        except NotImplementedError:
            if fallback:
                raise
            self.log_debug('compression method unsupported, switching to pyzipper')
            archive = self._pyzipper.AESZipFile(MemoryFile(data))
            fallback = True
            return password_invalid(password)
        except RuntimeError as E:
            if 'password' not in str(E):
                raise
            return True
        else:
            if password:
                self.log_debug('using password:', password)
            return False

    password = bytes(self.args.pwd)
    fallback = False
    archive = ZipFile(MemoryFile(data))
    passwords = [password]

    if not password:
        passwords.extend(p.encode(self.codec) for p in self._COMMON_PASSWORDS)
    for p in passwords:
        if not password_invalid(p):
            break
    else:
        raise RuntimeError('Archive is password-protected.')

    for info in archive.infolist():
        def xt(archive: ZipFile = archive, info: ZipInfo = info):
            try:
                return archive.read(info.filename)
            except RuntimeError as E:
                if 'password' not in str(E):
                    raise
                if not password:
                    raise RuntimeError('archive is password-protected')
                else:
                    raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E
        if info.filename:
            if info.is_dir():
                continue

        # courtesy of https://stackoverflow.com/a/37773438/9130824
        filename = info.filename
        if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
            filename_bytes = filename.encode('437')
            try:
                guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
            except ImportError:
                guessed_encoding = None
            guessed_encoding = guessed_encoding or 'cp1252'
            filename = filename_bytes.decode(guessed_encoding, 'replace')

        try:
            date = datetime(*info.date_time)
        except Exception as e:
            self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
            date = None

        yield self._pack(filename, date, xt)

Inherited members