Module refinery.units.formats.archive.xt7z
Expand source code Browse git
from __future__ import annotations
import re
from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.un7z import (
SIGNATURE,
SzArchive,
SzCannotUnpack,
SzCorruptArchive,
SzUnsupportedMethod,
)
from refinery.units.formats.archive import ArchiveUnit
from refinery.units.formats.pe import get_pe_size
class xt7z(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a 7zip archive.
    """

    def unpack(self, data: bytearray):
        """
        Yield the packed items of the first 7zip archive found in `data` that can be unpacked.

        Every occurrence of `SIGNATURE` in `data` is treated as a candidate archive start. The
        candidates are tried in order of appearance: when `_unpack_from` raises `SzCorruptArchive`
        for one of them, the next occurrence is tried; the search stops after the first candidate
        that was processed without this error. If no candidate exists, nothing is yielded.
        """
        for match in re.finditer(re.escape(SIGNATURE), data):
            start = match.start()
            if start != 0:
                self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.')
            try:
                yield from self._unpack_from(data, start)
            except SzCorruptArchive:
                # This signature hit was not a usable archive header; try the next one.
                continue
            else:
                break

    def _unpack_from(self, data: bytearray, zp: int = 0):
        """
        Open the archive that starts at offset `zp` of `data` and yield one packed item for each
        of its non-directory entries.

        When the unit was given a password, it is decoded with the unit's codec and used both to
        open the archive and to decompress its entries. Without one, the archive is probed first
        without any password and then with each entry of `CommonPasswords`; a candidate is accepted
        when the first non-directory entry can be decompressed with it.

        Raises:
            ValueError: if the archive is corrupt when opened with the given password, if it uses
                an unsupported method, or if none of the default passwords worked.
        """
        # A memoryview slice avoids copying the (potentially large) input buffer.
        chunk = memoryview(data)[zp:]
        pwd = self.args.pwd
        archive: SzArchive | None = None

        def try_open(password: str | bytes | None) -> SzArchive:
            return SzArchive(chunk, password=password)

        if pwd:
            # Decode exactly once: the same value must be used for opening the archive and for
            # decompressing its entries, just like in the default password branch below.
            pwd = pwd.decode(self.codec)
            try:
                archive = try_open(pwd)
            except SzCorruptArchive as error:
                raise ValueError('corrupt archive; the password is likely invalid.') from error
        else:
            def passwords():
                yield None
                yield from self.CommonPasswords

            for candidate in passwords():
                if candidate is None:
                    self.log_debug('trying empty password')
                else:
                    self.log_debug(F'trying password: {candidate}')
                try:
                    archive = try_open(candidate)
                    # Decompressing the first regular entry is the actual password check.
                    for entry in archive.files:
                        if not entry.is_dir:
                            entry.decompress(password=candidate)
                            break
                except SzUnsupportedMethod as error:
                    raise ValueError(str(error)) from error
                except SzCannotUnpack:
                    continue
                except Exception:
                    # Without any password involved, a failure is a genuine error rather than
                    # the symptom of a wrong password guess.
                    if candidate is None:
                        raise
                    continue
                else:
                    pwd = candidate
                    break
            else:
                raise ValueError('a password is required and none of the default passwords worked.')

        # Both branches above either assign the archive or raise.
        assert archive is not None

        for info in archive.files:
            if info.is_dir:
                continue

            # Default arguments bind the current entry and password to this extractor, which
            # would otherwise see the values of the last loop iteration when called later.
            def extract(f=info, p=pwd):
                return f.decompress(password=p)

            yield self._pack(
                info.name,
                info.mtime or info.ctime,
                extract,
                crc32=info.crc,
                uncompressed=info.size,
            )

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Decide whether `data` looks like input for this unit.

        Returns `True` when `data` starts with `SIGNATURE`, or when `data` is likely a PE file
        whose trailing data (everything after the size reported by `get_pe_size`) starts with
        `;!@Install` and `buffer_offset` reports a positive offset for `SIGNATURE` in it.
        Returns `None` in all other cases.
        """
        if data[:6] == SIGNATURE:
            return True
        if not is_likely_pe(data):
            return None
        offset = get_pe_size(data)
        overlay = memoryview(data)[offset:]
        # NOTE(review): ';!@Install' is presumably the start of a 7zip self-extractor config
        # block, and (0, 0x1000) presumably bounds the signature search range - confirm.
        if overlay[:10] == B';!@Install' and buffer_offset(overlay, SIGNATURE, 0, 0x1000) > 0:
            return True
        return None
Classes
class xt7z (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')-
Extract files from a 7zip archive.
This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".
Positional arguments to xt7z are patterns to filter the extracted items. Use the `-x` flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none that has the word "temp" in its path:

    xt7z .foo .bar -x temp

To view only the paths of all chunks, use the listing switch:

    emit data | ... | xt7z -l

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

    emit data | ... | xt7z [| dump extracted/{path} ]

The value `{path}` is a placeholder which is substituted by the virtual path of the extracted item. When using xt7z to unpack a file on disk, the following pattern can be useful:

    ef pack.bin [| xt7z -j | d2p ]

The unit `ef` is also a path extractor. By specifying `-j` (or `--join`), the paths of extracted items are combined. Here, `d2p` is a shortcut for `dump {path}`. It deconflicts the joined paths with the local file system: If `pack.bin` contains items `one.txt` and `two.txt`, the following local file tree would be the result:

    pack.bin
    pack/one.txt
    pack/two.txt

Finally, the `-d` (or `--drop`) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.
Expand source code Browse git
class xt7z(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'): """ Extract files from a 7zip archive. """ def unpack(self, data: bytearray): for match in re.finditer(re.escape(SIGNATURE), data): start = match.start() if start != 0: self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.') try: yield from self._unpack_from(data, start) except SzCorruptArchive: continue else: break def _unpack_from(self, data: bytearray, zp: int = 0): mv = memoryview(data) chunk = mv[zp:] pwd = self.args.pwd def try_open(password: str | bytes | None) -> SzArchive: return SzArchive(chunk, password=password) archive: SzArchive | None = None if pwd: try: archive = try_open(pwd.decode(self.codec)) except SzCorruptArchive: raise ValueError('corrupt archive; the password is likely invalid.') else: def passwords(): yield None yield from self.CommonPasswords for pwd in passwords(): if pwd is None: self.log_debug('trying empty password') else: self.log_debug(F'trying password: {pwd}') try: archive = try_open(pwd) for f in archive.files: if not f.is_dir: f.decompress(password=pwd) break problem = False except SzUnsupportedMethod as E: raise ValueError(str(E)) except SzCannotUnpack: problem = True except Exception: if pwd is None: raise problem = True if not problem: break else: raise ValueError('a password is required and none of the default passwords worked.') assert archive is not None for info in archive.files: if info.is_dir: continue def extract(f=info, p=pwd): return f.decompress(password=p) yield self._pack( info.name, info.mtime or info.ctime, extract, crc32=info.crc, uncompressed=info.size, ) @classmethod def handles(cls, data) -> bool | None: if data[:6] == SIGNATURE: return True if not is_likely_pe(data): return None offset = get_pe_size(data) memory = memoryview(data) memory = memory[offset:] if memory[:10] == B';!@Install' and buffer_offset(memory, SIGNATURE, 0, 0x1000) > 0: return TrueAncestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data: bytearray): for match in re.finditer(re.escape(SIGNATURE), data): start = match.start() if start != 0: self.log_info(F'found a header at offset 0x{start:X}, trying to extract from there.') try: yield from self._unpack_from(data, start) except SzCorruptArchive: continue else: break
Inherited members
ArchiveUnit: CommonPasswords, CustomJoinBehaviour, CustomPathSeparator, FilterEverything, Requires, act, assemble, codec, console, filter, finish, handles, is_quiet, is_reversible, isatty, labelled, leniency, log_always, log_debug, log_detach, log_fail, log_info, log_level, log_warn, logger, name, nozzle, optional_dependencies, process, read, read1, required_dependencies, reset, run, source, superinit