Module refinery.units.formats.json
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Union, Optional, Iterable
import json
from refinery.units import Chunk
from refinery.units.formats import PathExtractorUnit, UnpackResult, Unit
from refinery.lib.meta import is_valid_variable_name, metavars
from refinery.lib.patterns import checks
class xtjson(PathExtractorUnit):
    """
    Extract values from a JSON document.
    """
    # Separator inserted between JSON keys / list indices when building the
    # extraction path of each value.
    CustomPathSeparator = '.'

    def unpack(self, data):
        """
        Parse the input as JSON and yield one `UnpackResult` per node in the
        document. Each result's path is the dot-separated chain of keys and
        list indices leading to the node; its `type` attribute carries the
        Python class name of the decoded value.
        """
        sep = self.CustomPathSeparator

        def crawl(path, cursor):
            # Depth-first traversal; every node except the artificial root
            # (empty path) is yielded, containers included.
            if isinstance(cursor, dict):
                for key, value in cursor.items():
                    yield from crawl(F'{path}{sep}{key}', value)
            elif isinstance(cursor, list):
                for key, value in enumerate(cursor):
                    yield from crawl(F'{path}{sep}{key:d}', value)
            if path:
                yield path, cursor, cursor.__class__.__name__

        for path, item, typename in crawl('', json.loads(data)):
            def extract(item=item):
                # item=item binds the loop variable now (late-binding closure
                # pitfall). Containers are re-serialized as indented JSON;
                # scalars are rendered with str(). json.dumps escapes
                # non-ASCII by default, so latin1 is safe for containers;
                # NOTE(review): a scalar string with non-latin1 characters
                # would raise here — presumably inputs are ASCII-safe.
                if isinstance(item, (list, dict)):
                    dumped = json.dumps(item, indent=4)
                else:
                    dumped = str(item)
                return dumped.encode('latin1')
            yield UnpackResult(path, extract, type=typename)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # Cheap sniff test: the unit claims inputs that fully match the JSON
        # pattern. (First parameter renamed self -> cls: this is a classmethod.)
        return bool(checks.json.fullmatch(data))
class xj0(Unit):
    """
    Extracts a single field from a JSON document at depth 0. By default, the unit applies a heuristic to
    extract remaining fields as metadata: String values are extracted only if they do not exceed 80
    characters in length and do not contain any line breaks. Floating-point, integer, boolean values, and
    lists of the latter are also extracted.
    """
    def __init__(
        self,
        fmt: Unit.Arg.String(help=(
            'Format expression for the output chunk; may use previously extracted JSON items. The default '
            'is {default}, which represents the input data.')) = '',
        all: Unit.Arg.Switch('-a', group='META', help='Extract all other fields as metadata regardless of length and type.') = False,
        one: Unit.Arg.Switch('-x', group='META', help='Do not extract any other fields as metadata.') = False,
        raw: Unit.Arg.Switch('-r', help='Disable conversion of JSON strings to binary strings in metadata') = False,
    ):
        super().__init__(fmt=fmt, one=one, raw=raw, all=all)

    def process(self, data: Chunk):
        def convert(value, iskey=False):
            # Convert a decoded JSON value for storage as chunk metadata.
            # With --raw, values pass through untouched; otherwise strings are
            # encoded to bytes and containers are converted recursively.
            # Dictionary keys must be scalar (iskey=True forbids nesting).
            if self.args.raw:
                return value
            if isinstance(value, (float, int, bool)):
                return value
            if isinstance(value, str):
                return value.encode(self.codec)
            if iskey:
                raise TypeError
            if isinstance(value, dict):
                # Keys are converted with iskey=True so that a non-scalar key
                # raises instead of silently converting to None. Keys produced
                # by json.loads are always str, so this cannot change behavior
                # for valid JSON input.
                return {convert(k, iskey=True): convert(v) for k, v in value.items()}
            if isinstance(value, list):
                return [convert(k) for k in value]

        def acceptable(key, value, nested=False):
            # Decide whether a depth-0 item qualifies as metadata. Always
            # returns a bool: the previous version returned numeric values
            # directly, which made the truthiness filter below drop perfectly
            # valid falsy items such as 0, 0.0, and False.
            if not is_valid_variable_name(key):
                self.log_info(F'rejecting item with invalid name {key}')
                return False
            if isinstance(value, (float, int, bool)):
                return True
            if isinstance(value, dict):
                if not self.args.all:
                    self.log_info(F'rejecting item {key} with dictionary value')
                    return False
                return True
            if isinstance(value, list):
                if nested:
                    self.log_info(F'rejecting item {key} containing a doubly nested list')
                    return False
                return all(acceptable(key, t, True) for t in value)
            if isinstance(value, str):
                if not self.args.all:
                    # Heuristic: only short, single-line strings make useful
                    # metadata. Empty strings are rejected here as well.
                    if len(value) not in range(1, 80):
                        self.log_info(F'rejecting string item {key} because {len(value)} exceeds the length limit')
                        return False
                    if '\n' in value:
                        self.log_info(F'rejecting string item {key} because it contains line breaks')
                        return False
                return True
            # Anything else (e.g. JSON null) is not extracted.
            return False

        jdoc: dict = json.loads(data)
        if not isinstance(jdoc, dict):
            raise ValueError('The input must be a JSON dictionary.')
        meta = metavars(data)
        args = {k: convert(v) for k, v in jdoc.items() if acceptable(k, v)}
        used = set()
        # Format the output chunk; format_bin records which variables it
        # consumed so they are not duplicated into the chunk metadata below.
        data[:] = meta.format_bin(self.args.fmt, self.codec, [data], args, used)
        for u in used:
            args.pop(u, None)
        if not self.args.one:
            data.meta.update(args)
        return data
