Module refinery.units.formats.jpeg

Expand source code Browse git
from __future__ import annotations

import enum
import re

from refinery.lib import json
from refinery.lib.id import Fmt, get_image_format
from refinery.lib.structures import Struct, StructReader, struct_to_json
from refinery.units.formats import PathExtractorUnit, UnpackResult


class JpegMarker(int, enum.Enum):
    """
    The marker byte of a JPEG segment, i.e. the byte that follows the 0xFF prefix.

    Frequently used markers carry descriptive names; the conventional short names from the JPEG
    specification (SOI, EOI, SOS, ...) are provided as aliases at the end of the class. Because
    the enumeration derives from `int`, members compare and do arithmetic like plain integers.
    """

    StartOfImage = 0xD8
    EndOfImage = 0xD9
    StartOfScan = 0xDA
    DCTBaseline = 0xC0
    DCTProgressive = 0xC2
    DefineQuantizationTable = 0xDB
    DefineHuffmanTable = 0xC4
    DefineRestartInterval = 0xDD
    Comment = 0xFE

    # Restart markers: these carry no length field and no payload.
    RST0 = 0xD0
    RST1 = 0xD1
    RST2 = 0xD2
    RST3 = 0xD3
    RST4 = 0xD4
    RST5 = 0xD5
    RST6 = 0xD6
    RST7 = 0xD7

    # Application-specific segments.
    App00 = 0xE0
    App01 = 0xE1
    App02 = 0xE2
    App03 = 0xE3
    App04 = 0xE4
    App05 = 0xE5
    App06 = 0xE6
    App07 = 0xE7
    App08 = 0xE8
    App09 = 0xE9
    App10 = 0xEA
    App11 = 0xEB
    App12 = 0xEC
    App13 = 0xED
    App14 = 0xEE
    App15 = 0xEF

    # Remaining frame headers from ITU-T T.81, Table B.1. Without these members, looking up the
    # marker byte of an extended, lossless, differential, or arithmetic-coded frame raises a
    # ValueError. They are appended here so that the definition order of the members above is
    # unchanged.
    SOF1 = 0xC1
    SOF3 = 0xC3
    SOF5 = 0xC5
    SOF6 = 0xC6
    SOF7 = 0xC7
    SOF9 = 0xC9
    SOF10 = 0xCA
    SOF11 = 0xCB
    SOF13 = 0xCD
    SOF14 = 0xCE
    SOF15 = 0xCF

    # Remaining length-prefixed segments from the same table: arithmetic conditioning, number of
    # lines, hierarchical progression, and expand reference components.
    DAC = 0xCC
    DNL = 0xDC
    DHP = 0xDE
    EXP = 0xDF

    # Short aliases; an enum member assigned an existing value becomes an alias of that member.
    SOI = StartOfImage
    EOI = EndOfImage
    SOS = StartOfScan
    DQT = DefineQuantizationTable
    DHT = DefineHuffmanTable
    DRI = DefineRestartInterval
    COM = Comment
    SOF0 = DCTBaseline
    SOF2 = DCTProgressive


class JpegSOFComponent(Struct):
    """
    One three-byte component entry of a JPEG frame header.

    Reads, in order: the component identifier `id`, one byte that packs the two sampling factors
    `hs` (upper four bits) and `vs` (lower four bits), and the quantization table selector `qt`.
    """
    def __init__(self, reader: StructReader):
        self.id = reader.u8()
        # Split the packed byte into its upper and lower nibble.
        self.hs, self.vs = divmod(reader.u8(), 0x10)
        self.qt = reader.u8()


class JpegSOF(Struct):
    """
    The payload of a JPEG start-of-frame segment.

    Reads the sample `precision` (one byte), the image `height` and `width` (two bytes each), a
    one-byte component count, and then that many `JpegSOFComponent` entries into `components`.
    """
    def __init__(self, reader: StructReader):
        self.precision = reader.u8()
        self.height = reader.u16()
        self.width = reader.u16()
        count = reader.u8()
        entries = []
        for _ in range(count):
            entries.append(JpegSOFComponent(reader))
        self.components = entries


