Module refinery.units.formats.archive.xtzip
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional
from datetime import datetime
from refinery.units.formats.archive import ArchiveUnit
from refinery.lib.structures import MemoryFile
from refinery.units.pattern.carve_zip import ZipEndOfCentralDirectory, carve_zip
# Bit mask tested against ZipInfo.flag_bits: when this bit is clear, the entry
# name is not marked as UTF-8 and its real encoding has to be guessed.
ZIP_FILENAME_UTF8_FLAG = 0x800
class xtzip(ArchiveUnit):
    """
    Extract files from a Zip archive.
    """
    # NOTE(review): the Requires decorator presumably exposes these as lazily
    # imported optional dependencies; unpack() expects that touching
    # self._chardet may raise ImportError - confirm against ArchiveUnit.
    @ArchiveUnit.Requires('chardet', 'default', 'extended')
    def _chardet():
        import chardet
        return chardet

    @ArchiveUnit.Requires('pyzipper', 'arc', 'default', 'extended')
    def _pyzipper():
        import pyzipper
        return pyzipper

    @classmethod
    def _carver(cls):
        # The carving unit associated with this archive format.
        return carve_zip

    def unpack(self, data: bytearray):
        """
        Yield one packed item for every non-directory member of the Zip archive in `data`.

        When a password was passed via the `pwd` argument, it is set on the archive as-is.
        Otherwise, the archive is tested without a password first and then with each entry
        of `_COMMON_PASSWORDS`; a `RuntimeError` is raised if none of them works. If the
        standard library reports an unsupported method during this test, the archive is
        reopened with pyzipper.

        The member contents are read lazily by the extractor callback handed to `_pack`.
        """
        from zipfile import ZipFile, ZipInfo

        password = bytes(self.args.pwd)
        archive = ZipFile(MemoryFile(data))

        if password:
            archive.setpassword(password)
        else:
            def password_invalid(pwd: Optional[str], pyzipper=False):
                # Returns True if testing the archive with `pwd` fails with a password
                # related error, and False if the archive could be read in full.
                nonlocal archive
                if pwd is not None:
                    archive.setpassword(pwd.encode(self.codec))
                try:
                    archive.testzip()
                except NotImplementedError:
                    if pyzipper:
                        # Already retried with pyzipper; nothing else to fall back to.
                        raise
                    self.log_debug('compression method unsupported, switching to pyzipper')
                    archive = self._pyzipper.AESZipFile(MemoryFile(data))
                    return password_invalid(pwd, True)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    return True
                else:
                    if pwd:
                        self.log_debug('using password:', pwd)
                    return False

            for pwd in [None, *self._COMMON_PASSWORDS]:
                if not password_invalid(pwd):
                    break
            else:
                raise RuntimeError('Archive is password-protected.')

        for info in archive.infolist():
            # Bind archive and info as defaults so that each callback keeps referring
            # to its own member after the loop has moved on.
            def xt(archive: ZipFile = archive, info: ZipInfo = info):
                try:
                    # Read via the ZipInfo object rather than by name: a lookup by name
                    # resolves to a single entry, so members sharing a file name would
                    # otherwise all return the same data.
                    return archive.read(info)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    if not password:
                        raise RuntimeError('archive is password-protected') from E
                    else:
                        raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E

            # Only ask is_dir() for entries that actually have a name.
            if info.filename:
                if info.is_dir():
                    continue

            # courtesy of https://stackoverflow.com/a/37773438/9130824
            filename = info.filename
            if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
                # The name is not flagged as UTF-8: recover its raw bytes and guess
                # the actual encoding.
                filename_bytes = filename.encode('437')
                try:
                    guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
                except ImportError:
                    guessed_encoding = None
                guessed_encoding = guessed_encoding or 'cp1252'
                try:
                    filename = filename_bytes.decode(guessed_encoding, 'replace')
                except LookupError:
                    # The detector may name an encoding that Python has no codec for.
                    filename = filename_bytes.decode('cp1252', 'replace')

            try:
                date = datetime(*info.date_time)
            except Exception as e:
                self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
                date = None

            yield self._pack(filename, date, xt)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # True if the end of central directory signature occurs in the data at any
        # offset other than zero (a match at offset 0 does not count).
        return data.rfind(ZipEndOfCentralDirectory.SIGNATURE) > 0