class xjl(Unit):
    """
    Returns all JSON elements from a JSON iterable as individual outputs. When reversed, the unit
    collects all chunks in the frame and wraps them as a JSON list.
    """
    def process(self, data):
        """
        Parse the input as JSON (carving a JSON body out of the data as a
        fallback) and emit each element of the iterable as its own chunk,
        serialized as indented JSON.
        """
        try:
            doc: Union[list, dict] = json.loads(data)
        except Exception:
            from refinery.units.pattern.carve_json import carve_json
            doc = data | carve_json | json.loads
        try:
            # For dictionaries, iterate the values; lists iterate directly.
            it = doc.values()
        except AttributeError:
            it = doc
        for item in it:
            yield json.dumps(item, indent=4).encode(self.codec)

    def reverse(self, data):
        # data.temp holds the list collected by filter() below.
        return json.dumps(data.temp).encode(self.codec)

    def filter(self, chunks: Iterable[Chunk]):
        if not self.args.reverse:
            yield from chunks
            # Bug fix: without this return, a re-iterable chunks argument
            # (e.g. a list) would fall through to the collection logic below
            # and emit a spurious extra chunk.
            return
        from refinery.lib.tools import begin
        if it := begin(chunks):
            # Collect the whole frame into the first chunk's temp attribute;
            # reverse() then serializes it as a JSON list.
            head, rest = it
            collected = [head.decode(self.codec)]
            collected.extend(chunk.decode(self.codec) for chunk in rest)
            head.temp = collected
            yield head
