Module refinery.units.pattern.carve_zip
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import Unit
from refinery.units.misc.datefix import datefix
from refinery.lib.structures import StructReader, Struct
class ZipEndOfCentralDirectory(Struct):
    """
    Parses the fixed-size fields of a ZIP end of central directory record. Construction fails
    with a `ValueError` when the data does not start with `SIGNATURE`.
    """

    SIGNATURE = B'PK\x05\x06'

    def __init__(self, reader: StructReader):
        magic = reader.read(4)
        if magic != self.SIGNATURE:
            raise ValueError
        # Each call below consumes the next bytes from the reader, so the order of these
        # statements defines the layout that is being parsed.
        self.disk_number = reader.u16()
        self.start_disk_number = reader.u16()
        self.entries_on_disk = reader.u16()
        self.entries_in_directory = reader.u16()
        self.directory_size = reader.u32()
        self.directory_offset = reader.u32()
        # Only the length of the comment is read here; the comment bytes are not consumed.
        self.comment_length = reader.u16()
class ZipCentralDirectory(Struct):
    """
    Parses one ZIP central directory entry, including its variable-length file name, extra
    field, and comment. Construction fails with a `ValueError` when the data does not start
    with `SIGNATURE`.
    """

    SIGNATURE = B'PK\x01\x02'

    def __init__(self, reader: StructReader):
        magic = reader.read(4)
        if magic != self.SIGNATURE:
            raise ValueError
        # Each call below consumes the next bytes from the reader, so the order of these
        # statements defines the layout that is being parsed.
        self.version_made_by = reader.u16()
        self.version_to_extract = reader.u16()
        self.flags = reader.u16()
        self.compression = reader.u16()
        timestamp = reader.u32()
        self.date = datefix.dostime(timestamp)
        self.crc32 = reader.u32()
        self.compressed_size = reader.u32()
        self.decompressed_size = reader.u32()
        # The three lengths are read first; the data they describe follows the fixed fields.
        name_size = reader.u16()
        extra_size = reader.u16()
        note_size = reader.u16()
        self.disk_nr_start = reader.u16()
        self.internal_attributes = reader.u16()
        self.external_attributes = reader.u32()
        self.header_offset = reader.u32()
        # A length of zero skips the read entirely; an empty read result also becomes None.
        self.filename = (reader.read(name_size) or None) if name_size else None
        self.extra = (reader.read(extra_size) or None) if extra_size else None
        self.comment = (reader.read(note_size) or None) if note_size else None
class carve_zip(Unit):
    """
    Extracts anything from the input data that looks like a zip archive file.
    """

    def process(self, data: bytearray):
        """
        Search the input backwards for end of central directory signatures. For each record
        that parses, the central directory is expected to end where the record begins; the
        offset of the archive start is derived from the first central directory entry. Every
        accepted archive is emitted as a slice of the input that spans from the computed
        start to the end of the fixed-size end of central directory record, labelled with its
        offset. Archives are emitted in order of increasing offset.
        """
        signature = ZipEndOfCentralDirectory.SIGNATURE
        mem = memoryview(data)
        end = len(data)
        carved = []
        while True:
            # The upper bound is exclusive, so a signature at the previous position cannot
            # be found a second time and the search always progresses towards offset zero.
            end = data.rfind(signature, 0, end)
            if end < 0:
                break
            try:
                end_marker = ZipEndOfCentralDirectory(mem[end:])
            except ValueError as e:
                self.log_info(F'error parsing end of central directory at 0x{end:X}: {e!s}')
                continue
            else:
                self.log_info(F'successfully parsed end of central directory at 0x{end:X}')
            directory = end - end_marker.directory_size
            if directory < 0:
                self.log_debug('end of central directory size is invalid')
                continue
            # The recorded directory offset is relative to the archive start; the difference
            # to the actual position tells us how far the archive is shifted within the data.
            shift = directory - end_marker.directory_offset
            try:
                central_directory = ZipCentralDirectory(mem[directory:])
            except ValueError:
                self.log_debug('computed location of central directory is invalid')
                end = end - len(signature)
                continue
            start = central_directory.header_offset + shift
            if not 0 <= start < directory:
                # A negative start would be interpreted as an index from the end of the
                # buffer, and a start behind the directory would move the search position
                # forward again; both could prevent the loop from terminating.
                self.log_debug('computed start of ZIP archive is out of bounds')
                continue
            if mem[start:start + 4] not in (B'PK\x03\x04', B'\0\0\0\0'):
                # SFX payloads seem to have a nulled header, so we permit this.
                self.log_debug('computed start of ZIP archive does not have the correct signature bytes')
                continue
            # Store the final, exclusive end offset of this archive right away: it depends on
            # the record that was parsed in this iteration, not on a later one.
            carved.append((start, end + len(end_marker)))
            end = start
        for start, end in reversed(carved):
            archive = mem[start:end]
            yield self.labelled(archive, offset=start)
