Module refinery.units.formats.msgpack
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import msgpack as mp
from refinery.units import RefineryPartialResult, Unit
from refinery.lib.structures import MemoryFile
class msgpack(Unit):
"""
Converts a message-pack (msgpack) buffer to JSON and vice-versa.
"""
def reverse(self, data):
return mp.dumps(json.loads(data))
def process(self, data):
unpacker = mp.Unpacker(MemoryFile(data, read_as_bytes=True))
while True:
try:
item = unpacker.unpack()
except mp.exceptions.OutOfData:
position = unpacker.tell()
if position < len(data):
self.log_warn("oops")
break
except Exception as E:
position = unpacker.tell()
if not position:
raise
view = memoryview(data)
raise RefineryPartialResult(str(E), view[position:])
else:
yield json.dumps(item).encode(self.codec)
Classes
class msgpack
-
Converts a message-pack (msgpack) buffer to JSON and vice-versa.
Expand source code Browse git
class msgpack(Unit): """ Converts a message-pack (msgpack) buffer to JSON and vice-versa. """ def reverse(self, data): return mp.dumps(json.loads(data)) def process(self, data): unpacker = mp.Unpacker(MemoryFile(data, read_as_bytes=True)) while True: try: item = unpacker.unpack() except mp.exceptions.OutOfData: position = unpacker.tell() if position < len(data): self.log_warn("oops") break except Exception as E: position = unpacker.tell() if not position: raise view = memoryview(data) raise RefineryPartialResult(str(E), view[position:]) else: yield json.dumps(item).encode(self.codec)
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members