Module refinery.units.pattern.carve_zip
Expand source code Browse git
from __future__ import annotations
from refinery.lib.structures import Struct, StructReader
from refinery.units import Unit
from refinery.units.misc.datefix import datefix
class ZipEndOfCentralDirectory(Struct):
    """
    Parser for the fixed-size part of a ZIP end of central directory record.

    The record must begin with `SIGNATURE`; otherwise a `ValueError` is raised.
    All remaining fields are read from the reader in the order in which they
    are assigned below. Only the length of the trailing archive comment is
    read here, the comment bytes themselves are not consumed.
    """
    SIGNATURE = B'PK\x05\x06'

    def __init__(self, reader: StructReader):
        if reader.read(4) != self.SIGNATURE:
            # Callers log str(exception), so the error carries a message.
            raise ValueError('invalid signature for end of central directory record')
        self.disk_number = reader.u16()
        self.start_disk_number = reader.u16()
        self.entries_on_disk = reader.u16()
        self.entries_in_directory = reader.u16()
        # Size of the central directory and its offset as recorded in the archive.
        self.directory_size = reader.u32()
        self.directory_offset = reader.u32()
        self.comment_length = reader.u16()
class ZipCentralDirectory(Struct):
    """
    Parser for a single ZIP central directory file header.

    The header must begin with `SIGNATURE`; otherwise a `ValueError` is raised.
    The fixed-size fields are read first, followed by the three variable-length
    fields `filename`, `extra` and `comment`. Each of these is `None` when its
    recorded length is zero or when the reader returns no data for it.
    """
    SIGNATURE = B'PK\x01\x02'

    def __init__(self, reader: StructReader):
        if reader.read(4) != self.SIGNATURE:
            raise ValueError('invalid signature for central directory file header')

        def read_optional(size: int):
            # A zero length and an empty read both produce None.
            if not size:
                return None
            return reader.read(size) or None

        self.version_made_by = reader.u16()
        self.version_to_extract = reader.u16()
        self.flags = reader.u16()
        self.compression = reader.u16()
        # The raw 32-bit timestamp is converted by datefix.dostime.
        self.date = datefix.dostime(reader.u32())
        self.crc32 = reader.u32()
        self.compressed_size = reader.u32()
        self.decompressed_size = reader.u32()
        len_filename = reader.u16()
        len_extra = reader.u16()
        len_comment = reader.u16()
        self.disk_nr_start = reader.u16()
        self.internal_attributes = reader.u16()
        self.external_attributes = reader.u32()
        # Offset of the corresponding local file header as recorded in the archive.
        self.header_offset = reader.u32()
        # The variable-length fields follow the fixed part in this order.
        self.filename = read_optional(len_filename)
        self.extra = read_optional(len_extra)
        self.comment = read_optional(len_comment)
class carve_zip(Unit):
    """
    Extracts anything from the input data that looks like a zip archive file.
    """

    def process(self, data: bytearray):
        """
        Scan `data` backwards for end of central directory records and yield
        one chunk per archive candidate, in order of ascending start offset.

        For every record that parses, the central directory is expected to
        end right where the record starts. The difference between that
        position and the recorded directory offset is used as a shift that is
        applied to the local header offset of the first directory entry in
        order to locate the start of the archive within `data`. Each result
        is passed through `self.labelled` with `offset` set to that start.
        """
        signature = ZipEndOfCentralDirectory.SIGNATURE
        end = len(data)
        mem = memoryview(data)
        rev = []
        while True:
            # The upper bound of rfind applies to the entire match. Each pass
            # therefore finds a signature strictly before the previous value
            # of `end`, and no manual adjustment is required to make progress.
            end = data.rfind(signature, 0, end)
            if end < 0:
                break
            try:
                end_marker = ZipEndOfCentralDirectory(mem[end:])
            except ValueError as e:
                self.log_info(F'error parsing end of central directory at 0x{end:X}: {e!s}')
                continue
            else:
                self.log_info(F'successfully parsed end of central directory at 0x{end:X}')
            directory_start = end - end_marker.directory_size
            if directory_start < 0:
                self.log_debug('end of central directory size is invalid')
                continue
            # Distance between where the directory is and where it claims to be.
            shift = directory_start - end_marker.directory_offset
            try:
                central_directory = ZipCentralDirectory(mem[directory_start:])
            except ValueError:
                self.log_debug('computed location of central directory is invalid')
                continue
            start = central_directory.header_offset + shift
            # A negative start would be interpreted as an index relative to the
            # end of the buffer, and a start at or beyond the central directory
            # would move the search position forward. Both cases can produce
            # bogus results or make the loop revisit the same record forever.
            if not 0 <= start < directory_start:
                self.log_debug('computed start of ZIP archive is not located before the central directory')
                continue
            if mem[start:start + 4] not in (B'PK\x03\x04', B'\0\0\0\0'):
                # SFX payloads seem to have a nulled header, so we permit this.
                self.log_debug('computed start of ZIP archive does not have the correct signature bytes')
                continue
            # NOTE(review): the carved range ends len(end_marker) bytes after the
            # record start; since the record parser does not consume the archive
            # comment, the comment is presumably not part of the output.
            rev.append((start, end + len(end_marker)))
            # Continue searching in front of the archive that was just found.
            end = start
        for start, end in reversed(rev):
            yield self.labelled(mem[start:end], offset=start)
