Module refinery.units.formats.archive.xtsql
Expand source code Browse git
from __future__ import annotations
import functools
import sqlite3
import sys
from refinery.lib.json import BytesAsStringEncoder
from refinery.units.formats import PathExtractorUnit, UnpackResult
class xtsql(PathExtractorUnit):
    """
    Extract files from SQLite3 databases. The whole database is emitted as a JSON document at the
    path `db`, every table as a JSON document at `db/<table>`, every row as a JSON document at
    `db/<table>/<index>`, and every cell that is not NULL as raw data at
    `db/<table>/<index>/<column>`.
    """

    def unpack(self, data: bytearray):
        """
        Load the serialized database in `data` into an in-memory SQLite connection, read every
        table completely, and yield one `UnpackResult` for the database, for each table, for each
        row, and for each cell that is not NULL.

        Raises `NotImplementedError` when running on a Python version older than 3.11.
        """
        def _json(obj):
            # Serialization is deferred through functools.partial below, so a JSON document is
            # only produced for the results that are actually requested.
            with BytesAsStringEncoder as encoder:
                return encoder.dumps(obj).encode(self.codec)

        if sys.version_info[:2] < (3, 11):
            raise NotImplementedError(F'python 3.11 is required to use {self.__class__.__name__}.')

        result: dict[str, list[dict[str, int | float | str | bytes | None]]] = {}
        database = sqlite3.connect(':memory:')

        try:
            # Return TEXT values as raw bytes instead of decoding them.
            database.text_factory = bytes
            database.deserialize(data)
            cursor = database.cursor()
            listing: list[tuple[bytes]] = cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table';").fetchall()
            for (tbl,) in listing:
                table = tbl.decode('utf8')
                # Quote the identifier so that table names containing spaces, keywords, or quote
                # characters cannot break or alter the statement.
                quoted = table.replace('"', '""')
                cursor.execute(F'SELECT * FROM "{quoted}"')
                # Column names are taken from the cursor metadata rather than parsed out of the
                # CREATE TABLE text; parsing that text by hand mishandles table constraints,
                # parenthesized type arguments, and columns declared without a type.
                names = [column[0] for column in cursor.description]
                result[table] = [dict(zip(names, row)) for row in cursor.fetchall()]
        finally:
            # All rows have been copied into result, the connection is no longer required.
            database.close()

        yield UnpackResult('db', functools.partial(_json, result))

        for table, rows in result.items():
            yield UnpackResult(F'db/{table}', functools.partial(_json, rows))
            for k, row in enumerate(rows):
                root = F'db/{table}/{k}'
                yield UnpackResult(root, functools.partial(_json, row))
                for name, value in row.items():
                    if value is None:
                        # NULL cells only appear inside the JSON documents.
                        continue
                    if isinstance(value, (int, float)):
                        value = str(value)
                    if isinstance(value, str):
                        value = value.encode(self.codec)
                    if isinstance(value, bytes):
                        yield UnpackResult(F'{root}/{name}', value)

    @classmethod
    def handles(cls, data):
        """
        Return whether the first 15 bytes of `data` are the string `SQLite format 3`.
        """
        return memoryview(data)[:15] == B'SQLite format 3'
