Module refinery.lib.cbor

Implements parsing of CBOR (Concise Binary Object Representation) data as specified in RFC 8949.

Expand source code Browse git
"""
Implements parsing of CBOR (Concise Binary Object Representation) data as specified in RFC 8949.
"""
from __future__ import annotations

import math
import struct

from refinery.lib.structures import StructReader


class CBORReader(StructReader[memoryview]):
    """
    A reader for CBOR-encoded data. CBOR encodes each data item as an initial byte whose high
    3 bits indicate the major type and whose low 5 bits provide additional information (the
    argument), optionally followed by content bytes. This reader decodes all major types:

    - `0`: unsigned integer
    - `1`: negative integer
    - `2`: byte string
    - `3`: text string (UTF-8)
    - `4`: array
    - `5`: map
    - `6`: semantic tag
    - `7`: simple values and floating-point numbers
    """

    BREAK = object()

    def read_argument(self, additional: int) -> int:
        """
        Decode the argument value from the 5-bit additional information field.
        """
        if additional <= 23:
            return additional
        if additional == 24:
            return self.u8()
        if additional == 25:
            return self.u16()
        if additional == 26:
            return self.u32()
        if additional == 27:
            return self.u64()
        raise ValueError(F'Invalid additional information value: {additional}')

    @staticmethod
    def _json_float(value: float):
        if math.isnan(value):
            return 'NaN'
        if math.isinf(value):
            return '-Infinity' if value < 0 else 'Infinity'
        return value

    def read_float16(self) -> float | str:
        """
        Decode an IEEE 754 half-precision (16-bit) floating-point value.
        """
        data = self.read_exactly(2)
        value, = struct.unpack('>e', data)
        return self._json_float(value)

    def read_item(self):
        """
        Read and decode a single CBOR data item. Returns the decoded Python object.
        """
        ib = self.u8()
        major = ib >> 5
        additional = ib & 0x1F

        if major == 0:
            return self.read_argument(additional)

        if major == 1:
            return -1 - self.read_argument(additional)

        if major == 2:
            if additional == 31:
                chunks = []
                while True:
                    item = self.read_item()
                    if item is self.BREAK:
                        break
                    if not isinstance(item, (bytes, bytearray, memoryview)):
                        raise ValueError('Indefinite-length byte string contains non-byte-string chunk.')
                    chunks.append(item)
                return b''.join(bytes(c) for c in chunks)
            length = self.read_argument(additional)
            return bytes(self.read_exactly(length))

        if major == 3:
            if additional == 31:
                chunks = []
                while True:
                    item = self.read_item()
                    if item is self.BREAK:
                        break
                    if not isinstance(item, str):
                        raise ValueError('Indefinite-length text string contains non-text-string chunk.')
                    chunks.append(item)
                return ''.join(chunks)
            length = self.read_argument(additional)
            return bytes(self.read_exactly(length)).decode('utf-8')

        if major == 4:
            if additional == 31:
                items = []
                while True:
                    item = self.read_item()
                    if item is self.BREAK:
                        break
                    items.append(item)
                return items
            count = self.read_argument(additional)
            return [self.read_item() for _ in range(count)]

        if major == 5:
            if additional == 31:
                pairs = {}
                while True:
                    key = self.read_item()
                    if key is self.BREAK:
                        break
                    value = self.read_item()
                    pairs[key] = value
                return pairs
            count = self.read_argument(additional)
            pairs = {}
            for _ in range(count):
                key = self.read_item()
                value = self.read_item()
                pairs[key] = value
            return pairs

        if major == 6:
            tag_number = self.read_argument(additional)
            content = self.read_item()
            return self._decode_tagged(tag_number, content)

        if major == 7:
            if additional == 31:
                return self.BREAK
            if additional <= 23:
                return self._decode_simple(additional)
            if additional == 24:
                return self._decode_simple(self.u8())
            if additional == 25:
                return self.read_float16()
            if additional == 26:
                return self._json_float(self.f32())
            if additional == 27:
                return self._json_float(self.f64())
            raise ValueError(F'Invalid additional information for major type 7: {additional}')

        raise ValueError(F'Unknown major type: {major}')

    @staticmethod
    def _decode_simple(value: int):
        if value == 20:
            return False
        if value == 21:
            return True
        if value == 22:
            return None
        if value == 23:
            return None
        return F'simple({value})'

    @staticmethod
    def _decode_tagged(tag: int, content):
        """
        Attempt to interpret well-known tags. For tag 2 and 3 (bignums), the content is decoded
        into an integer. Other tags are returned as a descriptive dictionary.
        """
        if tag == 2 and isinstance(content, (bytes, bytearray, memoryview)):
            return int.from_bytes(content, 'big')
        if tag == 3 and isinstance(content, (bytes, bytearray, memoryview)):
            return -1 - int.from_bytes(content, 'big')
        return {'tag': tag, 'value': content}