Classes
class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')
-
Extract files from a Zip archive.
Expand source code Browse git
class xtzip(ArchiveUnit):
    """
    Extract files from a Zip archive.
    """
    # NOTE(review): the Requires decorator presumably exposes these as lazily
    # imported optional dependencies; unpack() expects that touching
    # self._chardet may raise ImportError - confirm against ArchiveUnit.
    @ArchiveUnit.Requires('chardet', 'default', 'extended')
    def _chardet():
        import chardet
        return chardet

    @ArchiveUnit.Requires('pyzipper', 'arc', 'default', 'extended')
    def _pyzipper():
        import pyzipper
        return pyzipper

    @classmethod
    def _carver(cls):
        # The carving unit associated with this archive format.
        return carve_zip

    def unpack(self, data: bytearray):
        """
        Yield one packed item for every non-directory member of the Zip archive in `data`.

        When a password was passed via the `pwd` argument, it is set on the archive as-is.
        Otherwise, the archive is tested without a password first and then with each entry
        of `_COMMON_PASSWORDS`; a `RuntimeError` is raised if none of them works. If the
        standard library reports an unsupported method during this test, the archive is
        reopened with pyzipper.

        The member contents are read lazily by the extractor callback handed to `_pack`.
        """
        from zipfile import ZipFile, ZipInfo

        password = bytes(self.args.pwd)
        archive = ZipFile(MemoryFile(data))

        if password:
            archive.setpassword(password)
        else:
            def password_invalid(pwd: Optional[str], pyzipper=False):
                # Returns True if testing the archive with `pwd` fails with a password
                # related error, and False if the archive could be read in full.
                nonlocal archive
                if pwd is not None:
                    archive.setpassword(pwd.encode(self.codec))
                try:
                    archive.testzip()
                except NotImplementedError:
                    if pyzipper:
                        # Already retried with pyzipper; nothing else to fall back to.
                        raise
                    self.log_debug('compression method unsupported, switching to pyzipper')
                    archive = self._pyzipper.AESZipFile(MemoryFile(data))
                    return password_invalid(pwd, True)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    return True
                else:
                    if pwd:
                        self.log_debug('using password:', pwd)
                    return False

            for pwd in [None, *self._COMMON_PASSWORDS]:
                if not password_invalid(pwd):
                    break
            else:
                raise RuntimeError('Archive is password-protected.')

        for info in archive.infolist():
            # Bind archive and info as defaults so that each callback keeps referring
            # to its own member after the loop has moved on.
            def xt(archive: ZipFile = archive, info: ZipInfo = info):
                try:
                    # Read via the ZipInfo object rather than by name: a lookup by name
                    # resolves to a single entry, so members sharing a file name would
                    # otherwise all return the same data.
                    return archive.read(info)
                except RuntimeError as E:
                    if 'password' not in str(E):
                        raise
                    if not password:
                        raise RuntimeError('archive is password-protected') from E
                    else:
                        raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E

            # Only ask is_dir() for entries that actually have a name.
            if info.filename:
                if info.is_dir():
                    continue

            # courtesy of https://stackoverflow.com/a/37773438/9130824
            filename = info.filename
            if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
                # The name is not flagged as UTF-8: recover its raw bytes and guess
                # the actual encoding.
                filename_bytes = filename.encode('437')
                try:
                    guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
                except ImportError:
                    guessed_encoding = None
                guessed_encoding = guessed_encoding or 'cp1252'
                try:
                    filename = filename_bytes.decode(guessed_encoding, 'replace')
                except LookupError:
                    # The detector may name an encoding that Python has no codec for.
                    filename = filename_bytes.decode('cp1252', 'replace')

            try:
                date = datetime(*info.date_time)
            except Exception as e:
                self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
                date = None

            yield self._pack(filename, date, xt)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # True if the end of central directory signature occurs in the data at any
        # offset other than zero (a match at offset 0 does not count).
        return data.rfind(ZipEndOfCentralDirectory.SIGNATURE) > 0
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Yield one packed item for every non-directory member of the Zip archive in `data`.

    When a password was passed via the `pwd` argument, it is set on the archive as-is.
    Otherwise, the archive is tested without a password first and then with each entry
    of `_COMMON_PASSWORDS`; a `RuntimeError` is raised if none of them works. If the
    standard library reports an unsupported method during this test, the archive is
    reopened with pyzipper.

    The member contents are read lazily by the extractor callback handed to `_pack`.
    """
    from zipfile import ZipFile, ZipInfo

    password = bytes(self.args.pwd)
    archive = ZipFile(MemoryFile(data))

    if password:
        archive.setpassword(password)
    else:
        def password_invalid(pwd: Optional[str], pyzipper=False):
            # Returns True if testing the archive with `pwd` fails with a password
            # related error, and False if the archive could be read in full.
            nonlocal archive
            if pwd is not None:
                archive.setpassword(pwd.encode(self.codec))
            try:
                archive.testzip()
            except NotImplementedError:
                if pyzipper:
                    # Already retried with pyzipper; nothing else to fall back to.
                    raise
                self.log_debug('compression method unsupported, switching to pyzipper')
                archive = self._pyzipper.AESZipFile(MemoryFile(data))
                return password_invalid(pwd, True)
            except RuntimeError as E:
                if 'password' not in str(E):
                    raise
                return True
            else:
                if pwd:
                    self.log_debug('using password:', pwd)
                return False

        for pwd in [None, *self._COMMON_PASSWORDS]:
            if not password_invalid(pwd):
                break
        else:
            raise RuntimeError('Archive is password-protected.')

    for info in archive.infolist():
        # Bind archive and info as defaults so that each callback keeps referring
        # to its own member after the loop has moved on.
        def xt(archive: ZipFile = archive, info: ZipInfo = info):
            try:
                # Read via the ZipInfo object rather than by name: a lookup by name
                # resolves to a single entry, so members sharing a file name would
                # otherwise all return the same data.
                return archive.read(info)
            except RuntimeError as E:
                if 'password' not in str(E):
                    raise
                if not password:
                    raise RuntimeError('archive is password-protected') from E
                else:
                    raise RuntimeError(F'invalid password: {password.decode(self.codec)}') from E

        # Only ask is_dir() for entries that actually have a name.
        if info.filename:
            if info.is_dir():
                continue

        # courtesy of https://stackoverflow.com/a/37773438/9130824
        filename = info.filename
        if info.flag_bits & ZIP_FILENAME_UTF8_FLAG == 0:
            # The name is not flagged as UTF-8: recover its raw bytes and guess
            # the actual encoding.
            filename_bytes = filename.encode('437')
            try:
                guessed_encoding = self._chardet.detect(filename_bytes)['encoding']
            except ImportError:
                guessed_encoding = None
            guessed_encoding = guessed_encoding or 'cp1252'
            try:
                filename = filename_bytes.decode(guessed_encoding, 'replace')
            except LookupError:
                # The detector may name an encoding that Python has no codec for.
                filename = filename_bytes.decode('cp1252', 'replace')

        try:
            date = datetime(*info.date_time)
        except Exception as e:
            self.log_info(F'{e!s} - unable to determine date from tuple {info.date_time} for: {filename}')
            date = None

        yield self._pack(filename, date, xt)
Inherited members