Module refinery.units.formats.deserialize_php
Expand source code Browse git
from __future__ import annotations
from io import BytesIO
from refinery.lib import json
from refinery.units import Unit
class _phpobject:
    """
    Container for a deserialized PHP object. The class name is stored in `__name__` and the
    mapping of property names to values is stored in `__php_vars__`.
    """
    __slots__ = ('__name__', '__php_vars__')

    def __init__(self, name: str, d: dict | None = None):
        """
        Create an object with the given class name. When no property mapping is given (or it is
        empty), a fresh empty dictionary is used.
        """
        object.__setattr__(self, '__name__', name)
        object.__setattr__(self, '__php_vars__', d or {})

    def _asdict(self) -> dict:
        """
        Return the properties as a plain dictionary. String keys that begin with a space or a
        NUL byte are treated as carrying a prefix that is delimited by that character; the
        prefix is removed and only the trailing property name is kept. PHP marks private and
        protected properties with such a NUL-delimited prefix. Keys that are not strings are
        copied unchanged.
        """
        result = {}
        for key, value in self.__php_vars__.items():
            # Only strings can carry a visibility prefix; slicing any other key type (for
            # example an integer key) would raise a TypeError.
            if isinstance(key, str) and key[:1] in (' ', '\x00'):
                key = key.split(key[:1], 2)[-1]
            result[key] = value
        return result
class dsphp(Unit):
    """
    Deserialize PHP serialized data and re-serialize as JSON.
    """

    @staticmethod
    def _loads(data: bytes | bytearray | memoryview) -> object:
        """
        Parse a buffer of PHP serialized data and return the first value it contains. Null is
        returned as `None`; integers, doubles and booleans as `int`, `float` and `bool`; strings
        are decoded as UTF-8 with surrogate escapes; arrays become `dict` and objects become
        `_phpobject` instances. Opcodes are matched case-insensitively and anything that follows
        the first value is ignored. A `ValueError` is raised for truncated input, unexpected
        opcodes, unexpected delimiters, and malformed numbers.
        """
        fp = BytesIO(bytes(data))

        def _expect(e: bytes):
            # Consume exactly the given literal or fail.
            v = fp.read(len(e))
            if v != e:
                raise ValueError(F'expected {e!r}, got {v!r}')

        def _read_until(delim: bytes) -> bytes:
            # Collect bytes up to (and consuming) the single-byte delimiter.
            buf = []
            while True:
                char = fp.read(1)
                if char == delim:
                    break
                if not char:
                    raise ValueError('unexpected end of stream')
                buf.append(char)
            return b''.join(buf)

        def _read_exact(size: int) -> bytes:
            # Read exactly size bytes. A negative size is rejected because BytesIO.read would
            # interpret it as a request to consume the remainder of the stream.
            if size < 0:
                raise ValueError(F'invalid length: {size}')
            chunk = fp.read(size)
            if len(chunk) != size:
                raise ValueError('unexpected end of stream')
            return chunk

        def _load_array() -> list[tuple]:
            # Parse "<count>:{ key value key value ... }" into a list of (key, value) pairs.
            count = int(_read_until(b':'))
            _expect(b'{')
            result = []
            for _ in range(count):
                key = _unserialize()
                value = _unserialize()
                result.append((key, value))
            _expect(b'}')
            return result

        def _unserialize() -> object:
            opcode = fp.read(1).lower()
            if not opcode:
                raise ValueError('unexpected end of stream')
            if opcode == b'n':
                _expect(b';')
                return None
            # A tuple is used deliberately: the "in" operator on a bytes object is a
            # subsequence test, which would also accept an empty read.
            if opcode in (b'i', b'd', b'b'):
                _expect(b':')
                value = _read_until(b';')
                if opcode == b'i':
                    return int(value)
                if opcode == b'd':
                    return float(value)
                return int(value) != 0
            if opcode == b's':
                _expect(b':')
                length = int(_read_until(b':'))
                _expect(b'"')
                value = _read_exact(length)
                _expect(b'"')
                _expect(b';')
                return value.decode('utf-8', 'surrogateescape')
            if opcode == b'a':
                _expect(b':')
                return dict(_load_array())
            if opcode == b'o':
                _expect(b':')
                name_length = int(_read_until(b':'))
                _expect(b'"')
                name = _read_exact(name_length).decode('utf-8', 'surrogateescape')
                _expect(b'":')
                return _phpobject(name, dict(_load_array()))
            raise ValueError(F'unexpected opcode: {opcode!r}')

        return _unserialize()

    @staticmethod
    def _dumps(data: object) -> bytes:
        """
        Serialize a Python value to the PHP serialization format. Supported values are `None`,
        `bool`, `int`, `float`, `str`, `bytes`, `dict`, `list`, `tuple` and `_phpobject`; lists
        and tuples are written as arrays with consecutive integer keys starting at zero. Array
        keys are written as integers when they are numeric and as strings otherwise, with `None`
        written as the empty string. A `TypeError` is raised for any other type.
        """
        def _string(raw: bytes) -> bytes:
            # The length prefix counts bytes, not characters.
            return F's:{len(raw)}:'.encode('latin1') + b'"' + raw + b'";'

        def _double(value: float) -> bytes:
            # PHP spells the special values in upper case; Python would format them as
            # "inf", "-inf" and "nan".
            if value != value:
                text = 'NAN'
            elif value in (float('inf'), float('-inf')):
                text = 'INF' if value > 0 else '-INF'
            else:
                text = F'{value}'
            return F'd:{text};'.encode('latin1')

        def _array(pairs, count: int) -> bytes:
            parts = []
            for key, value in pairs:
                parts.append(_serialize(key, True))
                parts.append(_serialize(value, False))
            return F'a:{count}:'.encode('latin1') + b'{' + b''.join(parts) + b'}'

        def _serialize(obj: object, keypos: bool) -> bytes:
            if keypos:
                # Keys are restricted to integers and strings.
                if isinstance(obj, (int, float, bool)):
                    return F'i:{int(obj)};'.encode('latin1')
                if isinstance(obj, str):
                    return _string(obj.encode('utf-8', 'surrogateescape'))
                if isinstance(obj, bytes):
                    return _string(obj)
                if obj is None:
                    return b's:0:"";'
                raise TypeError(F'cannot serialize {type(obj)!r} as key')
            if obj is None:
                return b'N;'
            # bool is tested before int because bool is a subclass of int.
            if isinstance(obj, bool):
                return F'b:{int(obj)};'.encode('latin1')
            if isinstance(obj, int):
                return F'i:{obj};'.encode('latin1')
            if isinstance(obj, float):
                return _double(obj)
            if isinstance(obj, str):
                return _string(obj.encode('utf-8', 'surrogateescape'))
            if isinstance(obj, bytes):
                return _string(obj)
            if isinstance(obj, dict):
                return _array(obj.items(), len(obj))
            if isinstance(obj, (list, tuple)):
                return _array(enumerate(obj), len(obj))
            if isinstance(obj, _phpobject):
                # Reuse the string and array encodings: drop the leading "s" and trailing ";"
                # from the name and the leading "a" from the body to form O:<len>:"<name>":<n>:{...}
                name = _serialize(obj.__name__, True)
                body = _serialize(obj.__php_vars__, False)
                return b'O' + name[1:-1] + body[1:]
            raise TypeError(F'cannot serialize {type(obj)!r}')

        return _serialize(data, False)

    def reverse(self, data):
        # Parse the input as JSON and emit it in the PHP serialization format.
        return self._dumps(json.loads(data))

    def process(self, data):
        def tojson(obj):
            # Converts _phpobject instances to dictionaries; other objects yield None.
            # NOTE(review): presumably invoked by the JSON encoder for values it cannot encode
            # natively - confirm against refinery.lib.json.
            if isinstance(obj, _phpobject):
                return obj._asdict()
        return json.dumps(self._loads(data), tojson=tojson)