Classes
class ZipEndOfCentralDirectory (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ZipEndOfCentralDirectory(Struct):
    """
    Parses the fixed-size fields of a ZIP end of central directory record. Construction fails
    with a `ValueError` when the data does not start with `SIGNATURE`.
    """

    SIGNATURE = B'PK\x05\x06'

    def __init__(self, reader: StructReader):
        magic = reader.read(4)
        if magic != self.SIGNATURE:
            raise ValueError
        # Each call below consumes the next bytes from the reader, so the order of these
        # statements defines the layout that is being parsed.
        self.disk_number = reader.u16()
        self.start_disk_number = reader.u16()
        self.entries_on_disk = reader.u16()
        self.entries_in_directory = reader.u16()
        self.directory_size = reader.u32()
        self.directory_offset = reader.u32()
        # Only the length of the comment is read here; the comment bytes are not consumed.
        self.comment_length = reader.u16()
Ancestors
Class variables
var SIGNATURE
class ZipCentralDirectory (reader)
-
A class to parse structured data. A `Struct` class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument `reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through.
Expand source code Browse git
class ZipCentralDirectory(Struct):
    """
    Parses one ZIP central directory entry, including its variable-length file name, extra
    field, and comment. Construction fails with a `ValueError` when the data does not start
    with `SIGNATURE`.
    """

    SIGNATURE = B'PK\x01\x02'

    def __init__(self, reader: StructReader):
        magic = reader.read(4)
        if magic != self.SIGNATURE:
            raise ValueError
        # Each call below consumes the next bytes from the reader, so the order of these
        # statements defines the layout that is being parsed.
        self.version_made_by = reader.u16()
        self.version_to_extract = reader.u16()
        self.flags = reader.u16()
        self.compression = reader.u16()
        timestamp = reader.u32()
        self.date = datefix.dostime(timestamp)
        self.crc32 = reader.u32()
        self.compressed_size = reader.u32()
        self.decompressed_size = reader.u32()
        # The three lengths are read first; the data they describe follows the fixed fields.
        name_size = reader.u16()
        extra_size = reader.u16()
        note_size = reader.u16()
        self.disk_nr_start = reader.u16()
        self.internal_attributes = reader.u16()
        self.external_attributes = reader.u32()
        self.header_offset = reader.u32()
        # A length of zero skips the read entirely; an empty read result also becomes None.
        self.filename = (reader.read(name_size) or None) if name_size else None
        self.extra = (reader.read(extra_size) or None) if extra_size else None
        self.comment = (reader.read(note_size) or None) if note_size else None
Ancestors
Class variables
var SIGNATURE
class carve_zip
-
Extracts anything from the input data that looks like a zip archive file.
Expand source code Browse git
class carve_zip(Unit):
    """
    Extracts anything from the input data that looks like a zip archive file.
    """

    def process(self, data: bytearray):
        """
        Search the input backwards for end of central directory signatures. For each record
        that parses, the central directory is expected to end where the record begins; the
        offset of the archive start is derived from the first central directory entry. Every
        accepted archive is emitted as a slice of the input that spans from the computed
        start to the end of the fixed-size end of central directory record, labelled with its
        offset. Archives are emitted in order of increasing offset.
        """
        signature = ZipEndOfCentralDirectory.SIGNATURE
        mem = memoryview(data)
        end = len(data)
        carved = []
        while True:
            # The upper bound is exclusive, so a signature at the previous position cannot
            # be found a second time and the search always progresses towards offset zero.
            end = data.rfind(signature, 0, end)
            if end < 0:
                break
            try:
                end_marker = ZipEndOfCentralDirectory(mem[end:])
            except ValueError as e:
                self.log_info(F'error parsing end of central directory at 0x{end:X}: {e!s}')
                continue
            else:
                self.log_info(F'successfully parsed end of central directory at 0x{end:X}')
            directory = end - end_marker.directory_size
            if directory < 0:
                self.log_debug('end of central directory size is invalid')
                continue
            # The recorded directory offset is relative to the archive start; the difference
            # to the actual position tells us how far the archive is shifted within the data.
            shift = directory - end_marker.directory_offset
            try:
                central_directory = ZipCentralDirectory(mem[directory:])
            except ValueError:
                self.log_debug('computed location of central directory is invalid')
                end = end - len(signature)
                continue
            start = central_directory.header_offset + shift
            if not 0 <= start < directory:
                # A negative start would be interpreted as an index from the end of the
                # buffer, and a start behind the directory would move the search position
                # forward again; both could prevent the loop from terminating.
                self.log_debug('computed start of ZIP archive is out of bounds')
                continue
            if mem[start:start + 4] not in (B'PK\x03\x04', B'\0\0\0\0'):
                # SFX payloads seem to have a nulled header, so we permit this.
                self.log_debug('computed start of ZIP archive does not have the correct signature bytes')
                continue
            # Store the final, exclusive end offset of this archive right away: it depends on
            # the record that was parsed in this iteration, not on a later one.
            carved.append((start, end + len(end_marker)))
            end = start
        for start, end in reversed(carved):
            archive = mem[start:end]
            yield self.labelled(archive, offset=start)
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members