Module refinery.units.formats.archive.xtzip

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional

from datetime import datetime

from refinery.units.formats.archive import ArchiveUnit
from refinery.lib.structures import MemoryFile
from refinery.units.pattern.carve_zip import ZipEndOfCentralDirectory, carve_zip

# Bit 11 (0x800) of a Zip entry's flag bits. unpack() keeps an entry name as-is only when
# this bit is set; otherwise it re-encodes the name as cp437 and guesses the real encoding.
ZIP_FILENAME_UTF8_FLAG = 0x800


class xtzip(ArchiveUnit):
    """
    Extract files from a Zip archive.
    """
    # Optional dependency used to guess the encoding of entry names that are not flagged as
    # UTF-8. NOTE(review): the semantics of ArchiveUnit.Requires are defined outside this file.
    @ArchiveUnit.Requires('chardet', 'default', 'extended')
    def _chardet():
        import chardet
        return chardet

    # Optional dependency used as a fallback reader when the standard library zipfile module
    # reports an unsupported feature via NotImplementedError.
    @ArchiveUnit.Requires('pyzipper', 'arc', 'default', 'extended')
    def _pyzipper():
        import pyzipper
        return pyzipper

    @classmethod
    def _carver(cls):
        # The unit that is used to carve Zip archives.
        return carve_zip

    def unpack(self, data: bytearray):
        """
        Yield one packed item for every non-directory entry of the Zip archive in `data`. The
        password from `self.args.pwd` is tried first; when it is empty, every entry of
        `self._COMMON_PASSWORDS` is tried as well. A `RuntimeError` is raised when the archive
        rejects all of these candidates.
        """
        from zipfile import ZipInfo, ZipFile

        def password_invalid(password: Optional[bytes]):
            # Returns True when the archive rejects the password and False when testzip()
            # completes without a password error. On NotImplementedError, the enclosing
            # `archive` is replaced once by a pyzipper AESZipFile and the check is repeated.
            nonlocal archive, fallback
            if password:
                archive.setpassword(password)
            try:
                archive.testzip()
            except NotImplementedError:
                if fallback:
                    # pyzipper was already tried and cannot handle the archive either
                    raise
                self.log_debug('compression method unsupported, switching to pyzipper')
                archive = self._pyzipper.AESZipFile(MemoryFile(data))
                fallback = True
                return password_invalid(password)
            except RuntimeError as E:
                # only runtime errors that mention the password indicate a wrong password
                if 'password' not in str(E):
                    raise
                return True
            else:
                if password:
                    self.log_debug('using password:', password)
                return False

        password = bytes(self.args.pwd)
        fallback = False
        archive = ZipFile(MemoryFile(data))
        passwords = [password]

        if not password:
            passwords.extend(p.encode(self.codec) for p in self._COMMON_PASSWORDS)
        for p in passwords:
            if not password_invalid(p):
                break
        else:
            raise RuntimeError('Archive is password-protected.')

        for info in archive.infolist():
            # directory entries carry no data and are skipped
            if info.filename and info.is_dir():
                continue

            # archive and info are bound as defaults so that each closure keeps its own entry
            def xt(archive: ZipFile = archive, info: ZipInfo = info):
                try:
                    # Read via the ZipInfo object, not via its name: a lookup by name resolves
                    # to a single entry, so members that share a name with another member
                    # would all return the same content.
                    return archive.read(info)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    if not password:
                        raise RuntimeError('archive is password-protected')
                    else:
                        raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E

            # courtesy of https://stackoverflow.com/a/37773438/9130824
            filename = info.filename
            if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
                filename_bytes = filename.encode('437')
                try:
                    guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
                except ImportError:
                    guessed_encoding = None
                guessed_encoding = guessed_encoding or 'cp1252'
                try:
                    filename = filename_bytes.decode(guessed_encoding, 'replace')
                except LookupError:
                    # the detected encoding has no codec in this Python installation; use
                    # the same default that applies when nothing was detected
                    filename = filename_bytes.decode('cp1252', 'replace')

            try:
                date = datetime(*info.date_time)
            except Exception as e:
                self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
                date = None

            yield self._pack(filename, date, xt)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # True when an end of central directory signature occurs at an offset greater than
        # zero; a signature at offset zero (no room for any entry before it) is not accepted.
        return data.rfind(ZipEndOfCentralDirectory.SIGNATURE) > 0

Classes

class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from a Zip archive.

