Module refinery.units.pattern.carve_json
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import string
import json
import re
from refinery.units import Arg, Unit
# Matches every byte that can change the nesting state while scanning JSON:
# brackets, braces, and the double quote that opens a string literal.
_JSON_DELIMITER = re.compile(BR'[\[\]\{\}"]')

# Maps an opening bracket (as an integer byte value) to its closing bracket.
_JSON_TOKEN_TO_TERMINATOR = {
    B'['[0]: B']'[0],
    B'{'[0]: B'}'[0],
}


class JSONCarver:
    """
    Iterator over the JSON documents embedded in a byte buffer. Each item is a
    tuple `(start, chunk)` where `start` is the offset of the opening bracket
    in the buffer and `chunk` is the slice of the buffer that parsed as JSON.
    Documents that parse to a falsy value (such as `{}` or `[]`) are skipped.
    When `dictonly` is set, only candidates that start with `{` are considered.
    """

    # Retained so that existing references to this attribute keep working; the
    # scanner itself only ever inspects bracket and quote bytes.
    _PRINTABLE_BYTES = set(bytes(string.printable, 'ascii'))
    # Candidates that nest at least this deeply are abandoned by find_end.
    _MAX_PARSE_DEPTH = 200

    def __init__(self, data: bytearray, dictonly=False):
        self.data = data
        self.dictonly = dictonly
        self.cursor = 0

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next `(start, chunk)` pair at or after the current cursor,
        or raise `StopIteration` when no further opening bracket exists.
        """
        data = self.data
        while True:
            start = data.find(B'{', self.cursor)
            if not self.dictonly:
                start_list = data.find(B'[', self.cursor)
                # Pick whichever opening bracket comes first. A result of -1
                # means "not found" and must never win the comparison; this
                # also avoids any arithmetic on the length of an empty buffer.
                if start_list >= 0 and (start < 0 or start_list < start):
                    start = start_list
            if start < 0:
                raise StopIteration
            # If this candidate fails, resume right behind its opening bracket
            # so that nested candidates are still examined.
            self.cursor = start + 1
            end = self.find_end(data, start)
            if end is None:
                continue
            chunk = data[start:end]
            try:
                if not json.loads(chunk):
                    continue
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue
            # The value of end is exclusive: the byte at that offset may
            # already open the next document, so it must not be skipped.
            self.cursor = end
            return start, chunk

    @staticmethod
    def _find_string_end(data, opening):
        """
        Given the offset of the quote that opens a string literal, return the
        offset of the quote that closes it, or `None` if there is none.
        """
        cursor = opening
        while True:
            cursor = data.find(B'"', cursor + 1)
            if cursor < 0:
                return None
            # A quote is escaped only when it is preceded by an odd number of
            # backslashes; an even run consists of escaped backslashes.
            backslashes = 0
            probe = cursor - 1
            while probe > opening and data[probe] == 0x5C:
                backslashes += 1
                probe -= 1
            if backslashes % 2 == 0:
                return cursor

    @classmethod
    def find_end(cls, data: bytearray, start: int):
        """
        Starting from the opening bracket at offset `start`, return the offset
        one past the matching closing bracket. The return value is `None` when
        the brackets are unbalanced or mismatched, when a string literal is
        not terminated, or when nesting reaches `_MAX_PARSE_DEPTH`. A
        `KeyError` is raised if the byte at `start` is neither `[` nor `{`.
        """
        # Stack of the closing brackets that are still expected.
        scope = bytearray()
        scope.append(_JSON_TOKEN_TO_TERMINATOR[data[start]])
        cursor = start
        while scope:
            if len(scope) >= cls._MAX_PARSE_DEPTH:
                return None
            delim = _JSON_DELIMITER.search(data, cursor + 1)
            if delim is None:
                return None
            cursor = delim.start()
            token = data[cursor]
            if token == 0x22:
                # Skip the whole string literal; brackets inside do not count.
                cursor = cls._find_string_end(data, cursor)
                if cursor is None:
                    return None
                continue
            if token == scope[-1]:
                scope.pop()
                continue
            try:
                scope.append(_JSON_TOKEN_TO_TERMINATOR[token])
            except KeyError:
                # A closing bracket that does not match the innermost scope.
                return None
        return cursor + 1
class carve_json(Unit):
    """
    Extracts anything from the input data that looks like JSON.
    """

    def __init__(
        self,
        dictonly: Arg.Switch('-d', help='only extract JSON dictionaries, do not extract lists.') = False
    ):
        super().__init__(dictonly=dictonly)

    def process(self, data):
        # Every document found by the carver becomes one output chunk; the
        # position where it starts in the input is passed on as `offset`.
        carver = JSONCarver(data, dictonly=self.args.dictonly)
        for offset, document in carver:
            yield self.labelled(document, offset=offset)
Classes
class JSONCarver (data, dictonly=False)
-
Expand source code Browse git
class JSONCarver:
    """
    Iterator over the JSON documents embedded in a byte buffer. Each item is a
    tuple `(start, chunk)` where `start` is the offset of the opening bracket
    in the buffer and `chunk` is the slice of the buffer that parsed as JSON.
    Documents that parse to a falsy value (such as `{}` or `[]`) are skipped.
    When `dictonly` is set, only candidates that start with `{` are considered.
    """

    # Retained so that existing references to this attribute keep working; the
    # scanner itself only ever inspects bracket and quote bytes.
    _PRINTABLE_BYTES = set(bytes(string.printable, 'ascii'))
    # Candidates that nest at least this deeply are abandoned by find_end.
    _MAX_PARSE_DEPTH = 200

    def __init__(self, data: bytearray, dictonly=False):
        self.data = data
        self.dictonly = dictonly
        self.cursor = 0

    def __iter__(self):
        return self

    def __next__(self):
        """
        Return the next `(start, chunk)` pair at or after the current cursor,
        or raise `StopIteration` when no further opening bracket exists.
        """
        data = self.data
        while True:
            start = data.find(B'{', self.cursor)
            if not self.dictonly:
                start_list = data.find(B'[', self.cursor)
                # Pick whichever opening bracket comes first. A result of -1
                # means "not found" and must never win the comparison; this
                # also avoids any arithmetic on the length of an empty buffer.
                if start_list >= 0 and (start < 0 or start_list < start):
                    start = start_list
            if start < 0:
                raise StopIteration
            # If this candidate fails, resume right behind its opening bracket
            # so that nested candidates are still examined.
            self.cursor = start + 1
            end = self.find_end(data, start)
            if end is None:
                continue
            chunk = data[start:end]
            try:
                if not json.loads(chunk):
                    continue
            except (json.JSONDecodeError, UnicodeDecodeError):
                continue
            # The value of end is exclusive: the byte at that offset may
            # already open the next document, so it must not be skipped.
            self.cursor = end
            return start, chunk

    @staticmethod
    def _find_string_end(data, opening):
        """
        Given the offset of the quote that opens a string literal, return the
        offset of the quote that closes it, or `None` if there is none.
        """
        cursor = opening
        while True:
            cursor = data.find(B'"', cursor + 1)
            if cursor < 0:
                return None
            # A quote is escaped only when it is preceded by an odd number of
            # backslashes; an even run consists of escaped backslashes.
            backslashes = 0
            probe = cursor - 1
            while probe > opening and data[probe] == 0x5C:
                backslashes += 1
                probe -= 1
            if backslashes % 2 == 0:
                return cursor

    @classmethod
    def find_end(cls, data: bytearray, start: int):
        """
        Starting from the opening bracket at offset `start`, return the offset
        one past the matching closing bracket. The return value is `None` when
        the brackets are unbalanced or mismatched, when a string literal is
        not terminated, or when nesting reaches `_MAX_PARSE_DEPTH`. A
        `KeyError` is raised if the byte at `start` is neither `[` nor `{`.
        """
        # Stack of the closing brackets that are still expected.
        scope = bytearray()
        scope.append(_JSON_TOKEN_TO_TERMINATOR[data[start]])
        cursor = start
        while scope:
            if len(scope) >= cls._MAX_PARSE_DEPTH:
                return None
            delim = _JSON_DELIMITER.search(data, cursor + 1)
            if delim is None:
                return None
            cursor = delim.start()
            token = data[cursor]
            if token == 0x22:
                # Skip the whole string literal; brackets inside do not count.
                cursor = cls._find_string_end(data, cursor)
                if cursor is None:
                    return None
                continue
            if token == scope[-1]:
                scope.pop()
                continue
            try:
                scope.append(_JSON_TOKEN_TO_TERMINATOR[token])
            except KeyError:
                # A closing bracket that does not match the innermost scope.
                return None
        return cursor + 1
Static methods
def find_end(data, start)
-
Expand source code Browse git
@classmethod
def find_end(cls, data: bytearray, start: int):
    """
    Starting from the opening bracket at offset `start`, return the offset one
    past the matching closing bracket. The return value is `None` when the
    brackets are unbalanced or mismatched, when a string literal is not
    terminated, or when nesting reaches `_MAX_PARSE_DEPTH`. A `KeyError` is
    raised if the byte at `start` is neither `[` nor `{`.
    """
    # Stack of the closing brackets that are still expected.
    scope = bytearray()
    scope.append(_JSON_TOKEN_TO_TERMINATOR[data[start]])
    cursor = start
    while scope:
        if len(scope) >= cls._MAX_PARSE_DEPTH:
            return None
        delim = _JSON_DELIMITER.search(data, cursor + 1)
        if delim is None:
            return None
        cursor = delim.start()
        token = data[cursor]
        if token == 0x22:
            # Skip the whole string literal; brackets inside do not count.
            opening = cursor
            while True:
                cursor = data.find(B'"', cursor + 1)
                if cursor < 0:
                    return None
                # A quote is escaped only when it is preceded by an odd number
                # of backslashes; an even run consists of escaped backslashes.
                backslashes = 0
                probe = cursor - 1
                while probe > opening and data[probe] == 0x5C:
                    backslashes += 1
                    probe -= 1
                if backslashes % 2 == 0:
                    break
            continue
        if token == scope[-1]:
            scope.pop()
            continue
        try:
            scope.append(_JSON_TOKEN_TO_TERMINATOR[token])
        except KeyError:
            # A closing bracket that does not match the innermost scope.
            return None
    return cursor + 1
class carve_json (dictonly=False)
-
Extracts anything from the input data that looks like JSON.
Expand source code Browse git
class carve_json(Unit):
    """
    Extracts anything from the input data that looks like JSON.
    """

    def __init__(
        self,
        dictonly: Arg.Switch('-d', help='only extract JSON dictionaries, do not extract lists.') = False
    ):
        super().__init__(dictonly=dictonly)

    def process(self, data):
        # Every document found by the carver becomes one output chunk; the
        # position where it starts in the input is passed on as `offset`.
        carver = JSONCarver(data, dictonly=self.args.dictonly)
        for offset, document in carver:
            yield self.labelled(document, offset=offset)
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members