Classes

class CBORReader (data, bigendian=None)

A reader for CBOR-encoded data. CBOR encodes each data item as an initial byte whose high 3 bits indicate the major type and whose low 5 bits provide additional information (the argument), optionally followed by content bytes. This reader decodes all major types:

  • 0: unsigned integer
  • 1: negative integer
  • 2: byte string
  • 3: text string (UTF-8)
  • 4: array
  • 5: map
  • 6: semantic tag
  • 7: simple values and floating-point numbers
Expand source code Browse git
class CBORReader(StructReader[memoryview]):
    """
    A reader for CBOR-encoded data. CBOR encodes each data item as an initial byte whose high
    3 bits indicate the major type and whose low 5 bits provide additional information (the
    argument), optionally followed by content bytes. This reader decodes all major types:

    - `0`: unsigned integer
    - `1`: negative integer
    - `2`: byte string
    - `3`: text string (UTF-8)
    - `4`: array
    - `5`: map
    - `6`: semantic tag
    - `7`: simple values and floating-point numbers
    """

    BREAK = object()

    def read_argument(self, additional: int) -> int:
        """
        Decode the argument value from the 5-bit additional information field.
        """
        if additional <= 23:
            return additional
        if additional == 24:
            return self.u8()
        if additional == 25:
            return self.u16()
        if additional == 26:
            return self.u32()
        if additional == 27:
            return self.u64()
        raise ValueError(F'Invalid additional information value: {additional}')

    @staticmethod
    def _json_float(value: float):
        if math.isnan(value):
            return 'NaN'
        if math.isinf(value):
            return '-Infinity' if value < 0 else 'Infinity'
        return value

    def read_float16(self) -> float | str:
        """
        Decode an IEEE 754 half-precision (16-bit) floating-point value.
        """
        data = self.read_exactly(2)
        value, = struct.unpack('>e', data)
        return self._json_float(value)

    def read_item(self):
        """
        Read and decode a single CBOR data item. Returns the decoded Python object.
        """
        ib = self.u8()
        major = ib >> 5
        additional = ib & 0x1F

        if major == 0:
            return self.read_argument(additional)

        if major == 1:
            return -1 - self.read_argument(additional)

        if major == 2:
            if additional == 31:
                chunks = []
                while True:
                    item = self.read_item()
                    if item is self.BREAK:
                        break
                    if not isinstance(item, (bytes, bytearray, memoryview)):
                        raise ValueError('Indefinite-length byte string contains non-byte-string chunk.')
                    chunks.append(item)
                return b''.join(bytes(c) for c in chunks)
            length = self.read_argument(additional)
            return bytes(self.read_exactly(length))

        if major == 3:
            if additional == 31:
                chunks = []
                while True:
                    item = self.read_item()
                    if item is self.BREAK:
                        break
                    if not isinstance(item, str):
                        raise ValueError('Indefinite-length text string contains non-text-string chunk.')
                    chunks.append(item)
                return ''.join(chunks)
            length = self.read_argument(additional)
            return bytes(self.read_exactly(length)).decode('utf-8')

        if major == 4:
            if additional == 31:
                items = []
                while True:
                    item = self.read_item()
                    if item is self.BREAK:
                        break
                    items.append(item)
                return items
            count = self.read_argument(additional)
            return [self.read_item() for _ in range(count)]

        if major == 5:
            if additional == 31:
                pairs = {}
                while True:
                    key = self.read_item()
                    if key is self.BREAK:
                        break
                    value = self.read_item()
                    pairs[key] = value
                return pairs
            count = self.read_argument(additional)
            pairs = {}
            for _ in range(count):
                key = self.read_item()
                value = self.read_item()
                pairs[key] = value
            return pairs

        if major == 6:
            tag_number = self.read_argument(additional)
            content = self.read_item()
            return self._decode_tagged(tag_number, content)

        if major == 7:
            if additional == 31:
                return self.BREAK
            if additional <= 23:
                return self._decode_simple(additional)
            if additional == 24:
                return self._decode_simple(self.u8())
            if additional == 25:
                return self.read_float16()
            if additional == 26:
                return self._json_float(self.f32())
            if additional == 27:
                return self._json_float(self.f64())
            raise ValueError(F'Invalid additional information for major type 7: {additional}')

        raise ValueError(F'Unknown major type: {major}')

    @staticmethod
    def _decode_simple(value: int):
        if value == 20:
            return False
        if value == 21:
            return True
        if value == 22:
            return None
        if value == 23:
            return None
        return F'simple({value})'

    @staticmethod
    def _decode_tagged(tag: int, content):
        """
        Attempt to interpret well-known tags. For tag 2 and 3 (bignums), the content is decoded
        into an integer. Other tags are returned as a descriptive dictionary.
        """
        if tag == 2 and isinstance(content, (bytes, bytearray, memoryview)):
            return int.from_bytes(content, 'big')
        if tag == 3 and isinstance(content, (bytes, bytearray, memoryview)):
            return -1 - int.from_bytes(content, 'big')
        return {'tag': tag, 'value': content}

