Module refinery.units.formats.archive.xtzip
Expand source code Browse git
from __future__ import annotations
import codecs
from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.types import buf
from refinery.lib.zip import (
InvalidChecksum,
InvalidPassword,
PasswordRequired,
Zip,
ZipDirEntry,
ZipEndOfCentralDirectory,
ZipEndOfCentralDirectory64,
)
from refinery.units import RefineryPartialResult
from refinery.units.formats.archive import ArchiveUnit, MultipleArchives
from refinery.units.formats.pe import get_pe_size
class xtzip(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """

    def unpack(self, data: buf):
        # Parses the input as a Zip archive and yields one packed item per non-directory entry
        # of the archive directory, sorted by entry name. Before that, every gap reported by
        # the archive's coverage information is yielded as an item whose name ends in ".cave".
        # Raises MultipleArchives when sub-archives are present and the lenient option is off.

        def trypwd(password: str | None):
            # Returns the parsed archive when the password looks usable, and a falsy value
            # (None or False) otherwise.
            try:
                zipf = Zip(view, password)
            except (PasswordRequired, InvalidPassword):
                return None
            for file in zipf.records.values():
                if file.is_dir():
                    continue
                # Only the first non-directory record decides the outcome: the loop either
                # breaks out (password accepted) or returns False right here.
                if file.is_password_ok(password):
                    break
                return False
            return zipf

        view = memoryview(data)
        password = self.args.pwd

        # Normalize the password argument: empty values become None, anything that is not
        # already a string is decoded with the unit's codec.
        if not password:
            password = None
        elif not isinstance(password, str):
            password = codecs.decode(password, self.codec)

        # The common password list is only consulted when no password was supplied.
        passwords = [password]
        if not password:
            passwords.extend(self.CommonPasswords)

        for p in passwords:
            if zipf := trypwd(p):
                break
        else:
            # No candidate was accepted; parse with the supplied password and let any
            # exception raised by the parser propagate to the caller.
            zipf = Zip(view, password)

        # The assignment expression has to be parenthesized. Without the parentheses, the
        # walrus operator captures the result of the whole `and` expression, which is a
        # boolean, and the message below would always report two archives.
        if (some := zipf.sub_archive_count()) and not self.args.lenient:
            text = (
                F'The input contains {some + 1} archives. Use the xtzip unit to extract '
                R'them individually or set the --lenient/-L option to fuse the archives.')
            raise MultipleArchives(text)

        if zipf.password:
            self.log_debug('Using password:', zipf.password)

        if boundary := zipf.coverage.boundary():
            # Pad all offsets to the width of the hex representation of boundary[1]; the
            # width includes the 0x prefix.
            w = len(hex(boundary[1]))
            for start, end in zipf.coverage.gaps():
                self.log_info(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')
                yield self._pack(F'.{start:#0{w}x}.cave', None, view[start:end])

        for entry in sorted(zipf.directory, key=lambda d: d.name):
            def xt(entry=entry):
                # The default argument binds the current entry; a plain closure would see
                # whatever value the loop variable holds when the extractor is called.
                record = zipf.read(entry)
                try:
                    return record.unpack(zipf.password)
                except InvalidChecksum as ck:
                    raise RefineryPartialResult('invalid checksum', ck.data) from ck
                except (PasswordRequired, InvalidPassword):
                    # Without any record data there is nothing to offer as a partial result.
                    if not record.data:
                        raise
                    msg = 'invalid password; use -L to extract raw encrypted data'
                    raise RefineryPartialResult(msg, record.data)
            if entry.is_dir():
                continue
            # The extractor function itself is handed over rather than its result.
            yield self._pack(entry.name, entry.date, xt)

    @classmethod
    def handles(cls, data):
        # Returns True as soon as one of the checks below matches and False when a negative
        # verdict is reached for a PE file or a non-PE input. When the debug entries of a PE
        # are exhausted without a match, the method falls off the end and returns None.

        # Inputs that start with one of these two signatures are accepted right away.
        if data[:4] in (
            B'PK\x03\x04',
            B'PK\x07\x08',
        ):
            return True

        # Search for an end-of-central-directory signature. Note that a match at offset
        # zero is not accepted by this comparison.
        for EOCD in (
            ZipEndOfCentralDirectory64,
            ZipEndOfCentralDirectory,
        ):
            if buffer_offset(data, EOCD.Signature, back2front=True) > 0:
                return True

        # Every remaining check only applies to inputs that look like PE files.
        if not is_likely_pe(data):
            return False

        memory = memoryview(data)

        # Look for a directory entry signature within the last 0x400 bytes of the input.
        if 0 <= buffer_offset(memory[-0x400:], ZipDirEntry.Signature):
            return True

        from refinery.lib import lief
        pe = lief.load_pe_fast(data)
        offset = get_pe_size(pe)

        # Look for a PK\x03\x04 signature less than 0x1000 bytes past the computed PE size.
        if 0 <= buffer_offset(memory[offset:], B'PK\x03\x04') < 0x1000:
            return True

        if not pe.has_debug:
            return False

        # Last resort: a PDB path that mentions both "sfxzip32" and "WinRAR".
        for entry in pe.debug:
            if not isinstance(entry, lief.PE.CodeViewPDB):
                continue
            path = entry.filename
            if not isinstance(path, str):
                path = codecs.decode(path, 'latin1')
            if 'sfxzip32' in path and 'WinRAR' in path:
                return True
Classes
class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')-
Extract files from a Zip archive.
This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".
Positional arguments to xtzip are patterns to filter the extracted items. Use the -x flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none that has the word "temp" in its path:
    xtzip .foo .bar -x temp
To view only the paths of all chunks, use the listing switch:
    emit data | ... | xtzip -l
Otherwise, extracted items are written to the standard output port and usually require a frame to be processed properly. In order to dump all extracted data to disk, the following pipeline can be used:
    emit data | ... | xtzip [| dump extracted/{path} ]
The value {path} is a placeholder which is substituted by the virtual path of the extracted item. When using xtzip to unpack a file on disk, the following pattern can be useful:
    ef pack.bin [| xtzip -j | d2p ]
The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. Here, d2p is a shortcut for dump {path}. It deconflicts the joined paths with the local file system: If pack.bin contains items one.txt and two.txt, the following local file tree would be the result:
    pack.bin
    pack/one.txt
    pack/two.txt
Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.
Expand source code Browse git
class xtzip(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'): """ Extract files from a Zip archive. """ def unpack(self, data: buf): def trypwd(password: str | None): try: zipf = Zip(view, password) except (PasswordRequired, InvalidPassword): return None for file in zipf.records.values(): if file.is_dir(): continue if file.is_password_ok(password): break return False return zipf view = memoryview(data) password = self.args.pwd if not password: password = None elif not isinstance(password, str): password = codecs.decode(password, self.codec) passwords = [password] if not password: passwords.extend(self.CommonPasswords) for p in passwords: if zipf := trypwd(p): break else: zipf = Zip(view, password) if some := zipf.sub_archive_count() and not self.args.lenient: text = ( F'The input contains {some + 1} archives. Use the xtzip unit to extract ' R'them individually or set the --lenient/-L option to fuse the archives.') raise MultipleArchives(text) if zipf.password: self.log_debug('Using password:', zipf.password) if boundary := zipf.coverage.boundary(): w = len(hex(boundary[1])) for start, end in zipf.coverage.gaps(): self.log_info(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}') yield self._pack(F'.{start:#0{w}x}.cave', None, view[start:end]) for entry in sorted(zipf.directory, key=lambda d: d.name): def xt(entry=entry): record = zipf.read(entry) try: return record.unpack(zipf.password) except InvalidChecksum as ck: raise RefineryPartialResult('invalid checksum', ck.data) from ck except (PasswordRequired, InvalidPassword): if not record.data: raise msg = 'invalid password; use -L to extract raw encrypted data' raise RefineryPartialResult(msg, record.data) if entry.is_dir(): continue yield self._pack(entry.name, entry.date, xt) @classmethod def handles(cls, data): if data[:4] in ( B'PK\x03\x04', B'PK\x07\x08', ): return True for EOCD in ( ZipEndOfCentralDirectory64, ZipEndOfCentralDirectory, ): if buffer_offset(data, EOCD.Signature, back2front=True) > 0: return 
True if not is_likely_pe(data): return False memory = memoryview(data) if 0 <= buffer_offset(memory[-0x400:], ZipDirEntry.Signature): return True from refinery.lib import lief pe = lief.load_pe_fast(data) offset = get_pe_size(pe) if 0 <= buffer_offset(memory[offset:], B'PK\x03\x04') < 0x1000: return True if not pe.has_debug: return False for entry in pe.debug: if not isinstance(entry, lief.PE.CodeViewPDB): continue path = entry.filename if not isinstance(path, str): path = codecs.decode(path, 'latin1') if 'sfxzip32' in path and 'WinRAR' in path: return TrueAncestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data: buf):
    # Parses the input as a Zip archive and yields one packed item per non-directory entry
    # of the archive directory, sorted by entry name. Before that, every gap reported by the
    # archive's coverage information is yielded as an item whose name ends in ".cave".
    # Raises MultipleArchives when sub-archives are present and the lenient option is off.

    def trypwd(password: str | None):
        # Returns the parsed archive when the password looks usable, and a falsy value
        # (None or False) otherwise.
        try:
            zipf = Zip(view, password)
        except (PasswordRequired, InvalidPassword):
            return None
        for file in zipf.records.values():
            if file.is_dir():
                continue
            # Only the first non-directory record decides the outcome.
            if file.is_password_ok(password):
                break
            return False
        return zipf

    view = memoryview(data)
    password = self.args.pwd

    # Empty values become None; anything that is not a string is decoded with the codec.
    if not password:
        password = None
    elif not isinstance(password, str):
        password = codecs.decode(password, self.codec)

    # The common password list is only consulted when no password was supplied.
    passwords = [password]
    if not password:
        passwords.extend(self.CommonPasswords)

    for p in passwords:
        if zipf := trypwd(p):
            break
    else:
        # No candidate was accepted; parse with the supplied password and let any exception
        # raised by the parser propagate to the caller.
        zipf = Zip(view, password)

    # The assignment expression has to be parenthesized. Without the parentheses, the walrus
    # operator captures the boolean result of the whole `and` expression, and the message
    # below would always report two archives.
    if (some := zipf.sub_archive_count()) and not self.args.lenient:
        text = (
            F'The input contains {some + 1} archives. Use the xtzip unit to extract '
            R'them individually or set the --lenient/-L option to fuse the archives.')
        raise MultipleArchives(text)

    if zipf.password:
        self.log_debug('Using password:', zipf.password)

    if boundary := zipf.coverage.boundary():
        # Pad all offsets to the width of hex(boundary[1]), which includes the 0x prefix.
        w = len(hex(boundary[1]))
        for start, end in zipf.coverage.gaps():
            self.log_info(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')
            yield self._pack(F'.{start:#0{w}x}.cave', None, view[start:end])

    for entry in sorted(zipf.directory, key=lambda d: d.name):
        def xt(entry=entry):
            # The default argument binds the current entry for the deferred call.
            record = zipf.read(entry)
            try:
                return record.unpack(zipf.password)
            except InvalidChecksum as ck:
                raise RefineryPartialResult('invalid checksum', ck.data) from ck
            except (PasswordRequired, InvalidPassword):
                # Without any record data there is nothing to offer as a partial result.
                if not record.data:
                    raise
                msg = 'invalid password; use -L to extract raw encrypted data'
                raise RefineryPartialResult(msg, record.data)
        if entry.is_dir():
            continue
        # The extractor function itself is handed over rather than its result.
        yield self._pack(entry.name, entry.date, xt)
Inherited members
ArchiveUnit: CommonPasswords, CustomJoinBehaviour, CustomPathSeparator, FilterEverything, Requires, act, assemble, codec, console, filter, finish, handles, is_quiet, is_reversible, isatty, labelled, leniency, log_always, log_debug, log_detach, log_fail, log_info, log_level, log_warn, logger, name, nozzle, optional_dependencies, process, read, read1, required_dependencies, reset, run, source, superinit