Classes
class ZipEndOfCentralDirectory (reader)-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`.
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ZipEndOfCentralDirectory(Struct): SIGNATURE = B'PK\x05\x06' def __init__(self, reader: StructReader): if reader.read(4) != self.SIGNATURE: raise ValueError self.disk_number = reader.u16() self.start_disk_number = reader.u16() self.entries_on_disk = reader.u16() self.entries_in_directory = reader.u16() self.directory_size = reader.u32() self.directory_offset = reader.u32() self.comment_length = reader.u16()Ancestors
- Struct
- typing.Generic
Class variables
var SIGNATURE
class ZipCentralDirectory (reader)-
A class to parse structured data. A `Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`.
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ZipCentralDirectory(Struct): SIGNATURE = B'PK\x01\x02' def __init__(self, reader: StructReader): if reader.read(4) != self.SIGNATURE: raise ValueError self.version_made_by = reader.u16() self.version_to_extract = reader.u16() self.flags = reader.u16() self.compression = reader.u16() self.date = datefix.dostime(reader.u32()) self.crc32 = reader.u32() self.compressed_size = reader.u32() self.decompressed_size = reader.u32() len_filename = reader.u16() len_extra = reader.u16() len_comment = reader.u16() self.disk_nr_start = reader.u16() self.internal_attributes = reader.u16() self.external_attributes = reader.u32() self.header_offset = reader.u32() self.filename = len_filename and reader.read(len_filename) or None self.extra = len_extra and reader.read(len_extra) or None self.comment = len_comment and reader.read(len_comment) or NoneAncestors
- Struct
- typing.Generic
Class variables
var SIGNATURE
class carve_zip-
Extracts anything from the input data that looks like a zip archive file.
Expand source code Browse git
class carve_zip(Unit): """ Extracts anything from the input data that looks like a zip archive file. """ def process(self, data: bytearray): end = len(data) mem = memoryview(data) rev = [] while True: end = data.rfind(ZipEndOfCentralDirectory.SIGNATURE, 0, end) if end < 0: break try: end_marker = ZipEndOfCentralDirectory(mem[end:]) except ValueError as e: self.log_info(F'error parsing end of central directory at 0x{end:X}: {e!s}') continue else: self.log_info(F'successfully parsed end of central directory at 0x{end:X}') start = end - end_marker.directory_size shift = start - end_marker.directory_offset if start < 0: self.log_debug('end of central directory size is invalid') continue try: central_directory = ZipCentralDirectory(mem[start:]) except ValueError: self.log_debug('computed location of central directory is invalid') end = end - len(ZipEndOfCentralDirectory.SIGNATURE) continue start = central_directory.header_offset + shift if mem[start:start + 4] not in (B'PK\x03\x04', B'\0\0\0\0'): # SFX payloads seem to have a nulled header, so we permit this. self.log_debug('computed start of ZIP archive does not have the correct signature bytes') continue rev.append((start, end + len(end_marker))) end = start for start, end in reversed(rev): zip = mem[start:end] yield self.labelled(zip, offset=start)Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Inherited members