Ancestors

Class variables

var BREAK

The type of the None singleton.

Methods

def read_argument(self, additional)

Decode the argument value from the 5-bit additional information field.

Expand source code Browse git
def read_argument(self, additional: int) -> int:
    """
    Decode the argument value from the 5-bit additional information field.
    """
    if additional <= 23:
        return additional
    if additional == 24:
        return self.u8()
    if additional == 25:
        return self.u16()
    if additional == 26:
        return self.u32()
    if additional == 27:
        return self.u64()
    raise ValueError(F'Invalid additional information value: {additional}')
def read_float16(self)

Decode an IEEE 754 half-precision (16-bit) floating-point value.

Expand source code Browse git
def read_float16(self) -> float | str:
    """
    Decode an IEEE 754 half-precision (16-bit) floating-point value.
    """
    data = self.read_exactly(2)
    value, = struct.unpack('>e', data)
    return self._json_float(value)
def read_item(self)

Read and decode a single CBOR data item. Returns the decoded Python object.

Expand source code Browse git
def read_item(self):
    """
    Read and decode a single CBOR data item. Returns the decoded Python object.
    """
    ib = self.u8()
    major = ib >> 5
    additional = ib & 0x1F

    if major == 0:
        return self.read_argument(additional)

    if major == 1:
        return -1 - self.read_argument(additional)

    if major == 2:
        if additional == 31:
            chunks = []
            while True:
                item = self.read_item()
                if item is self.BREAK:
                    break
                if not isinstance(item, (bytes, bytearray, memoryview)):
                    raise ValueError('Indefinite-length byte string contains non-byte-string chunk.')
                chunks.append(item)
            return b''.join(bytes(c) for c in chunks)
        length = self.read_argument(additional)
        return bytes(self.read_exactly(length))

    if major == 3:
        if additional == 31:
            chunks = []
            while True:
                item = self.read_item()
                if item is self.BREAK:
                    break
                if not isinstance(item, str):
                    raise ValueError('Indefinite-length text string contains non-text-string chunk.')
                chunks.append(item)
            return ''.join(chunks)
        length = self.read_argument(additional)
        return bytes(self.read_exactly(length)).decode('utf-8')

    if major == 4:
        if additional == 31:
            items = []
            while True:
                item = self.read_item()
                if item is self.BREAK:
                    break
                items.append(item)
            return items
        count = self.read_argument(additional)
        return [self.read_item() for _ in range(count)]

    if major == 5:
        if additional == 31:
            pairs = {}
            while True:
                key = self.read_item()
                if key is self.BREAK:
                    break
                value = self.read_item()
                pairs[key] = value
            return pairs
        count = self.read_argument(additional)
        pairs = {}
        for _ in range(count):
            key = self.read_item()
            value = self.read_item()
            pairs[key] = value
        return pairs

    if major == 6:
        tag_number = self.read_argument(additional)
        content = self.read_item()
        return self._decode_tagged(tag_number, content)

    if major == 7:
        if additional == 31:
            return self.BREAK
        if additional <= 23:
            return self._decode_simple(additional)
        if additional == 24:
            return self._decode_simple(self.u8())
        if additional == 25:
            return self.read_float16()
        if additional == 26:
            return self._json_float(self.f32())
        if additional == 27:
            return self._json_float(self.f64())
        raise ValueError(F'Invalid additional information for major type 7: {additional}')

    raise ValueError(F'Unknown major type: {major}')

Inherited members