Module refinery.units.formats.jpeg
Expand source code Browse git
from __future__ import annotations
import enum
import re
from refinery.lib import json
from refinery.lib.id import Fmt, get_image_format
from refinery.lib.structures import Struct, StructReader, struct_to_json
from refinery.units.formats import PathExtractorUnit, UnpackResult
class JpegMarker(int, enum.Enum):
    """
    Marker codes recognized by the JPEG parser in this module. Each value is the byte that follows
    the 0xFF prefix at the start of a segment. The short names at the end of the class are aliases
    of the descriptive member names and refer to the very same enum members.
    """

    # Image delimiters and the start of entropy-coded scan data.
    StartOfImage = 0xD8
    EndOfImage = 0xD9
    StartOfScan = 0xDA

    # Frame headers.
    DCTBaseline = 0xC0
    DCTProgressive = 0xC2

    # Table definitions and miscellaneous segments.
    DefineQuantizationTable = 0xDB
    DefineHuffmanTable = 0xC4
    DefineRestartInterval = 0xDD
    Comment = 0xFE

    # Restart markers RST0 through RST7.
    RST0 = 0xD0
    RST1 = 0xD1
    RST2 = 0xD2
    RST3 = 0xD3
    RST4 = 0xD4
    RST5 = 0xD5
    RST6 = 0xD6
    RST7 = 0xD7

    # Application segments App00 through App15.
    App00 = 0xE0
    App01 = 0xE1
    App02 = 0xE2
    App03 = 0xE3
    App04 = 0xE4
    App05 = 0xE5
    App06 = 0xE6
    App07 = 0xE7
    App08 = 0xE8
    App09 = 0xE9
    App10 = 0xEA
    App11 = 0xEB
    App12 = 0xEC
    App13 = 0xED
    App14 = 0xEE
    App15 = 0xEF

    # Abbreviated aliases; these do not create additional members.
    SOI = StartOfImage
    EOI = EndOfImage
    SOS = StartOfScan
    DQT = DefineQuantizationTable
    DHT = DefineHuffmanTable
    DRI = DefineRestartInterval
    COM = Comment
    SOF0 = DCTBaseline
    SOF2 = DCTProgressive
class JpegSOFComponent(Struct):
    """
    One three-byte component entry inside a start-of-frame payload. The first byte is stored as
    `id`, the second byte is split into its high nibble `hs` and low nibble `vs` (by their names,
    presumably the horizontal and vertical sampling factors), and the third byte is stored as `qt`
    (presumably the quantization table selector).
    """
    def __init__(self, reader: StructReader):
        self.id = reader.u8()
        packed = reader.u8()
        # The byte is unsigned, so divmod by 16 yields exactly (high nibble, low nibble).
        self.hs, self.vs = divmod(packed, 0x10)
        self.qt = reader.u8()
class JpegSOF(Struct):
    """
    Parsed payload of a start-of-frame segment. It consists of one byte of sample `precision`,
    the 16-bit image `height` and `width`, a one-byte component count, and that many
    `JpegSOFComponent` entries which are collected in `components`.
    """
    def __init__(self, reader: StructReader):
        # JPEG stores multi-byte integers in big-endian order. This structure is parsed from the
        # payload of a segment on its own, so the reader it receives is not necessarily the outer
        # reader on which the byte order was already configured; set it explicitly so that the
        # two 16-bit dimensions below are never read byte-swapped.
        reader.bigendian = True
        self.precision = reader.u8()
        self.height = reader.u16()
        self.width = reader.u16()
        component_count = reader.u8()
        self.components = [JpegSOFComponent(reader) for _ in range(component_count)]
