Module refinery.units.formats.sqlite
Expand source code Browse git
from __future__ import annotations
import json
import sqlite3
from refinery.lib.types import Param
from refinery.units import Arg, Unit
class sqlite(Unit):
    """
    Extracts data from SQLite3 databases. Each row is returned as a single output chunk in JSON
    format. If no query is provided, the unit will extract all table metadata from the database.
    """
    def __init__(
        self,
        query: Param[
            str, Arg.String('query', help='The SQL query to execute.')
        ] = "SELECT * FROM sqlite_master WHERE type='table';",
    ):
        super().__init__(query=query)

    def process(self, data):
        """
        Load the input into an in-memory SQLite database, run the configured query against it,
        and yield one JSON document per result row, encoded with the unit's codec.

        When the cursor reports column names, each row becomes a JSON object keyed by column
        name; values of type bytes are decoded with the unit's codec first. When no column
        description is available, the row is emitted as a JSON array instead.

        Raises NotImplementedError when the connection object has no deserialize method and
        ValueError when SQLite reports an error.
        """
        try:
            database = sqlite3.connect(':memory:')
            try:
                # Using the connection as a context manager only commits or rolls back the
                # transaction; the connection is closed explicitly in the finally clause.
                with database:
                    try:
                        database.deserialize(data)
                    except AttributeError:
                        raise NotImplementedError(F'Python >= 3.11 is required to use {self.name}.')
                    cursor = database.cursor().execute(self.args.query)
                    fields = (
                        [column[0] for column in cursor.description] if cursor.description else []
                    )
                    for row in cursor:
                        if fields:
                            # Every value is stored under its column name. Bytes values are
                            # decoded because JSON cannot represent them directly.
                            cleaned_row = {
                                name: value.decode(self.codec) if isinstance(value, bytes) else value
                                for name, value in zip(fields, row)
                            }
                            yield json.dumps(cleaned_row).encode(self.codec)
                        else:
                            yield json.dumps(list(row)).encode(self.codec)
            finally:
                database.close()
        except sqlite3.Error as e:
            raise ValueError(F'Failed to process SQLite database: {e}') from e
Classes
class sqlite (query="SELECT * FROM sqlite_master WHERE type='table';")-
Extracts data from SQLite3 databases. Each row is returned as a single output chunk in JSON format. If no query is provided, the unit will extract all table metadata from the database.
Expand source code Browse git
class sqlite(Unit): """ Extracts data from SQLite3 databases. Each row is returned as a single output chunk in JSON format. If no query is provided, the unit will extract all table metadata from the database. """ def __init__( self, query: Param[ str, Arg.String('query', help='The SQL query to execute.') ] = "SELECT * FROM sqlite_master WHERE type='table';", ): super().__init__(query=query) def process(self, data): try: with sqlite3.connect(':memory:') as database: try: database.deserialize(data) except AttributeError: raise NotImplementedError(F'Python >= 3.11 is required to use {self.name}.') cursor = database.cursor().execute(self.args.query) fields = ( [i[0] for i in cursor.description] if cursor.description else [] ) for row in cursor: if fields: cleaned_row = {} for i in range(len(fields)): if isinstance(row[i], bytes): cleaned_row[i] = row[i].decode(self.codec) else: cleaned_row[fields[i]] = row[i] yield json.dumps(cleaned_row).encode(self.codec) else: yield json.dumps(list(row)).encode(self.codec) except sqlite3.Error as e: raise ValueError(F'Failed to process SQLite database: {e}')Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Inherited members