class JpegStream(Struct):
    """
    A single JPEG segment: the 0xFF prefix, the marker byte, and the payload that belongs to it.

    Attributes set while parsing:

    - `offset`: the reader position at which the segment starts.
    - `type`: the `JpegMarker` that identifies the segment.
    - `size`: the payload length in bytes, not counting the two length bytes. This is zero for
      markers in the range 0xD0 to 0xD9, which have no length field.
    - `data`: the length-prefixed payload, empty for markers without one.
    - `scan`: for a StartOfScan segment, the entropy-coded data that follows the scan header up
      to the next marker that ends the scan; empty for every other segment.

    Raises `ValueError` if the segment does not begin with 0xFF, if the marker byte is not a
    member of `JpegMarker`, if the length field is smaller than 2, or if no marker is found
    after the scan data.
    """
    def __init__(self, reader: StructReader):
        self.offset = reader.tell()
        if (h := reader.u8()) != 0xFF:
            raise ValueError(F'Invalid magic byte {h:#04x} at start of stream.')
        self.type = t = JpegMarker(reader.u8())
        if t in range(0xD0, 0xDA):
            # RST0 through RST7, SOI, and EOI consist of the two marker bytes only.
            self.size = 0
            self.data = b''
            self.scan = b''
        else:
            # The big-endian length field counts its own two bytes.
            self.size = reader.u16() - 2
            if self.size < 0:
                raise ValueError(F'Invalid size {self.size}.')
            self.data = reader.read_exactly(self.size)
            if t == JpegMarker.StartOfScan:
                # Inside entropy-coded data, 0xFF is followed by a stuffed zero byte. A scan may
                # also contain restart markers (0xFFD0 to 0xFFD7) between its restart intervals,
                # and any marker may be preceded by 0xFF fill bytes. None of these end the scan:
                # stopping at a restart marker would leave the parser positioned on entropy-coded
                # data after it, and stopping at a fill byte would produce the marker byte 0xFF.
                eos = re.search(br'\xFF(?![\x00\xD0-\xD7\xFF])', reader.peek())
                if eos is None:
                    raise ValueError('Could not find end of stream data.')
                self.scan = reader.read(eos.start())
            else:
                self.scan = b''


class Jpeg(Struct):
    """
    A JPEG file parsed as a flat sequence of `JpegStream` segments.

    Every segment is recorded in `streams`. In addition, StartOfScan segments are collected in
    `scans`, Comment segments in `comments`, and App00 through App15 segments in `meta`. The
    payload of the SOF0 or SOF2 segment is parsed as a `JpegSOF` and stored in `sof`, which
    remains `None` when the file contains neither. The list `scandata` is initialized empty and
    is not filled by this parser.

    Raises `ValueError` when a second SOF0 or SOF2 segment is encountered.
    """
    def __init__(self, reader: StructReader):
        self.streams: list[JpegStream] = []
        self.sof = None
        self.scandata: list[memoryview] = []
        self.scans: list[JpegStream] = []
        self.comments: list[JpegStream] = []
        self.meta: list[JpegStream] = []

        # Multi-byte fields are read in big-endian byte order from here on.
        reader.bigendian = True

        while not reader.eof:
            segment = JpegStream(reader)
            self.streams.append(segment)
            kind = segment.type
            if kind == JpegMarker.StartOfScan:
                self.scans.append(segment)
            elif kind == JpegMarker.Comment:
                self.comments.append(segment)
            elif JpegMarker.App00 <= kind <= JpegMarker.App15:
                self.meta.append(segment)
            elif kind == JpegMarker.SOF0 or kind == JpegMarker.SOF2:
                if self.sof is not None:
                    raise ValueError('Duplicate SOF Stream in File.')
                self.sof = JpegSOF.Parse(segment.data)


class jpeg(PathExtractorUnit):
    """
    Extract the raw segments from a JPG image.
    """
    def unpack(self, data):
        """
        Parse the input as a JPEG and yield one `UnpackResult` per extracted item: every raw
        segment under `streams/`, every comment payload under `comments/`, the entropy-coded
        data of every scan under `scans/`, every application segment payload under `meta/`,
        and, if a frame header was parsed, its JSON rendering as `meta/dimensions.json`.
        """
        parsed = Jpeg.Parse(data)

        for index, segment in enumerate(parsed.streams):
            yield UnpackResult(F'streams/{index}.{segment.type.name}', segment.__buffer__(0))

        for index, segment in enumerate(parsed.comments):
            yield UnpackResult(F'comments/{index}', segment.data)

        for index, segment in enumerate(parsed.scans):
            yield UnpackResult(F'scans/{index}', segment.scan)

        # Extra file extension for the first three application segment types.
        suffixes = {
            JpegMarker.App00: '.jfif',
            JpegMarker.App01: '.exif',
            JpegMarker.App02: '.iccp',
        }
        for index, segment in enumerate(parsed.meta):
            marker = segment.type
            suffix = suffixes.get(marker, '')
            yield UnpackResult(F'meta/{index}.{marker.name.lower()}{suffix}', segment.data)

        frame = parsed.sof
        if frame:
            yield UnpackResult('meta/dimensions.json', json.dumps(struct_to_json(frame)))

    @classmethod
    def handles(cls, data) -> bool:
        """
        Return whether `get_image_format` identifies the input as `Fmt.JPG`.
        """
        return get_image_format(data) == Fmt.JPG

