Module refinery.units.formats.cbor
Implements parsing of CBOR (Concise Binary Object Representation) data as specified in RFC 8949.
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Implements parsing of CBOR (Concise Binary Object Representation) data as specified in RFC 8949.
"""
from __future__ import annotations
import math
import struct
from refinery.lib.structures import StructReader
from refinery.units.formats import JSONEncoderUnit
class CBORReader(StructReader[memoryview]):
    """
    A reader for CBOR-encoded data. CBOR encodes each data item as an initial byte whose high
    3 bits indicate the major type and whose low 5 bits provide additional information (the
    argument), optionally followed by content bytes. This reader decodes all major types:

    - `0`: unsigned integer
    - `1`: negative integer
    - `2`: byte string
    - `3`: text string (UTF-8)
    - `4`: array
    - `5`: map
    - `6`: semantic tag
    - `7`: simple values and floating-point numbers

    The "break" stop code (initial byte `0xFF`) is only accepted where it terminates an
    indefinite-length string, array, or map. Anywhere else the input is not well-formed and a
    `ValueError` is raised rather than leaking the `CBORReader.BREAK` sentinel into the result.
    """

    # Sentinel used internally to signal that the "break" stop code was read.
    BREAK = object()

    def read_argument(self, additional: int) -> int:
        """
        Decode the argument value from the 5-bit additional information field. Values up to 23
        are the argument itself; 24 through 27 select a following 1, 2, 4, or 8 byte unsigned
        integer. Any other value raises a `ValueError`.
        """
        if additional <= 23:
            return additional
        if additional == 24:
            return self.u8()
        if additional == 25:
            return self.u16()
        if additional == 26:
            return self.u32()
        if additional == 27:
            return self.u64()
        raise ValueError(F'Invalid additional information value: {additional}')

    @staticmethod
    def _json_float(value: float):
        """
        Return the input float unchanged unless it is NaN or infinite, in which case the
        strings `NaN`, `Infinity`, or `-Infinity` are returned instead.
        """
        if math.isnan(value):
            return 'NaN'
        if math.isinf(value):
            return '-Infinity' if value < 0 else 'Infinity'
        return value

    def read_float16(self) -> float | str:
        """
        Decode an IEEE 754 half-precision (16-bit) floating-point value.
        """
        data = self.read_exactly(2)
        value, = struct.unpack('>e', data)
        return self._json_float(value)

    def read_item(self):
        """
        Read and decode a single CBOR data item. Returns the decoded Python object. A
        `ValueError` is raised when the "break" stop code is found in a position where a data
        item is required.
        """
        item = self._read_item_or_break()
        if item is self.BREAK:
            raise ValueError('Unexpected break stop code outside of an indefinite-length item.')
        return item

    def _read_string_chunks(self, major: int, mismatch: str) -> list[bytes]:
        """
        Read the chunks of an indefinite-length string of the given major type up to and
        including the terminating break stop code. Every chunk must be a definite-length
        string of the same major type; the check is made on the chunk header, not on the
        Python type of a decoded item. The `mismatch` message is used for the `ValueError`
        raised when a chunk has a different major type.
        """
        chunks = []
        while True:
            ib = self.u8()
            if ib == 0xFF:
                return chunks
            if ib >> 5 != major:
                raise ValueError(mismatch)
            additional = ib & 0x1F
            if additional == 31:
                raise ValueError('Indefinite-length string contains an indefinite-length chunk.')
            length = self.read_argument(additional)
            chunks.append(bytes(self.read_exactly(length)))

    def _read_item_or_break(self):
        """
        Read a single CBOR data item, returning `CBORReader.BREAK` when the break stop code is
        read instead. Only the loops that parse indefinite-length arrays and maps may call
        this directly; every other position uses `CBORReader.read_item`, which rejects a break.
        """
        ib = self.u8()
        major = ib >> 5
        additional = ib & 0x1F
        indefinite = additional == 31
        if major == 0:
            return self.read_argument(additional)
        if major == 1:
            return -1 - self.read_argument(additional)
        if major == 2:
            if indefinite:
                return b''.join(self._read_string_chunks(
                    2, 'Indefinite-length byte string contains non-byte-string chunk.'))
            length = self.read_argument(additional)
            return bytes(self.read_exactly(length))
        if major == 3:
            if indefinite:
                chunks = self._read_string_chunks(
                    3, 'Indefinite-length text string contains non-text-string chunk.')
                # Each chunk is decoded on its own, so it must be valid UTF-8 by itself.
                return ''.join(chunk.decode('utf-8') for chunk in chunks)
            length = self.read_argument(additional)
            return bytes(self.read_exactly(length)).decode('utf-8')
        if major == 4:
            if indefinite:
                items = []
                while True:
                    item = self._read_item_or_break()
                    if item is self.BREAK:
                        return items
                    items.append(item)
            count = self.read_argument(additional)
            return [self.read_item() for _ in range(count)]
        if major == 5:
            # NOTE(review): keys that decode to unhashable objects (arrays, maps) raise a
            # TypeError on insertion below.
            pairs = {}
            if indefinite:
                while True:
                    key = self._read_item_or_break()
                    if key is self.BREAK:
                        return pairs
                    # A break in value position is rejected by read_item.
                    pairs[key] = self.read_item()
            count = self.read_argument(additional)
            for _ in range(count):
                key = self.read_item()
                pairs[key] = self.read_item()
            return pairs
        if major == 6:
            tag_number = self.read_argument(additional)
            content = self.read_item()
            return self._decode_tagged(tag_number, content)
        if major == 7:
            if indefinite:
                return self.BREAK
            if additional <= 23:
                return self._decode_simple(additional)
            if additional == 24:
                return self._decode_simple(self.u8())
            if additional == 25:
                return self.read_float16()
            if additional == 26:
                return self._json_float(self.f32())
            if additional == 27:
                return self._json_float(self.f64())
            raise ValueError(F'Invalid additional information for major type 7: {additional}')
        raise ValueError(F'Unknown major type: {major}')

    @staticmethod
    def _decode_simple(value: int):
        """
        Map a CBOR simple value to a Python object: 20 is `False`, 21 is `True`, 22 (null) and
        23 (undefined) both become `None`. Any other value is returned as the string
        `simple(N)`.
        """
        if value == 20:
            return False
        if value == 21:
            return True
        if value == 22:
            return None
        if value == 23:
            return None
        return F'simple({value})'

    @staticmethod
    def _decode_tagged(tag: int, content):
        """
        Attempt to interpret well-known tags. For tag 2 and 3 (bignums), the content is decoded
        into an integer. Other tags are returned as a descriptive dictionary with the keys
        `tag` and `value`.
        """
        if tag == 2 and isinstance(content, (bytes, bytearray, memoryview)):
            return int.from_bytes(content, 'big')
        if tag == 3 and isinstance(content, (bytes, bytearray, memoryview)):
            return -1 - int.from_bytes(content, 'big')
        return {'tag': tag, 'value': content}
class cbor(JSONEncoderUnit):
    """
    Parses CBOR (Concise Binary Object Representation) data and converts it to JSON. CBOR is a
    binary data serialization format defined in RFC 8949. It supports integers, byte strings,
    text strings, arrays, maps, tags, and simple values including booleans, null, and
    floating-point numbers.
    """

    def process(self, data):
        # Wrap the input in a memoryview and parse it with a big-endian reader; only the
        # first CBOR data item is decoded and passed to the JSON encoder.
        view = memoryview(data)
        decoded = CBORReader(view, bigendian=True).read_item()
        return self.to_json(decoded)
Classes
class CBORReader (data, bigendian=None)-
A reader for CBOR-encoded data. CBOR encodes each data item as an initial byte whose high 3 bits indicate the major type and whose low 5 bits provide additional information (the argument), optionally followed by content bytes. This reader decodes all major types:
0: unsigned integer; 1: negative integer; 2: byte string; 3: text string (UTF-8); 4: array; 5: map; 6: semantic tag; 7: simple values and floating-point numbers
Expand source code Browse git
class CBORReader(StructReader[memoryview]): """ A reader for CBOR-encoded data. CBOR encodes each data item as an initial byte whose high 3 bits indicate the major type and whose low 5 bits provide additional information (the argument), optionally followed by content bytes. This reader decodes all major types: - `0`: unsigned integer - `1`: negative integer - `2`: byte string - `3`: text string (UTF-8) - `4`: array - `5`: map - `6`: semantic tag - `7`: simple values and floating-point numbers """ BREAK = object() def read_argument(self, additional: int) -> int: """ Decode the argument value from the 5-bit additional information field. """ if additional <= 23: return additional if additional == 24: return self.u8() if additional == 25: return self.u16() if additional == 26: return self.u32() if additional == 27: return self.u64() raise ValueError(F'Invalid additional information value: {additional}') @staticmethod def _json_float(value: float): if math.isnan(value): return 'NaN' if math.isinf(value): return '-Infinity' if value < 0 else 'Infinity' return value def read_float16(self) -> float | str: """ Decode an IEEE 754 half-precision (16-bit) floating-point value. """ data = self.read_exactly(2) value, = struct.unpack('>e', data) return self._json_float(value) def read_item(self): """ Read and decode a single CBOR data item. Returns the decoded Python object. 
""" ib = self.u8() major = ib >> 5 additional = ib & 0x1F if major == 0: return self.read_argument(additional) if major == 1: return -1 - self.read_argument(additional) if major == 2: if additional == 31: chunks = [] while True: item = self.read_item() if item is self.BREAK: break if not isinstance(item, (bytes, bytearray, memoryview)): raise ValueError('Indefinite-length byte string contains non-byte-string chunk.') chunks.append(item) return b''.join(bytes(c) for c in chunks) length = self.read_argument(additional) return bytes(self.read_exactly(length)) if major == 3: if additional == 31: chunks = [] while True: item = self.read_item() if item is self.BREAK: break if not isinstance(item, str): raise ValueError('Indefinite-length text string contains non-text-string chunk.') chunks.append(item) return ''.join(chunks) length = self.read_argument(additional) return bytes(self.read_exactly(length)).decode('utf-8') if major == 4: if additional == 31: items = [] while True: item = self.read_item() if item is self.BREAK: break items.append(item) return items count = self.read_argument(additional) return [self.read_item() for _ in range(count)] if major == 5: if additional == 31: pairs = {} while True: key = self.read_item() if key is self.BREAK: break value = self.read_item() pairs[key] = value return pairs count = self.read_argument(additional) pairs = {} for _ in range(count): key = self.read_item() value = self.read_item() pairs[key] = value return pairs if major == 6: tag_number = self.read_argument(additional) content = self.read_item() return self._decode_tagged(tag_number, content) if major == 7: if additional == 31: return self.BREAK if additional <= 23: return self._decode_simple(additional) if additional == 24: return self._decode_simple(self.u8()) if additional == 25: return self.read_float16() if additional == 26: return self._json_float(self.f32()) if additional == 27: return self._json_float(self.f64()) raise ValueError(F'Invalid additional information 
for major type 7: {additional}') raise ValueError(F'Unknown major type: {major}') @staticmethod def _decode_simple(value: int): if value == 20: return False if value == 21: return True if value == 22: return None if value == 23: return None return F'simple({value})' @staticmethod def _decode_tagged(tag: int, content): """ Attempt to interpret well-known tags. For tag 2 and 3 (bignums), the content is decoded into an integer. Other tags are returned as a descriptive dictionary. """ if tag == 2 and isinstance(content, (bytes, bytearray, memoryview)): return int.from_bytes(content, 'big') if tag == 3 and isinstance(content, (bytes, bytearray, memoryview)): return -1 - int.from_bytes(content, 'big') return {'tag': tag, 'value': content}Ancestors
- StructReader
- MemoryFile
- MemoryFileMethods
- typing.Generic
- _io.BytesIO
- _io._BufferedIOBase
- _io._IOBase
Class variables
var BREAK-
Sentinel object that marks the CBOR "break" stop code (initial byte 0xFF), which terminates indefinite-length items.
Methods
def read_argument(self, additional)-
Decode the argument value from the 5-bit additional information field.
Expand source code Browse git
def read_argument(self, additional: int) -> int: """ Decode the argument value from the 5-bit additional information field. """ if additional <= 23: return additional if additional == 24: return self.u8() if additional == 25: return self.u16() if additional == 26: return self.u32() if additional == 27: return self.u64() raise ValueError(F'Invalid additional information value: {additional}') def read_float16(self)-
Decode an IEEE 754 half-precision (16-bit) floating-point value.
Expand source code Browse git
def read_float16(self) -> float | str: """ Decode an IEEE 754 half-precision (16-bit) floating-point value. """ data = self.read_exactly(2) value, = struct.unpack('>e', data) return self._json_float(value) def read_item(self)-
Read and decode a single CBOR data item. Returns the decoded Python object.
Expand source code Browse git
def read_item(self): """ Read and decode a single CBOR data item. Returns the decoded Python object. """ ib = self.u8() major = ib >> 5 additional = ib & 0x1F if major == 0: return self.read_argument(additional) if major == 1: return -1 - self.read_argument(additional) if major == 2: if additional == 31: chunks = [] while True: item = self.read_item() if item is self.BREAK: break if not isinstance(item, (bytes, bytearray, memoryview)): raise ValueError('Indefinite-length byte string contains non-byte-string chunk.') chunks.append(item) return b''.join(bytes(c) for c in chunks) length = self.read_argument(additional) return bytes(self.read_exactly(length)) if major == 3: if additional == 31: chunks = [] while True: item = self.read_item() if item is self.BREAK: break if not isinstance(item, str): raise ValueError('Indefinite-length text string contains non-text-string chunk.') chunks.append(item) return ''.join(chunks) length = self.read_argument(additional) return bytes(self.read_exactly(length)).decode('utf-8') if major == 4: if additional == 31: items = [] while True: item = self.read_item() if item is self.BREAK: break items.append(item) return items count = self.read_argument(additional) return [self.read_item() for _ in range(count)] if major == 5: if additional == 31: pairs = {} while True: key = self.read_item() if key is self.BREAK: break value = self.read_item() pairs[key] = value return pairs count = self.read_argument(additional) pairs = {} for _ in range(count): key = self.read_item() value = self.read_item() pairs[key] = value return pairs if major == 6: tag_number = self.read_argument(additional) content = self.read_item() return self._decode_tagged(tag_number, content) if major == 7: if additional == 31: return self.BREAK if additional <= 23: return self._decode_simple(additional) if additional == 24: return self._decode_simple(self.u8()) if additional == 25: return self.read_float16() if additional == 26: return self._json_float(self.f32()) if 
additional == 27: return self._json_float(self.f64()) raise ValueError(F'Invalid additional information for major type 7: {additional}') raise ValueError(F'Unknown major type: {major}')
Inherited members
class cbor (encode=None, digest=None, arrays=False)-
Parses CBOR (Concise Binary Object Representation) data and converts it to JSON. CBOR is a binary data serialization format defined in RFC 8949. It supports integers, byte strings, text strings, arrays, maps, tags, and simple values including booleans, null, and floating-point numbers.
Expand source code Browse git
class cbor(JSONEncoderUnit): """ Parses CBOR (Concise Binary Object Representation) data and converts it to JSON. CBOR is a binary data serialization format defined in RFC 8949. It supports integers, byte strings, text strings, arrays, maps, tags, and simple values including booleans, null, and floating-point numbers. """ def process(self, data): reader = CBORReader(memoryview(data), bigendian=True) message = reader.read_item() return self.to_json(message)Ancestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Inherited members
JSONEncoderUnit:FilterEverythingRequiresactassemblecodecconsolefilterfinishhandlesis_quietis_reversibleisattylabelledleniencylog_alwayslog_debuglog_detachlog_faillog_infolog_levellog_warnloggernamenozzleoptional_dependenciesprocessreadread1required_dependenciesresetrunsourcesuperinit