class JpegStream(Struct):
    """
    A single JPEG segment. Parsing records the `offset` at which the segment starts, its marker
    `type`, the payload `size` (the stored length minus the two length bytes), the payload `data`,
    and, for start-of-scan segments only, the entropy-coded bytes that follow as `scan`. Markers
    in the range 0xD0 to 0xD9 (restart markers, start of image, end of image) carry no length
    field and are parsed with an empty payload.

    Raises a `ValueError` when the segment does not begin with 0xFF, when the marker byte is not
    a known `JpegMarker`, when the stored length is smaller than two, or when no terminating
    marker can be found after the scan data.
    """
    def __init__(self, reader: StructReader):
        self.offset = reader.tell()
        if (h := reader.u8()) != 0xFF:
            raise ValueError(F'Invalid magic byte {h:#04x} at start of stream.')
        self.type = t = JpegMarker(reader.u8())
        if t in range(0xD0, 0xDA):
            # These markers stand alone: there is no length field and no payload.
            self.size = 0
            self.data = b''
            self.scan = b''
        else:
            # The stored length includes the two bytes of the length field itself.
            self.size = reader.u16() - 2
            if self.size < 0:
                raise ValueError(F'Invalid size {self.size}.')
            self.data = reader.read_exactly(self.size)
            if t == JpegMarker.StartOfScan:
                # Inside entropy-coded data, 0xFF is either followed by a stuffed zero byte or it
                # introduces a restart marker (0xD0 through 0xD7). Neither ends the scan, so the
                # search skips both; stopping at a restart marker would leave the parser in the
                # middle of entropy-coded data, where the next segment cannot be parsed.
                eos = re.search(br'\xFF(?![\x00\xD0-\xD7])', reader.peek())
                if eos is None:
                    raise ValueError('Could not find end of stream data.')
                self.scan = reader.read(eos.start())
            else:
                self.scan = b''
class Jpeg(Struct):
    """
    A JPEG file parsed as a sequence of `JpegStream` segments. Every segment is stored in
    `streams` in file order; in addition, start-of-scan segments are collected in `scans`, comment
    segments in `comments`, and application segments (App00 through App15) in `meta`. The payload
    of the baseline or progressive start-of-frame segment is parsed into `sof`, which remains
    `None` when the file contains no such segment. A second start-of-frame segment raises a
    `ValueError`. The `scandata` list is initialized empty and is not filled by this parser.
    """
    def __init__(self, reader: StructReader):
        self.streams: list[JpegStream] = []
        self.sof = None
        self.scandata: list[memoryview] = []
        self.scans: list[JpegStream] = []
        self.comments: list[JpegStream] = []
        self.meta: list[JpegStream] = []
        # All multi-byte fields of the segments are read in big-endian order.
        reader.bigendian = True
        frame_markers = (JpegMarker.SOF0, JpegMarker.SOF2)
        app_markers = range(JpegMarker.App00, JpegMarker.App15 + 1)
        while not reader.eof:
            segment = JpegStream(reader)
            self.streams.append(segment)
            marker = segment.type
            if marker in frame_markers:
                if self.sof is not None:
                    raise ValueError('Duplicate SOF Stream in File.')
                self.sof = JpegSOF.Parse(segment.data)
                continue
            if marker == JpegMarker.StartOfScan:
                self.scans.append(segment)
                continue
            if marker == JpegMarker.Comment:
                self.comments.append(segment)
                continue
            if marker in app_markers:
                self.meta.append(segment)
class jpeg(PathExtractorUnit):
    """
    Extract the raw segments from a JPG image.
    """
    def unpack(self, data):
        # Application segments with a well-known purpose receive a descriptive extension; all
        # other application segments receive none.
        known_extensions = {
            JpegMarker.App00: '.jfif',
            JpegMarker.App01: '.exif',
            JpegMarker.App02: '.iccp',
        }
        parsed = Jpeg.Parse(data)
        # Every segment of the file, in file order, as its raw bytes.
        for index, segment in enumerate(parsed.streams):
            yield UnpackResult(F'streams/{index}.{segment.type.name}', segment.__buffer__(0))
        # Payloads of comment segments.
        for index, segment in enumerate(parsed.comments):
            yield UnpackResult(F'comments/{index}', segment.data)
        # Entropy-coded data that follows each start-of-scan segment.
        for index, segment in enumerate(parsed.scans):
            yield UnpackResult(F'scans/{index}', segment.scan)
        # Payloads of application segments.
        for index, segment in enumerate(parsed.meta):
            marker = segment.type
            suffix = known_extensions.get(marker, '')
            yield UnpackResult(F'meta/{index}.{marker.name.lower()}{suffix}', segment.data)
        # The parsed frame header, serialized as JSON, when one was found.
        frame = parsed.sof
        if frame:
            yield UnpackResult('meta/dimensions.json', json.dumps(struct_to_json(frame)))

    @classmethod
    def handles(cls, data) -> bool:
        # The unit handles exactly those inputs that are identified as JPG images.
        detected = get_image_format(data)
        return detected == Fmt.JPG