Classes

class JpegMarker (*args, **kwds)

int([x]) -> integer int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating-point numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

>>> int('0b100', base=0)
4
Expand source code Browse git
class JpegMarker(int, enum.Enum):
    """
    The marker byte of a JPEG segment, i.e. the byte that follows the 0xFF prefix.

    Frequently used markers carry descriptive names; the conventional short names from the JPEG
    specification (SOI, EOI, SOS, ...) are provided as aliases at the end of the class. Because
    the enumeration derives from `int`, members compare and do arithmetic like plain integers.
    """

    StartOfImage = 0xD8
    EndOfImage = 0xD9
    StartOfScan = 0xDA
    DCTBaseline = 0xC0
    DCTProgressive = 0xC2
    DefineQuantizationTable = 0xDB
    DefineHuffmanTable = 0xC4
    DefineRestartInterval = 0xDD
    Comment = 0xFE

    # Restart markers: these carry no length field and no payload.
    RST0 = 0xD0
    RST1 = 0xD1
    RST2 = 0xD2
    RST3 = 0xD3
    RST4 = 0xD4
    RST5 = 0xD5
    RST6 = 0xD6
    RST7 = 0xD7

    # Application-specific segments.
    App00 = 0xE0
    App01 = 0xE1
    App02 = 0xE2
    App03 = 0xE3
    App04 = 0xE4
    App05 = 0xE5
    App06 = 0xE6
    App07 = 0xE7
    App08 = 0xE8
    App09 = 0xE9
    App10 = 0xEA
    App11 = 0xEB
    App12 = 0xEC
    App13 = 0xED
    App14 = 0xEE
    App15 = 0xEF

    # Remaining frame headers from ITU-T T.81, Table B.1. Without these members, looking up the
    # marker byte of an extended, lossless, differential, or arithmetic-coded frame raises a
    # ValueError. They are appended here so that the definition order of the members above is
    # unchanged.
    SOF1 = 0xC1
    SOF3 = 0xC3
    SOF5 = 0xC5
    SOF6 = 0xC6
    SOF7 = 0xC7
    SOF9 = 0xC9
    SOF10 = 0xCA
    SOF11 = 0xCB
    SOF13 = 0xCD
    SOF14 = 0xCE
    SOF15 = 0xCF

    # Remaining length-prefixed segments from the same table: arithmetic conditioning, number of
    # lines, hierarchical progression, and expand reference components.
    DAC = 0xCC
    DNL = 0xDC
    DHP = 0xDE
    EXP = 0xDF

    # Short aliases; an enum member assigned an existing value becomes an alias of that member.
    SOI = StartOfImage
    EOI = EndOfImage
    SOS = StartOfScan
    DQT = DefineQuantizationTable
    DHT = DefineHuffmanTable
    DRI = DefineRestartInterval
    COM = Comment
    SOF0 = DCTBaseline
    SOF2 = DCTProgressive

Ancestors

  • builtins.int
  • enum.Enum

Class variables

var StartOfImage

Marker byte 0xD8 (start of image).

var EndOfImage

Marker byte 0xD9 (end of image).

var StartOfScan

Marker byte 0xDA (start of scan).

var DCTBaseline

Marker byte 0xC0 (baseline DCT frame header).

var DCTProgressive

Marker byte 0xC2 (progressive DCT frame header).

var DefineQuantizationTable

Marker byte 0xDB (define quantization table).

var DefineHuffmanTable

Marker byte 0xC4 (define Huffman table).

var DefineRestartInterval

Marker byte 0xDD (define restart interval).

var Comment

Marker byte 0xFE (comment).

var RST0

Marker byte 0xD0 (restart marker 0).

var RST1

Marker byte 0xD1 (restart marker 1).

var RST2

Marker byte 0xD2 (restart marker 2).

var RST3

Marker byte 0xD3 (restart marker 3).

var RST4

Marker byte 0xD4 (restart marker 4).

var RST5

Marker byte 0xD5 (restart marker 5).

var RST6

Marker byte 0xD6 (restart marker 6).

var RST7

Marker byte 0xD7 (restart marker 7).

var App00

Marker byte 0xE0 (application segment 0).

var App01

Marker byte 0xE1 (application segment 1).

var App02

Marker byte 0xE2 (application segment 2).

var App03

Marker byte 0xE3 (application segment 3).