Classes
class xtsql (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')-
Extract files from SQLite3 databases.
Expand source code Browse git
class xtsql(PathExtractorUnit):
    """
    Extract files from SQLite3 databases. The whole database is emitted as a JSON document at the
    path `db`, every table as a JSON document at `db/<table>`, every row as a JSON document at
    `db/<table>/<index>`, and every cell that is not NULL as raw data at
    `db/<table>/<index>/<column>`.
    """

    def unpack(self, data: bytearray):
        """
        Load the serialized database in `data` into an in-memory SQLite connection, read every
        table completely, and yield one `UnpackResult` for the database, for each table, for each
        row, and for each cell that is not NULL.

        Raises `NotImplementedError` when running on a Python version older than 3.11.
        """
        def _json(obj):
            # Serialization is deferred through functools.partial below, so a JSON document is
            # only produced for the results that are actually requested.
            with BytesAsStringEncoder as encoder:
                return encoder.dumps(obj).encode(self.codec)

        if sys.version_info[:2] < (3, 11):
            raise NotImplementedError(F'python 3.11 is required to use {self.__class__.__name__}.')

        result: dict[str, list[dict[str, int | float | str | bytes | None]]] = {}
        database = sqlite3.connect(':memory:')

        try:
            # Return TEXT values as raw bytes instead of decoding them.
            database.text_factory = bytes
            database.deserialize(data)
            cursor = database.cursor()
            listing: list[tuple[bytes]] = cursor.execute(
                "SELECT name FROM sqlite_master WHERE type='table';").fetchall()
            for (tbl,) in listing:
                table = tbl.decode('utf8')
                # Quote the identifier so that table names containing spaces, keywords, or quote
                # characters cannot break or alter the statement.
                quoted = table.replace('"', '""')
                cursor.execute(F'SELECT * FROM "{quoted}"')
                # Column names are taken from the cursor metadata rather than parsed out of the
                # CREATE TABLE text; parsing that text by hand mishandles table constraints,
                # parenthesized type arguments, and columns declared without a type.
                names = [column[0] for column in cursor.description]
                result[table] = [dict(zip(names, row)) for row in cursor.fetchall()]
        finally:
            # All rows have been copied into result, the connection is no longer required.
            database.close()

        yield UnpackResult('db', functools.partial(_json, result))

        for table, rows in result.items():
            yield UnpackResult(F'db/{table}', functools.partial(_json, rows))
            for k, row in enumerate(rows):
                root = F'db/{table}/{k}'
                yield UnpackResult(root, functools.partial(_json, row))
                for name, value in row.items():
                    if value is None:
                        # NULL cells only appear inside the JSON documents.
                        continue
                    if isinstance(value, (int, float)):
                        value = str(value)
                    if isinstance(value, str):
                        value = value.encode(self.codec)
                    if isinstance(value, bytes):
                        yield UnpackResult(F'{root}/{name}', value)

    @classmethod
    def handles(cls, data):
        """
        Return whether the first 15 bytes of `data` are the string `SQLite format 3`.
        """
        return memoryview(data)[:15] == B'SQLite format 3'
Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Load the serialized database in `data` into an in-memory SQLite connection, read every
    table completely, and yield one `UnpackResult` for the database, for each table, for each
    row, and for each cell that is not NULL.

    Raises `NotImplementedError` when running on a Python version older than 3.11.
    """
    def _json(obj):
        # Serialization is deferred through functools.partial below, so a JSON document is
        # only produced for the results that are actually requested.
        with BytesAsStringEncoder as encoder:
            return encoder.dumps(obj).encode(self.codec)

    if sys.version_info[:2] < (3, 11):
        raise NotImplementedError(F'python 3.11 is required to use {self.__class__.__name__}.')

    result: dict[str, list[dict[str, int | float | str | bytes | None]]] = {}
    database = sqlite3.connect(':memory:')

    try:
        # Return TEXT values as raw bytes instead of decoding them.
        database.text_factory = bytes
        database.deserialize(data)
        cursor = database.cursor()
        listing: list[tuple[bytes]] = cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table';").fetchall()
        for (tbl,) in listing:
            table = tbl.decode('utf8')
            # Quote the identifier so that table names containing spaces, keywords, or quote
            # characters cannot break or alter the statement.
            quoted = table.replace('"', '""')
            cursor.execute(F'SELECT * FROM "{quoted}"')
            # Column names are taken from the cursor metadata rather than parsed out of the
            # CREATE TABLE text; parsing that text by hand mishandles table constraints,
            # parenthesized type arguments, and columns declared without a type.
            names = [column[0] for column in cursor.description]
            result[table] = [dict(zip(names, row)) for row in cursor.fetchall()]
    finally:
        # All rows have been copied into result, the connection is no longer required.
        database.close()

    yield UnpackResult('db', functools.partial(_json, result))

    for table, rows in result.items():
        yield UnpackResult(F'db/{table}', functools.partial(_json, rows))
        for k, row in enumerate(rows):
            root = F'db/{table}/{k}'
            yield UnpackResult(root, functools.partial(_json, row))
            for name, value in row.items():
                if value is None:
                    # NULL cells only appear inside the JSON documents.
                    continue
                if isinstance(value, (int, float)):
                    value = str(value)
                if isinstance(value, str):
                    value = value.encode(self.codec)
                if isinstance(value, bytes):
                    yield UnpackResult(F'{root}/{name}', value)
Inherited members