Classes
class xtjson (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')
-
Extract values from a JSON document.
Expand source code Browse git
class xtjson(PathExtractorUnit): """ Extract values from a JSON document. """ CustomPathSeparator = '.' def unpack(self, data): sep = self.CustomPathSeparator def crawl(path, cursor): if isinstance(cursor, dict): for key, value in cursor.items(): yield from crawl(F'{path}{sep}{key}', value) elif isinstance(cursor, list): for key, value in enumerate(cursor): yield from crawl(F'{path}{sep}{key:d}', value) if path: yield path, cursor, cursor.__class__.__name__ for path, item, typename in crawl('', json.loads(data)): def extract(item=item): if isinstance(item, (list, dict)): dumped = json.dumps(item, indent=4) else: dumped = str(item) return dumped.encode('latin1') yield UnpackResult(path, extract, type=typename) @classmethod def handles(self, data: bytearray) -> Optional[bool]: return bool(checks.json.fullmatch(data))
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data): sep = self.CustomPathSeparator def crawl(path, cursor): if isinstance(cursor, dict): for key, value in cursor.items(): yield from crawl(F'{path}{sep}{key}', value) elif isinstance(cursor, list): for key, value in enumerate(cursor): yield from crawl(F'{path}{sep}{key:d}', value) if path: yield path, cursor, cursor.__class__.__name__ for path, item, typename in crawl('', json.loads(data)): def extract(item=item): if isinstance(item, (list, dict)): dumped = json.dumps(item, indent=4) else: dumped = str(item) return dumped.encode('latin1') yield UnpackResult(path, extract, type=typename)
Inherited members
class xj0 (fmt='', all=False, one=False, raw=False)
-
Extracts a single field from a JSON document at depth 0. By default, the unit applies a heuristic to extract remaining fields as metadata: String values are extracted only if they do not exceed 80 characters in length and do not contain any line breaks. Floating-point, integer, boolean values, and lists of the latter are also extracted.
Expand source code Browse git
class xj0(Unit): """ Extracts a single field from a JSON document at depth 0. By default, the unit applies a heuristic to extract remaining fields as metadata: String values are extracted only if they do not exceed 80 characters in length and do not contain any line breaks. Floating-point, integer, boolean values, and lists of the latter are also extracted. """ def __init__( self, fmt: Unit.Arg.String(help=( 'Format expression for the output chunk; may use previously extracted JSON items. The default ' 'is {default}, which represents the input data.')) = '', all: Unit.Arg.Switch('-a', group='META', help='Extract all other fields as metadata regardless of length and type.') = False, one: Unit.Arg.Switch('-x', group='META', help='Do not extract any other fields as metadata.') = False, raw: Unit.Arg.Switch('-r', help='Disable conversion of JSON strings to binary strings in metadata') = False, ): super().__init__(fmt=fmt, one=one, raw=raw, all=all) def process(self, data: Chunk): def convert(value, iskey=False): if self.args.raw: return value if isinstance(value, (float, int, bool)): return value if isinstance(value, str): return value.encode(self.codec) if iskey: raise TypeError if isinstance(value, dict): return {convert(k): convert(v) for k, v in value.items()} if isinstance(value, list): return [convert(k) for k in value] def acceptable(key, value, nested=False, convert=False): if not is_valid_variable_name(key): self.log_info(F'rejecting item with invalid name {key}') return None if isinstance(value, (float, int, bool)): return value if isinstance(value, dict): if not self.args.all: self.log_info(F'rejecting item {key} with dictionary value') return False return True if isinstance(value, list): if nested: self.log_info(F'rejecting item {key} containing a doubly nested list') return False return all(acceptable(key, t, True) for t in value) if isinstance(value, str): if not self.args.all: if len(value) not in range(1, 80): self.log_info(F'rejecting string item {key} 
because {len(value)} exceeds the length limit') return False if '\n' in value: self.log_info(F'rejecting string item {key} because it contains line breaks') return False return True return False jdoc: dict = json.loads(data) if not isinstance(jdoc, dict): raise ValueError('The input must be a JSON dictionary.') meta = metavars(data) args = {k: convert(v) for k, v in jdoc.items() if acceptable(k, v)} used = set() data[:] = meta.format_bin(self.args.fmt, self.codec, [data], args, used) for u in used: args.pop(u, None) if not self.args.one: data.meta.update(args) return data
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members
class xjl
-
Returns all JSON elements from a JSON iterable as individual outputs. When reversed, the unit collects all chunks in the frame and wraps them as a JSON list.
Expand source code Browse git
class xjl(Unit): """ Returns all JSON elements from a JSON iterable as individual outputs. When reversed, the unit collects all chunks in the frame and wraps them as a JSON list. """ def process(self, data): try: doc: Union[list, dict] = json.loads(data) except Exception: from refinery.units.pattern.carve_json import carve_json doc = data | carve_json | json.loads try: it = doc.values() except AttributeError: it = doc for item in it: yield json.dumps(item, indent=4).encode(self.codec) def reverse(self, data): return json.dumps(data.temp).encode(self.codec) def filter(self, chunks: Iterable[Chunk]): if not self.args.reverse: yield from chunks from refinery.lib.tools import begin if it := begin(chunks): head, rest = it collected = [head.decode(self.codec)] collected.extend(chunk.decode(self.codec) for chunk in rest) head.temp = collected yield head
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members