var App04

Marker byte 0xE4 (application segment 4).

var App05

Marker byte 0xE5 (application segment 5).

var App06

Marker byte 0xE6 (application segment 6).

var App07

Marker byte 0xE7 (application segment 7).

var App08

Marker byte 0xE8 (application segment 8).

var App09

Marker byte 0xE9 (application segment 9).

var App10

Marker byte 0xEA (application segment 10).

var App11

Marker byte 0xEB (application segment 11).

var App12

Marker byte 0xEC (application segment 12).

var App13

Marker byte 0xED (application segment 13).

var App14

Marker byte 0xEE (application segment 14).

var App15

Marker byte 0xEF (application segment 15).

var SOI

Alias of StartOfImage (0xD8).

var EOI

Alias of EndOfImage (0xD9).

var SOS

Alias of StartOfScan (0xDA).

var DQT

Alias of DefineQuantizationTable (0xDB).

var DHT

Alias of DefineHuffmanTable (0xC4).

var DRI

Alias of DefineRestartInterval (0xDD).

var COM

Alias of Comment (0xFE).

var SOF0

Alias of DCTBaseline (0xC0).

var SOF2

Alias of DCTProgressive (0xC2).

class JpegSOFComponent (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class JpegSOFComponent(Struct):
    """
    One three-byte component entry of a JPEG frame header.

    Reads, in order: the component identifier `id`, one byte that packs the two sampling factors
    `hs` (upper four bits) and `vs` (lower four bits), and the quantization table selector `qt`.
    """
    def __init__(self, reader: StructReader):
        self.id = reader.u8()
        # Split the packed byte into its upper and lower nibble.
        self.hs, self.vs = divmod(reader.u8(), 0x10)
        self.qt = reader.u8()

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class JpegSOF (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class JpegSOF(Struct):
    """
    The payload of a JPEG start-of-frame segment.

    Reads the sample `precision` (one byte), the image `height` and `width` (two bytes each), a
    one-byte component count, and then that many `JpegSOFComponent` entries into `components`.
    """
    def __init__(self, reader: StructReader):
        self.precision = reader.u8()
        self.height = reader.u16()
        self.width = reader.u16()
        count = reader.u8()
        entries = []
        for _ in range(count):
            entries.append(JpegSOFComponent(reader))
        self.components = entries

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class JpegStream (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class JpegStream(Struct):
    """
    A single JPEG segment: the 0xFF prefix, the marker byte, and the payload that belongs to it.

    Attributes set while parsing:

    - `offset`: the reader position at which the segment starts.
    - `type`: the `JpegMarker` that identifies the segment.
    - `size`: the payload length in bytes, not counting the two length bytes. This is zero for
      markers in the range 0xD0 to 0xD9, which have no length field.
    - `data`: the length-prefixed payload, empty for markers without one.
    - `scan`: for a StartOfScan segment, the entropy-coded data that follows the scan header up
      to the next marker that ends the scan; empty for every other segment.

    Raises `ValueError` if the segment does not begin with 0xFF, if the marker byte is not a
    member of `JpegMarker`, if the length field is smaller than 2, or if no marker is found
    after the scan data.
    """
    def __init__(self, reader: StructReader):
        self.offset = reader.tell()
        if (h := reader.u8()) != 0xFF:
            raise ValueError(F'Invalid magic byte {h:#04x} at start of stream.')
        self.type = t = JpegMarker(reader.u8())
        if t in range(0xD0, 0xDA):
            # RST0 through RST7, SOI, and EOI consist of the two marker bytes only.
            self.size = 0
            self.data = b''
            self.scan = b''
        else:
            # The big-endian length field counts its own two bytes.
            self.size = reader.u16() - 2
            if self.size < 0:
                raise ValueError(F'Invalid size {self.size}.')
            self.data = reader.read_exactly(self.size)
            if t == JpegMarker.StartOfScan:
                # Inside entropy-coded data, 0xFF is followed by a stuffed zero byte. A scan may
                # also contain restart markers (0xFFD0 to 0xFFD7) between its restart intervals,
                # and any marker may be preceded by 0xFF fill bytes. None of these end the scan:
                # stopping at a restart marker would leave the parser positioned on entropy-coded
                # data after it, and stopping at a fill byte would produce the marker byte 0xFF.
                eos = re.search(br'\xFF(?![\x00\xD0-\xD7\xFF])', reader.peek())
                if eos is None:
                    raise ValueError('Could not find end of stream data.')
                self.scan = reader.read(eos.start())
            else:
                self.scan = b''

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class Jpeg (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class Jpeg(Struct):
    """
    A JPEG file parsed as a flat sequence of `JpegStream` segments.

    Every segment is recorded in `streams`. In addition, StartOfScan segments are collected in
    `scans`, Comment segments in `comments`, and App00 through App15 segments in `meta`. The
    payload of the SOF0 or SOF2 segment is parsed as a `JpegSOF` and stored in `sof`, which
    remains `None` when the file contains neither. The list `scandata` is initialized empty and
    is not filled by this parser.

    Raises `ValueError` when a second SOF0 or SOF2 segment is encountered.
    """
    def __init__(self, reader: StructReader):
        self.streams: list[JpegStream] = []
        self.sof = None
        self.scandata: list[memoryview] = []
        self.scans: list[JpegStream] = []
        self.comments: list[JpegStream] = []
        self.meta: list[JpegStream] = []

        # Multi-byte fields are read in big-endian byte order from here on.
        reader.bigendian = True

        while not reader.eof:
            segment = JpegStream(reader)
            self.streams.append(segment)
            kind = segment.type
            if kind == JpegMarker.StartOfScan:
                self.scans.append(segment)
            elif kind == JpegMarker.Comment:
                self.comments.append(segment)
            elif JpegMarker.App00 <= kind <= JpegMarker.App15:
                self.meta.append(segment)
            elif kind == JpegMarker.SOF0 or kind == JpegMarker.SOF2:
                if self.sof is not None:
                    raise ValueError('Duplicate SOF Stream in File.')
                self.sof = JpegSOF.Parse(segment.data)

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class jpeg (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')

Extract the raw segments from a JPG image.

Expand source code Browse git
class jpeg(PathExtractorUnit):
    """
    Extract the raw segments from a JPG image.
    """
    def unpack(self, data):
        """
        Parse the input as a JPEG and yield one `UnpackResult` per extracted item: every raw
        segment under `streams/`, every comment payload under `comments/`, the entropy-coded
        data of every scan under `scans/`, every application segment payload under `meta/`,
        and, if a frame header was parsed, its JSON rendering as `meta/dimensions.json`.
        """
        parsed = Jpeg.Parse(data)

        for index, segment in enumerate(parsed.streams):
            yield UnpackResult(F'streams/{index}.{segment.type.name}', segment.__buffer__(0))

        for index, segment in enumerate(parsed.comments):
            yield UnpackResult(F'comments/{index}', segment.data)

        for index, segment in enumerate(parsed.scans):
            yield UnpackResult(F'scans/{index}', segment.scan)

        # Extra file extension for the first three application segment types.
        suffixes = {
            JpegMarker.App00: '.jfif',
            JpegMarker.App01: '.exif',
            JpegMarker.App02: '.iccp',
        }
        for index, segment in enumerate(parsed.meta):
            marker = segment.type
            suffix = suffixes.get(marker, '')
            yield UnpackResult(F'meta/{index}.{marker.name.lower()}{suffix}', segment.data)

        frame = parsed.sof
        if frame:
            yield UnpackResult('meta/dimensions.json', json.dumps(struct_to_json(frame)))

    @classmethod
    def handles(cls, data) -> bool:
        """
        Return whether `get_image_format` identifies the input as `Fmt.JPG`.
        """
        return get_image_format(data) == Fmt.JPG

Ancestors

Subclasses

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    """
    Parse the input as a JPEG and yield one `UnpackResult` per extracted item: every raw
    segment under `streams/`, every comment payload under `comments/`, the entropy-coded
    data of every scan under `scans/`, every application segment payload under `meta/`,
    and, if a frame header was parsed, its JSON rendering as `meta/dimensions.json`.
    """
    parsed = Jpeg.Parse(data)

    for index, segment in enumerate(parsed.streams):
        yield UnpackResult(F'streams/{index}.{segment.type.name}', segment.__buffer__(0))

    for index, segment in enumerate(parsed.comments):
        yield UnpackResult(F'comments/{index}', segment.data)

    for index, segment in enumerate(parsed.scans):
        yield UnpackResult(F'scans/{index}', segment.scan)

    # Extra file extension for the first three application segment types.
    suffixes = {
        JpegMarker.App00: '.jfif',
        JpegMarker.App01: '.exif',
        JpegMarker.App02: '.iccp',
    }
    for index, segment in enumerate(parsed.meta):
        marker = segment.type
        suffix = suffixes.get(marker, '')
        yield UnpackResult(F'meta/{index}.{marker.name.lower()}{suffix}', segment.data)

    frame = parsed.sof
    if frame:
        yield UnpackResult('meta/dimensions.json', json.dumps(struct_to_json(frame)))

Inherited members