Classes
class JpegMarker (*args, **kwds)-
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating-point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
Expand source code Browse git
class JpegMarker(int, enum.Enum): StartOfImage = 0xD8 EndOfImage = 0xD9 StartOfScan = 0xDA DCTBaseline = 0xC0 DCTProgressive = 0xC2 DefineQuantizationTable = 0xDB DefineHuffmanTable = 0xC4 DefineRestartInterval = 0xDD Comment = 0xFE RST0 = 0xD0 RST1 = 0xD1 RST2 = 0xD2 RST3 = 0xD3 RST4 = 0xD4 RST5 = 0xD5 RST6 = 0xD6 RST7 = 0xD7 App00 = 0xE0 App01 = 0xE1 App02 = 0xE2 App03 = 0xE3 App04 = 0xE4 App05 = 0xE5 App06 = 0xE6 App07 = 0xE7 App08 = 0xE8 App09 = 0xE9 App10 = 0xEA App11 = 0xEB App12 = 0xEC App13 = 0xED App14 = 0xEE App15 = 0xEF SOI = StartOfImage EOI = EndOfImage SOS = StartOfScan DQT = DefineQuantizationTable DHT = DefineHuffmanTable DRI = DefineRestartInterval COM = Comment SOF0 = DCTBaseline SOF2 = DCTProgressiveAncestors
- builtins.int
- enum.Enum
Class variables
var StartOfImage-
The type of the None singleton.
var EndOfImage-
The type of the None singleton.
var StartOfScan-
The type of the None singleton.
var DCTBaseline-
The type of the None singleton.
var DCTProgressive-
The type of the None singleton.
var DefineQuantizationTable-
The type of the None singleton.
var DefineHuffmanTable-
The type of the None singleton.
var DefineRestartInterval-
The type of the None singleton.
var Comment-
The type of the None singleton.
var RST0-
The type of the None singleton.
var RST1-
The type of the None singleton.
var RST2-
The type of the None singleton.
var RST3-
The type of the None singleton.
var RST4-
The type of the None singleton.
var RST5-
The type of the None singleton.
var RST6-
The type of the None singleton.
var RST7-
The type of the None singleton.
var App00-
The type of the None singleton.
var App01-
The type of the None singleton.
var App02-
The type of the None singleton.
var App03-
The type of the None singleton.
var App04-
The type of the None singleton.
var App05-
The type of the None singleton.
var App06-
The type of the None singleton.
var App07-
The type of the None singleton.
var App08-
The type of the None singleton.
var App09-
The type of the None singleton.
var App10-
The type of the None singleton.
var App11-
The type of the None singleton.
var App12-
The type of the None singleton.
var App13-
The type of the None singleton.
var App14-
The type of the None singleton.
var App15-
The type of the None singleton.
var SOI-
The type of the None singleton.
var EOI-
The type of the None singleton.
var SOS-
The type of the None singleton.
var DQT-
The type of the None singleton.
var DHT-
The type of the None singleton.
var DRI-
The type of the None singleton.
var COM-
The type of the None singleton.
var SOF0-
The type of the None singleton.
var SOF2-
The type of the None singleton.
class JpegSOFComponent (reader)-
A class to parse structured data. A
`Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`. The initialization routine of the structure will be called with a single argument
`reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through. Expand source code Browse git
class JpegSOFComponent(Struct): def __init__(self, reader: StructReader): self.id = reader.u8() hv = reader.u8() self.hs = hv >> 4 self.vs = hv & 15 self.qt = reader.u8()Ancestors
- Struct
- typing.Generic
- collections.abc.Buffer
Static methods
def Parse(reader, *args, **kwargs)
class JpegSOF (reader)-
A class to parse structured data. A
`Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`. The initialization routine of the structure will be called with a single argument
`reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through. Expand source code Browse git
class JpegSOF(Struct): def __init__(self, reader: StructReader): self.precision = reader.u8() self.height = reader.u16() self.width = reader.u16() self.components = [JpegSOFComponent(reader) for _ in range(reader.u8())]Ancestors
- Struct
- typing.Generic
- collections.abc.Buffer
Static methods
def Parse(reader, *args, **kwargs)
class JpegStream (reader)-
A class to parse structured data. A
`Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`. The initialization routine of the structure will be called with a single argument
`reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through. Expand source code Browse git
class JpegStream(Struct): def __init__(self, reader: StructReader): self.offset = reader.tell() if (h := reader.u8()) != 0xFF: raise ValueError(F'Invalid magic byte {h:#04x} at start of stream.') self.type = t = JpegMarker(reader.u8()) if t in range(0xD0, 0xDA): self.size = 0 self.data = b'' self.scan = b'' else: self.size = reader.u16() - 2 if self.size < 0: raise ValueError(F'Invalid size {self.size}.') self.data = reader.read_exactly(self.size) if t == JpegMarker.StartOfScan: eos = re.search(br'\xFF(?!\0)', reader.peek()) if eos is None: raise ValueError('Could not find end of stream data.') self.scan = reader.read(eos.start()) else: self.scan = b''Ancestors
- Struct
- typing.Generic
- collections.abc.Buffer
Static methods
def Parse(reader, *args, **kwargs)
class Jpeg (reader)-
A class to parse structured data. A
`Struct` class can be instantiated as follows: `foo = Struct(data, bar=29)`. The initialization routine of the structure will be called with a single argument
`reader`. If the object `data` is already a `StructReader`, then it will be passed as `reader`. Otherwise, the argument will be wrapped in a `StructReader`. Additional arguments to the struct are passed through. Expand source code Browse git
class Jpeg(Struct): def __init__(self, reader: StructReader): self.streams: list[JpegStream] = [] self.sof = None self.scandata: list[memoryview] = [] self.scans: list[JpegStream] = [] self.comments: list[JpegStream] = [] self.meta: list[JpegStream] = [] reader.bigendian = True while not reader.eof: stream = JpegStream(reader) self.streams.append(stream) if stream.type in (JpegMarker.SOF0, JpegMarker.SOF2): if self.sof is not None: raise ValueError('Duplicate SOF Stream in File.') self.sof = JpegSOF.Parse(stream.data) elif stream.type == JpegMarker.StartOfScan: self.scans.append(stream) elif stream.type == JpegMarker.Comment: self.comments.append(stream) elif stream.type in range(JpegMarker.App00, JpegMarker.App15 + 1): self.meta.append(stream)Ancestors
- Struct
- typing.Generic
- collections.abc.Buffer
Static methods
def Parse(reader, *args, **kwargs)
class jpeg (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')-
Extract the raw segments from a JPG image.
Expand source code Browse git
class jpeg(PathExtractorUnit): """ Extract the raw segments from a JPG image. """ def unpack(self, data): jpg = Jpeg.Parse(data) for k, stream in enumerate(jpg.streams): yield UnpackResult(F'streams/{k}.{stream.type.name}', stream.__buffer__(0)) for k, comment in enumerate(jpg.comments): yield UnpackResult(F'comments/{k}', comment.data) for k, scan in enumerate(jpg.scans): yield UnpackResult(F'scans/{k}', scan.scan) for k, meta in enumerate(jpg.meta): extension = { JpegMarker.App00: '.jfif', JpegMarker.App01: '.exif', JpegMarker.App02: '.iccp', }.get((t := meta.type), '') yield UnpackResult(F'meta/{k}.{t.name.lower()}{extension}', meta.data) if sof := jpg.sof: yield UnpackResult('meta/dimensions.json', json.dumps(struct_to_json(sof))) @classmethod def handles(cls, data) -> bool: return get_image_format(data) == Fmt.JPGAncestors
Subclasses
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data): jpg = Jpeg.Parse(data) for k, stream in enumerate(jpg.streams): yield UnpackResult(F'streams/{k}.{stream.type.name}', stream.__buffer__(0)) for k, comment in enumerate(jpg.comments): yield UnpackResult(F'comments/{k}', comment.data) for k, scan in enumerate(jpg.scans): yield UnpackResult(F'scans/{k}', scan.scan) for k, meta in enumerate(jpg.meta): extension = { JpegMarker.App00: '.jfif', JpegMarker.App01: '.exif', JpegMarker.App02: '.iccp', }.get((t := meta.type), '') yield UnpackResult(F'meta/{k}.{t.name.lower()}{extension}', meta.data) if sof := jpg.sof: yield UnpackResult('meta/dimensions.json', json.dumps(struct_to_json(sof)))
Inherited members
PathExtractorUnit:CustomJoinBehaviourCustomPathSeparatorFilterEverythingRequiresassemblecodecconsolefilterfinishhandlesis_quietis_reversiblelabelledleniencylog_alwayslog_debuglog_detachlog_faillog_infolog_levellog_warnloggernamenozzleoptional_dependenciesreadread1required_dependenciesreverserunsourcesuperinit
UnitBase: