Module refinery.units.formats.archive.xt7z
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from refinery.lib.structures import MemoryFile
from refinery.units.formats.archive import ArchiveUnit
import re
if TYPE_CHECKING:
from py7zr import SevenZipFile, FileInfo
class xt7z(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
"""
Extract files from a 7zip archive.
"""
@ArchiveUnit.Requires('py7zr', 'arc', 'default', 'extended')
def _py7zr():
import py7zr
import py7zr.exceptions
return py7zr
def unpack(self, data: bytearray):
for match in re.finditer(re.escape(B'7z\xBC\xAF\x27\x1C'), data):
start = match.start()
if start != 0:
self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
try:
yield from self._unpack_from(data, start)
except self._py7zr.Bad7zFile:
continue
else:
break
def _unpack_from(self, data: bytearray, zp: int = 0):
def mk7z(**keywords):
return self._py7zr.SevenZipFile(MemoryFile(mv[zp:]), **keywords)
pwd = self.args.pwd
mv = memoryview(data)
def test(archive: SevenZipFile):
if self.args.list:
archive.list()
return False
return archive.testzip()
if pwd:
try:
archive = mk7z(password=pwd.decode(self.codec))
except self._py7zr.Bad7zFile:
raise ValueError('corrupt archive; the password is likely invalid.')
else:
def passwords():
yield None
yield from self._COMMON_PASSWORDS
for pwd in passwords():
if pwd is None:
self.log_debug(U'trying empty password')
else:
self.log_debug(F'trying password: {pwd}')
try:
archive = mk7z(password=pwd)
problem = test(archive)
except self._py7zr.PasswordRequired:
problem = True
except self._py7zr.UnsupportedCompressionMethodError as E:
raise ValueError(E.message)
except self._py7zr.exceptions.InternalError:
# ignore internal errors during testzip
break
except SystemError:
problem = True
except Exception:
if pwd is None:
raise
problem = True
if not problem:
break
else:
raise ValueError('a password is required and none of the default passwords worked.')
for info in archive.list():
def extract(archive: SevenZipFile = archive, info: FileInfo = info):
archive.reset()
return archive.read([info.filename]).get(info.filename).read()
if info.is_directory:
continue
yield self._pack(info.filename, info.creationtime, extract, crc32=info.crc32, uncompressed=info.uncompressed)
@classmethod
def handles(cls, data: bytearray) -> bool:
return B'7z\xBC\xAF\x27\x1C' in data
Classes
class xt7z (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')
-
Extract files from a 7zip archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xt7z --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xt7z [| dump {path} ]
Expand source code Browse git
class xt7z(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'): """ Extract files from a 7zip archive. """ @ArchiveUnit.Requires('py7zr', 'arc', 'default', 'extended') def _py7zr(): import py7zr import py7zr.exceptions return py7zr def unpack(self, data: bytearray): for match in re.finditer(re.escape(B'7z\xBC\xAF\x27\x1C'), data): start = match.start() if start != 0: self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.') try: yield from self._unpack_from(data, start) except self._py7zr.Bad7zFile: continue else: break def _unpack_from(self, data: bytearray, zp: int = 0): def mk7z(**keywords): return self._py7zr.SevenZipFile(MemoryFile(mv[zp:]), **keywords) pwd = self.args.pwd mv = memoryview(data) def test(archive: SevenZipFile): if self.args.list: archive.list() return False return archive.testzip() if pwd: try: archive = mk7z(password=pwd.decode(self.codec)) except self._py7zr.Bad7zFile: raise ValueError('corrupt archive; the password is likely invalid.') else: def passwords(): yield None yield from self._COMMON_PASSWORDS for pwd in passwords(): if pwd is None: self.log_debug(U'trying empty password') else: self.log_debug(F'trying password: {pwd}') try: archive = mk7z(password=pwd) problem = test(archive) except self._py7zr.PasswordRequired: problem = True except self._py7zr.UnsupportedCompressionMethodError as E: raise ValueError(E.message) except self._py7zr.exceptions.InternalError: # ignore internal errors during testzip break except SystemError: problem = True except Exception: if pwd is None: raise problem = True if not problem: break else: raise ValueError('a password is required and none of the default passwords worked.') for info in archive.list(): def extract(archive: SevenZipFile = archive, info: FileInfo = info): archive.reset() return archive.read([info.filename]).get(info.filename).read() if info.is_directory: continue yield self._pack(info.filename, info.creationtime, extract, crc32=info.crc32, uncompressed=info.uncompressed) @classmethod def handles(cls, data: bytearray) -> bool: return B'7z\xBC\xAF\x27\x1C' in data
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data: bytearray): for match in re.finditer(re.escape(B'7z\xBC\xAF\x27\x1C'), data): start = match.start() if start != 0: self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.') try: yield from self._unpack_from(data, start) except self._py7zr.Bad7zFile: continue else: break
Inherited members