Expand source code Browse git
class xtzip(ArchiveUnit):
    """
    Extract files from a Zip archive.
    """
    # Optional dependency used to guess the encoding of entry names that are not flagged as
    # UTF-8. NOTE(review): the semantics of ArchiveUnit.Requires are defined outside this file.
    @ArchiveUnit.Requires('chardet', 'default', 'extended')
    def _chardet():
        import chardet
        return chardet

    # Optional dependency used as a fallback reader when the standard library zipfile module
    # reports an unsupported feature via NotImplementedError.
    @ArchiveUnit.Requires('pyzipper', 'arc', 'default', 'extended')
    def _pyzipper():
        import pyzipper
        return pyzipper

    @classmethod
    def _carver(cls):
        # The unit that is used to carve Zip archives.
        return carve_zip

    def unpack(self, data: bytearray):
        """
        Yield one packed item for every non-directory entry of the Zip archive in `data`. The
        password from `self.args.pwd` is tried first; when it is empty, every entry of
        `self._COMMON_PASSWORDS` is tried as well. A `RuntimeError` is raised when the archive
        rejects all of these candidates.
        """
        from zipfile import ZipInfo, ZipFile

        def password_invalid(password: Optional[bytes]):
            # Returns True when the archive rejects the password and False when testzip()
            # completes without a password error. On NotImplementedError, the enclosing
            # `archive` is replaced once by a pyzipper AESZipFile and the check is repeated.
            nonlocal archive, fallback
            if password:
                archive.setpassword(password)
            try:
                archive.testzip()
            except NotImplementedError:
                if fallback:
                    # pyzipper was already tried and cannot handle the archive either
                    raise
                self.log_debug('compression method unsupported, switching to pyzipper')
                archive = self._pyzipper.AESZipFile(MemoryFile(data))
                fallback = True
                return password_invalid(password)
            except RuntimeError as E:
                # only runtime errors that mention the password indicate a wrong password
                if 'password' not in str(E):
                    raise
                return True
            else:
                if password:
                    self.log_debug('using password:', password)
                return False

        password = bytes(self.args.pwd)
        fallback = False
        archive = ZipFile(MemoryFile(data))
        passwords = [password]

        if not password:
            passwords.extend(p.encode(self.codec) for p in self._COMMON_PASSWORDS)
        for p in passwords:
            if not password_invalid(p):
                break
        else:
            raise RuntimeError('Archive is password-protected.')

        for info in archive.infolist():
            # directory entries carry no data and are skipped
            if info.filename and info.is_dir():
                continue

            # archive and info are bound as defaults so that each closure keeps its own entry
            def xt(archive: ZipFile = archive, info: ZipInfo = info):
                try:
                    # Read via the ZipInfo object, not via its name: a lookup by name resolves
                    # to a single entry, so members that share a name with another member
                    # would all return the same content.
                    return archive.read(info)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    if not password:
                        raise RuntimeError('archive is password-protected')
                    else:
                        raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E

            # courtesy of https://stackoverflow.com/a/37773438/9130824
            filename = info.filename
            if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
                filename_bytes = filename.encode('437')
                try:
                    guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
                except ImportError:
                    guessed_encoding = None
                guessed_encoding = guessed_encoding or 'cp1252'
                try:
                    filename = filename_bytes.decode(guessed_encoding, 'replace')
                except LookupError:
                    # the detected encoding has no codec in this Python installation; use
                    # the same default that applies when nothing was detected
                    filename = filename_bytes.decode('cp1252', 'replace')

            try:
                date = datetime(*info.date_time)
            except Exception as e:
                self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
                date = None

            yield self._pack(filename, date, xt)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # True when an end of central directory signature occurs at an offset greater than
        # zero; a signature at offset zero (no room for any entry before it) is not accepted.
        return data.rfind(ZipEndOfCentralDirectory.SIGNATURE) > 0

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Yield one packed item for every non-directory entry of the Zip archive in `data`. The
    password from `self.args.pwd` is tried first; when it is empty, every entry of
    `self._COMMON_PASSWORDS` is tried as well. A `RuntimeError` is raised when the archive
    rejects all of these candidates.
    """
    from zipfile import ZipInfo, ZipFile

    def password_invalid(password: Optional[bytes]):
        # Returns True when the archive rejects the password and False when testzip()
        # completes without a password error. On NotImplementedError, the enclosing
        # `archive` is replaced once by a pyzipper AESZipFile and the check is repeated.
        nonlocal archive, fallback
        if password:
            archive.setpassword(password)
        try:
            archive.testzip()
        except NotImplementedError:
            if fallback:
                # pyzipper was already tried and cannot handle the archive either
                raise
            self.log_debug('compression method unsupported, switching to pyzipper')
            archive = self._pyzipper.AESZipFile(MemoryFile(data))
            fallback = True
            return password_invalid(password)
        except RuntimeError as E:
            # only runtime errors that mention the password indicate a wrong password
            if 'password' not in str(E):
                raise
            return True
        else:
            if password:
                self.log_debug('using password:', password)
            return False

    password = bytes(self.args.pwd)
    fallback = False
    archive = ZipFile(MemoryFile(data))
    passwords = [password]

    if not password:
        passwords.extend(p.encode(self.codec) for p in self._COMMON_PASSWORDS)
    for p in passwords:
        if not password_invalid(p):
            break
    else:
        raise RuntimeError('Archive is password-protected.')

    for info in archive.infolist():
        # directory entries carry no data and are skipped
        if info.filename and info.is_dir():
            continue

        # archive and info are bound as defaults so that each closure keeps its own entry
        def xt(archive: ZipFile = archive, info: ZipInfo = info):
            try:
                # Read via the ZipInfo object, not via its name: a lookup by name resolves
                # to a single entry, so members that share a name with another member
                # would all return the same content.
                return archive.read(info)
            except RuntimeError as E:
                if 'password' not in str(E):
                    raise
                if not password:
                    raise RuntimeError('archive is password-protected')
                else:
                    raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E

        # courtesy of https://stackoverflow.com/a/37773438/9130824
        filename = info.filename
        if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
            filename_bytes = filename.encode('437')
            try:
                guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
            except ImportError:
                guessed_encoding = None
            guessed_encoding = guessed_encoding or 'cp1252'
            try:
                filename = filename_bytes.decode(guessed_encoding, 'replace')
            except LookupError:
                # the detected encoding has no codec in this Python installation; use
                # the same default that applies when nothing was detected
                filename = filename_bytes.decode('cp1252', 'replace')

        try:
            date = datetime(*info.date_time)
        except Exception as e:
            self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
            date = None

        yield self._pack(filename, date, xt)

Inherited members