Module refinery.units.formats.archive.xtzip
Expand source code Browse git
from __future__ import annotations
import codecs
from refinery.lib import lief
from refinery.lib.id import buffer_offset, is_likely_pe
from refinery.lib.types import buf
from refinery.lib.zip import InvalidPassword, PasswordRequired, Zip, ZipDirEntry
from refinery.units import RefineryPartialResult
from refinery.units.formats.archive import ArchiveUnit
from refinery.units.formats.pe import get_pe_size
from refinery.units.pattern.carve_zip import carve_zip
class xtzip(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """

    @classmethod
    def _carver(cls):
        # The unit returned here is the Zip carver associated with this extractor.
        return carve_zip

    def unpack(self, data: buf):
        # Opens the archive (trying a list of fallback passwords when the user
        # supplied none), warns about gaps reported by the archive's coverage
        # tracker, and yields one lazily extracted item per non-directory entry.

        def attempt(candidate: str | None):
            # Result is the opened archive on success, None when the archive
            # cannot be opened with this candidate, and False when the first
            # non-directory record rejects the candidate.
            try:
                opened = Zip(data, candidate)
            except (PasswordRequired, InvalidPassword):
                return None
            for record in opened.records.values():
                if not record.is_dir():
                    # Only the first non-directory record is ever probed.
                    return opened if record.is_password_ok(candidate) else False
            # No non-directory record exists, so there is nothing to verify against.
            return opened

        # Normalize the user-supplied password: empty means "none given", and
        # anything that is not already a string is decoded with the unit's codec.
        supplied = self.args.pwd or None
        if supplied is not None and not isinstance(supplied, str):
            supplied = codecs.decode(supplied, self.codec)

        # Without an explicit password, the fallback list is tried after None.
        candidates = [supplied] if supplied else [supplied, *self._COMMON_PASSWORDS]

        archive = None
        for candidate in candidates:
            archive = attempt(candidate)
            if archive:
                break
        else:
            # Every candidate failed: open with the supplied password so that any
            # resulting exception propagates to the caller.
            archive = Zip(data, supplied)

        if archive.password:
            self.log_debug('Using password:', archive.password)

        boundary = archive.coverage.boundary()
        if boundary:
            # Pad all offsets to the width of the hex representation of boundary[1].
            w = len(hex(boundary[1]))
            for start, end in archive.coverage.gaps():
                self.log_warn(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')

        for entry in archive.directory:
            if entry.is_dir():
                continue

            def extract(entry=entry):
                # The default argument pins the current entry for this closure.
                record = archive.read(entry)
                try:
                    return record.unpack(archive.password)
                except InvalidPassword:
                    if record.data:
                        # Hand the raw record data back as a partial result.
                        msg = 'invalid password; use -L to extract raw encrypted data'
                        raise RefineryPartialResult(msg, record.data)
                    raise

            yield self._pack(entry.name, entry.date, extract)

    @classmethod
    def handles(cls, data):
        # Returns True when the data looks like a Zip archive or a PE file that
        # carries one, False when that is ruled out, and None when no check
        # below reaches a verdict.

        # A tuple (not a set) is used because the slice may be an unhashable buffer.
        if data[:4] in (B'PK\x03\x04', B'PK\x07\x08'):
            return True
        if not is_likely_pe(data):
            return False

        view = memoryview(data)
        # Look for a Zip directory entry signature within the final 0x400 bytes.
        if buffer_offset(view[-0x400:], ZipDirEntry.Signature) >= 0:
            return True

        pe = lief.load_pe_fast(data)
        # NOTE(review): get_pe_size presumably yields the size of the PE image, so
        # the slice below would be data appended after it - confirm against helper.
        pe_size = get_pe_size(pe)
        if 0 <= buffer_offset(view[pe_size:], B'PK\x03\x04') < 0x1000:
            return True

        if not pe.has_debug:
            return False

        for entry in pe.debug:
            if isinstance(entry, lief.PE.CodeViewPDB):
                pdb_path = entry.filename
                if not isinstance(pdb_path, str):
                    pdb_path = codecs.decode(pdb_path, 'latin1')
                # A PDB path naming both markers is accepted as a Zip carrier.
                if 'sfxzip32' in pdb_path and 'WinRAR' in pdb_path:
                    return True

        # No verdict: same as the original's implicit fall-through.
        return None
Classes
class xtzip (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')-
Extract files from a Zip archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xtzip --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xtzip [| dump {path} ]
Expand source code Browse git
class xtzip(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from a Zip archive.
    """

    @classmethod
    def _carver(cls):
        # The unit returned here is the Zip carver associated with this extractor.
        return carve_zip

    def unpack(self, data: buf):
        # Opens the archive (trying a list of fallback passwords when the user
        # supplied none), warns about gaps reported by the archive's coverage
        # tracker, and yields one lazily extracted item per non-directory entry.

        def attempt(candidate: str | None):
            # Result is the opened archive on success, None when the archive
            # cannot be opened with this candidate, and False when the first
            # non-directory record rejects the candidate.
            try:
                opened = Zip(data, candidate)
            except (PasswordRequired, InvalidPassword):
                return None
            for record in opened.records.values():
                if not record.is_dir():
                    # Only the first non-directory record is ever probed.
                    return opened if record.is_password_ok(candidate) else False
            # No non-directory record exists, so there is nothing to verify against.
            return opened

        # Normalize the user-supplied password: empty means "none given", and
        # anything that is not already a string is decoded with the unit's codec.
        supplied = self.args.pwd or None
        if supplied is not None and not isinstance(supplied, str):
            supplied = codecs.decode(supplied, self.codec)

        # Without an explicit password, the fallback list is tried after None.
        candidates = [supplied] if supplied else [supplied, *self._COMMON_PASSWORDS]

        archive = None
        for candidate in candidates:
            archive = attempt(candidate)
            if archive:
                break
        else:
            # Every candidate failed: open with the supplied password so that any
            # resulting exception propagates to the caller.
            archive = Zip(data, supplied)

        if archive.password:
            self.log_debug('Using password:', archive.password)

        boundary = archive.coverage.boundary()
        if boundary:
            # Pad all offsets to the width of the hex representation of boundary[1].
            w = len(hex(boundary[1]))
            for start, end in archive.coverage.gaps():
                self.log_warn(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')

        for entry in archive.directory:
            if entry.is_dir():
                continue

            def extract(entry=entry):
                # The default argument pins the current entry for this closure.
                record = archive.read(entry)
                try:
                    return record.unpack(archive.password)
                except InvalidPassword:
                    if record.data:
                        # Hand the raw record data back as a partial result.
                        msg = 'invalid password; use -L to extract raw encrypted data'
                        raise RefineryPartialResult(msg, record.data)
                    raise

            yield self._pack(entry.name, entry.date, extract)

    @classmethod
    def handles(cls, data):
        # Returns True when the data looks like a Zip archive or a PE file that
        # carries one, False when that is ruled out, and None when no check
        # below reaches a verdict.

        # A tuple (not a set) is used because the slice may be an unhashable buffer.
        if data[:4] in (B'PK\x03\x04', B'PK\x07\x08'):
            return True
        if not is_likely_pe(data):
            return False

        view = memoryview(data)
        # Look for a Zip directory entry signature within the final 0x400 bytes.
        if buffer_offset(view[-0x400:], ZipDirEntry.Signature) >= 0:
            return True

        pe = lief.load_pe_fast(data)
        # NOTE(review): get_pe_size presumably yields the size of the PE image, so
        # the slice below would be data appended after it - confirm against helper.
        pe_size = get_pe_size(pe)
        if 0 <= buffer_offset(view[pe_size:], B'PK\x03\x04') < 0x1000:
            return True

        if not pe.has_debug:
            return False

        for entry in pe.debug:
            if isinstance(entry, lief.PE.CodeViewPDB):
                pdb_path = entry.filename
                if not isinstance(pdb_path, str):
                    pdb_path = codecs.decode(pdb_path, 'latin1')
                # A PDB path naming both markers is accepted as a Zip carrier.
                if 'sfxzip32' in pdb_path and 'WinRAR' in pdb_path:
                    return True

        # No verdict: same as the original's implicit fall-through.
        return None

Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data: buf):
    # Opens the archive (trying a list of fallback passwords when the user
    # supplied none), warns about gaps reported by the archive's coverage
    # tracker, and yields one lazily extracted item per non-directory entry.

    def attempt(candidate: str | None):
        # Result is the opened archive on success, None when the archive
        # cannot be opened with this candidate, and False when the first
        # non-directory record rejects the candidate.
        try:
            opened = Zip(data, candidate)
        except (PasswordRequired, InvalidPassword):
            return None
        for record in opened.records.values():
            if not record.is_dir():
                # Only the first non-directory record is ever probed.
                return opened if record.is_password_ok(candidate) else False
        # No non-directory record exists, so there is nothing to verify against.
        return opened

    # Normalize the user-supplied password: empty means "none given", and
    # anything that is not already a string is decoded with the unit's codec.
    supplied = self.args.pwd or None
    if supplied is not None and not isinstance(supplied, str):
        supplied = codecs.decode(supplied, self.codec)

    # Without an explicit password, the fallback list is tried after None.
    candidates = [supplied] if supplied else [supplied, *self._COMMON_PASSWORDS]

    archive = None
    for candidate in candidates:
        archive = attempt(candidate)
        if archive:
            break
    else:
        # Every candidate failed: open with the supplied password so that any
        # resulting exception propagates to the caller.
        archive = Zip(data, supplied)

    if archive.password:
        self.log_debug('Using password:', archive.password)

    boundary = archive.coverage.boundary()
    if boundary:
        # Pad all offsets to the width of the hex representation of boundary[1].
        w = len(hex(boundary[1]))
        for start, end in archive.coverage.gaps():
            self.log_warn(F'data cave detected at range {start:#0{w}x}:{end:#0{w}x}')

    for entry in archive.directory:
        if entry.is_dir():
            continue

        def extract(entry=entry):
            # The default argument pins the current entry for this closure.
            record = archive.read(entry)
            try:
                return record.unpack(archive.password)
            except InvalidPassword:
                if record.data:
                    # Hand the raw record data back as a partial result.
                    msg = 'invalid password; use -L to extract raw encrypted data'
                    raise RefineryPartialResult(msg, record.data)
                raise

        yield self._pack(entry.name, entry.date, extract)
Inherited members