Module refinery.units.pattern.carve_png
Expand source code Browse git
from __future__ import annotations
import zlib
from refinery.lib.structures import StructReader
from refinery.units import Unit
class carve_png(Unit):
"""
Extracts anything from the input data that looks like a PNG image file.
"""
def process(self, data: bytearray):
memory = memoryview(data)
stream = StructReader(data, bigendian=True)
offset = 0
while (p := data.find(B'\x89PNG\r\n\x1A\n', offset)) > 0:
stream.seekset(start := p)
success = True
stream.skip(8)
try:
while success:
size = stream.u32()
data = stream.read_exactly(4 + size)
crc32r = stream.u32()
crc32c = zlib.crc32(data) & 0xFFFFFFFF
self.log_debug(F'{p:#x}: chunk of size {size:#010x}, crc32={crc32c:08X}, check={crc32r:08X}', data[:4])
if crc32r != crc32c:
self.log_info(F'{p:#x}: rejecting, invalid checksum on chunk')
success = False
elif data[:4] == B'IEND':
self.log_info(F'{p:#x}: accepting, reached the end')
break
elif data[:4] not in (
b'IHDR',
B'PLTE',
B'IDAT',
B'bKGD',
B'cHRM',
B'cICP',
B'dSIG',
B'eXIf',
B'gAMA',
B'hIST',
B'iCCP',
B'iTXt',
B'pHYs',
B'sBIT',
B'sPLT',
B'sRGB',
B'sTER',
B'tEXt',
B'tIME',
B'tRNS',
B'zTXt',
):
self.log_info(F'{p:#x}: rejecting, invalid header type', bytes(data[:4]))
success = False
except Exception as e:
self.log_info(F'{p:#x}: rejecting, exception:', e)
success = False
if success:
offset = stream.tell()
yield self.labelled(memory[start:offset], offset=start)
else:
offset = p + 1
Classes
class carve_png-
Extracts anything from the input data that looks like a PNG image file.
Expand source code Browse git
class carve_png(Unit): """ Extracts anything from the input data that looks like a PNG image file. """ def process(self, data: bytearray): memory = memoryview(data) stream = StructReader(data, bigendian=True) offset = 0 while (p := data.find(B'\x89PNG\r\n\x1A\n', offset)) > 0: stream.seekset(start := p) success = True stream.skip(8) try: while success: size = stream.u32() data = stream.read_exactly(4 + size) crc32r = stream.u32() crc32c = zlib.crc32(data) & 0xFFFFFFFF self.log_debug(F'{p:#x}: chunk of size {size:#010x}, crc32={crc32c:08X}, check={crc32r:08X}', data[:4]) if crc32r != crc32c: self.log_info(F'{p:#x}: rejecting, invalid checksum on chunk') success = False elif data[:4] == B'IEND': self.log_info(F'{p:#x}: accepting, reached the end') break elif data[:4] not in ( b'IHDR', B'PLTE', B'IDAT', B'bKGD', B'cHRM', B'cICP', B'dSIG', B'eXIf', B'gAMA', B'hIST', B'iCCP', B'iTXt', B'pHYs', B'sBIT', B'sPLT', B'sRGB', B'sTER', B'tEXt', B'tIME', B'tRNS', B'zTXt', ): self.log_info(F'{p:#x}: rejecting, invalid header type', bytes(data[:4])) success = False except Exception as e: self.log_info(F'{p:#x}: rejecting, exception:', e) success = False if success: offset = stream.tell() yield self.labelled(memory[start:offset], offset=start) else: offset = p + 1Ancestors
Subclasses
Inherited members