Module refinery.units.formats.archive.xtsql
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import sqlite3
import sys
import functools
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.lib.json import BytesAsStringEncoder
class xtsql(PathExtractorUnit):
"""
Extract files from SQLite3 databases.
"""
def unpack(self, data: bytearray):
def _json(object):
with BytesAsStringEncoder as encoder:
return encoder.dumps(object).encode(self.codec)
if sys.version_info[:2] < (3, 11):
raise NotImplementedError(F'python 3.11 is required to use {self.__class__.__name__}.')
database = sqlite3.connect(':memory:')
database.deserialize(data)
cursor = database.cursor()
result: dict[str, list[dict[str, int | float | str | bytes]]] = {}
listing: list[tuple[str, str]] = cursor.execute(
"SELECT name, sql FROM sqlite_master WHERE type='table';").fetchall()
for table, spec in listing:
result[table] = t = []
ct, _table, names = spec.partition(table)
names = names.strip()
if (
table != _table
or ct.strip().upper().split() != ['CREATE', 'TABLE']
or not names.endswith(')')
or not names.startswith('(')
):
raise ValueError(F'Unexpeted SQL statement in master table: {spec}')
names = [next(iter(name.strip().split()))
for name in names[1:-1].split(',')]
for row in cursor.execute(F'SELECT * FROM {table}').fetchall():
t.append(dict(zip(names, row)))
yield UnpackResult('db', functools.partial(_json, result))
for table, rows in result.items():
yield UnpackResult(F'db/{table}', functools.partial(_json, rows))
for k, row in enumerate(rows):
root = F'db/{table}/{k}'
yield UnpackResult(root, functools.partial(_json, row))
for name, value in row.items():
path = F'{root}/{name}'
if value is None:
continue
if isinstance(value, (int, float)):
value = str(value)
if isinstance(value, str):
value = value.encode(self.codec)
if isinstance(value, bytes):
yield UnpackResult(path, value)
@classmethod
def handles(cls, data: bytearray):
return memoryview(data)[:15] == B'SQLite format 3'
Classes
class xtsql (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')
-
Extract files from SQLite3 databases.
Expand source code Browse git
class xtsql(PathExtractorUnit): """ Extract files from SQLite3 databases. """ def unpack(self, data: bytearray): def _json(object): with BytesAsStringEncoder as encoder: return encoder.dumps(object).encode(self.codec) if sys.version_info[:2] < (3, 11): raise NotImplementedError(F'python 3.11 is required to use {self.__class__.__name__}.') database = sqlite3.connect(':memory:') database.deserialize(data) cursor = database.cursor() result: dict[str, list[dict[str, int | float | str | bytes]]] = {} listing: list[tuple[str, str]] = cursor.execute( "SELECT name, sql FROM sqlite_master WHERE type='table';").fetchall() for table, spec in listing: result[table] = t = [] ct, _table, names = spec.partition(table) names = names.strip() if ( table != _table or ct.strip().upper().split() != ['CREATE', 'TABLE'] or not names.endswith(')') or not names.startswith('(') ): raise ValueError(F'Unexpeted SQL statement in master table: {spec}') names = [next(iter(name.strip().split())) for name in names[1:-1].split(',')] for row in cursor.execute(F'SELECT * FROM {table}').fetchall(): t.append(dict(zip(names, row))) yield UnpackResult('db', functools.partial(_json, result)) for table, rows in result.items(): yield UnpackResult(F'db/{table}', functools.partial(_json, rows)) for k, row in enumerate(rows): root = F'db/{table}/{k}' yield UnpackResult(root, functools.partial(_json, row)) for name, value in row.items(): path = F'{root}/{name}' if value is None: continue if isinstance(value, (int, float)): value = str(value) if isinstance(value, str): value = value.encode(self.codec) if isinstance(value, bytes): yield UnpackResult(path, value) @classmethod def handles(cls, data: bytearray): return memoryview(data)[:15] == B'SQLite format 3'
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data: bytearray): def _json(object): with BytesAsStringEncoder as encoder: return encoder.dumps(object).encode(self.codec) if sys.version_info[:2] < (3, 11): raise NotImplementedError(F'python 3.11 is required to use {self.__class__.__name__}.') database = sqlite3.connect(':memory:') database.deserialize(data) cursor = database.cursor() result: dict[str, list[dict[str, int | float | str | bytes]]] = {} listing: list[tuple[str, str]] = cursor.execute( "SELECT name, sql FROM sqlite_master WHERE type='table';").fetchall() for table, spec in listing: result[table] = t = [] ct, _table, names = spec.partition(table) names = names.strip() if ( table != _table or ct.strip().upper().split() != ['CREATE', 'TABLE'] or not names.endswith(')') or not names.startswith('(') ): raise ValueError(F'Unexpeted SQL statement in master table: {spec}') names = [next(iter(name.strip().split())) for name in names[1:-1].split(',')] for row in cursor.execute(F'SELECT * FROM {table}').fetchall(): t.append(dict(zip(names, row))) yield UnpackResult('db', functools.partial(_json, result)) for table, rows in result.items(): yield UnpackResult(F'db/{table}', functools.partial(_json, rows)) for k, row in enumerate(rows): root = F'db/{table}/{k}' yield UnpackResult(root, functools.partial(_json, row)) for name, value in row.items(): path = F'{root}/{name}' if value is None: continue if isinstance(value, (int, float)): value = str(value) if isinstance(value, str): value = value.encode(self.codec) if isinstance(value, bytes): yield UnpackResult(path, value)
Inherited members