Classes
class dsphp-
Deserialize PHP serialized data and re-serialize as JSON.
Expand source code Browse git
class dsphp(Unit):
    """
    Deserialize PHP serialized data and re-serialize as JSON.
    """

    @staticmethod
    def _loads(data: bytes | bytearray | memoryview) -> object:
        """
        Parse a buffer of PHP serialized data and return the first value it contains. Null is
        returned as `None`; integers, doubles and booleans as `int`, `float` and `bool`; strings
        are decoded as UTF-8 with surrogate escapes; arrays become `dict` and objects become
        `_phpobject` instances. Opcodes are matched case-insensitively and anything that follows
        the first value is ignored. A `ValueError` is raised for truncated input, unexpected
        opcodes, unexpected delimiters, and malformed numbers.
        """
        fp = BytesIO(bytes(data))

        def _expect(e: bytes):
            # Consume exactly the given literal or fail.
            v = fp.read(len(e))
            if v != e:
                raise ValueError(F'expected {e!r}, got {v!r}')

        def _read_until(delim: bytes) -> bytes:
            # Collect bytes up to (and consuming) the single-byte delimiter.
            buf = []
            while True:
                char = fp.read(1)
                if char == delim:
                    break
                if not char:
                    raise ValueError('unexpected end of stream')
                buf.append(char)
            return b''.join(buf)

        def _read_exact(size: int) -> bytes:
            # Read exactly size bytes. A negative size is rejected because BytesIO.read would
            # interpret it as a request to consume the remainder of the stream.
            if size < 0:
                raise ValueError(F'invalid length: {size}')
            chunk = fp.read(size)
            if len(chunk) != size:
                raise ValueError('unexpected end of stream')
            return chunk

        def _load_array() -> list[tuple]:
            # Parse "<count>:{ key value key value ... }" into a list of (key, value) pairs.
            count = int(_read_until(b':'))
            _expect(b'{')
            result = []
            for _ in range(count):
                key = _unserialize()
                value = _unserialize()
                result.append((key, value))
            _expect(b'}')
            return result

        def _unserialize() -> object:
            opcode = fp.read(1).lower()
            if not opcode:
                raise ValueError('unexpected end of stream')
            if opcode == b'n':
                _expect(b';')
                return None
            # A tuple is used deliberately: the "in" operator on a bytes object is a
            # subsequence test, which would also accept an empty read.
            if opcode in (b'i', b'd', b'b'):
                _expect(b':')
                value = _read_until(b';')
                if opcode == b'i':
                    return int(value)
                if opcode == b'd':
                    return float(value)
                return int(value) != 0
            if opcode == b's':
                _expect(b':')
                length = int(_read_until(b':'))
                _expect(b'"')
                value = _read_exact(length)
                _expect(b'"')
                _expect(b';')
                return value.decode('utf-8', 'surrogateescape')
            if opcode == b'a':
                _expect(b':')
                return dict(_load_array())
            if opcode == b'o':
                _expect(b':')
                name_length = int(_read_until(b':'))
                _expect(b'"')
                name = _read_exact(name_length).decode('utf-8', 'surrogateescape')
                _expect(b'":')
                return _phpobject(name, dict(_load_array()))
            raise ValueError(F'unexpected opcode: {opcode!r}')

        return _unserialize()

    @staticmethod
    def _dumps(data: object) -> bytes:
        """
        Serialize a Python value to the PHP serialization format. Supported values are `None`,
        `bool`, `int`, `float`, `str`, `bytes`, `dict`, `list`, `tuple` and `_phpobject`; lists
        and tuples are written as arrays with consecutive integer keys starting at zero. Array
        keys are written as integers when they are numeric and as strings otherwise, with `None`
        written as the empty string. A `TypeError` is raised for any other type.
        """
        def _string(raw: bytes) -> bytes:
            # The length prefix counts bytes, not characters.
            return F's:{len(raw)}:'.encode('latin1') + b'"' + raw + b'";'

        def _double(value: float) -> bytes:
            # PHP spells the special values in upper case; Python would format them as
            # "inf", "-inf" and "nan".
            if value != value:
                text = 'NAN'
            elif value in (float('inf'), float('-inf')):
                text = 'INF' if value > 0 else '-INF'
            else:
                text = F'{value}'
            return F'd:{text};'.encode('latin1')

        def _array(pairs, count: int) -> bytes:
            parts = []
            for key, value in pairs:
                parts.append(_serialize(key, True))
                parts.append(_serialize(value, False))
            return F'a:{count}:'.encode('latin1') + b'{' + b''.join(parts) + b'}'

        def _serialize(obj: object, keypos: bool) -> bytes:
            if keypos:
                # Keys are restricted to integers and strings.
                if isinstance(obj, (int, float, bool)):
                    return F'i:{int(obj)};'.encode('latin1')
                if isinstance(obj, str):
                    return _string(obj.encode('utf-8', 'surrogateescape'))
                if isinstance(obj, bytes):
                    return _string(obj)
                if obj is None:
                    return b's:0:"";'
                raise TypeError(F'cannot serialize {type(obj)!r} as key')
            if obj is None:
                return b'N;'
            # bool is tested before int because bool is a subclass of int.
            if isinstance(obj, bool):
                return F'b:{int(obj)};'.encode('latin1')
            if isinstance(obj, int):
                return F'i:{obj};'.encode('latin1')
            if isinstance(obj, float):
                return _double(obj)
            if isinstance(obj, str):
                return _string(obj.encode('utf-8', 'surrogateescape'))
            if isinstance(obj, bytes):
                return _string(obj)
            if isinstance(obj, dict):
                return _array(obj.items(), len(obj))
            if isinstance(obj, (list, tuple)):
                return _array(enumerate(obj), len(obj))
            if isinstance(obj, _phpobject):
                # Reuse the string and array encodings: drop the leading "s" and trailing ";"
                # from the name and the leading "a" from the body to form O:<len>:"<name>":<n>:{...}
                name = _serialize(obj.__name__, True)
                body = _serialize(obj.__php_vars__, False)
                return b'O' + name[1:-1] + body[1:]
            raise TypeError(F'cannot serialize {type(obj)!r}')

        return _serialize(data, False)

    def reverse(self, data):
        # Parse the input as JSON and emit it in the PHP serialization format.
        return self._dumps(json.loads(data))

    def process(self, data):
        def tojson(obj):
            # Converts _phpobject instances to dictionaries; other objects yield None.
            # NOTE(review): presumably invoked by the JSON encoder for values it cannot encode
            # natively - confirm against refinery.lib.json.
            if isinstance(obj, _phpobject):
                return obj._asdict()
        return json.dumps(self._loads(data), tojson=tojson)
Ancestors
Subclasses
Inherited members