Module refinery.units.formats.csv

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import List, Dict, Any

import csv as _csv
import io
import json

from refinery.units import Unit
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import isodate


class csv(Unit):
    """
    Extracts the rows of a CSV document with header and converts them into JSON chunks.
    The reverse operation converts a JSON list of rows, or of objects, into a CSV document.
    """
    def __init__(
        self,
        quote: Unit.Arg('-q', help='Specify the quote character, the default is a double quote.') = B'"',
        delim: Unit.Arg('-d', help='Specify the delimiter, the default is a single comma.') = B','
    ):
        super().__init__(quote=quote, delim=delim)

    def json_to_csv(self, table: List[Any]):
        """
        Serialize decoded JSON data as a CSV document.

        If every element of `table` is a list of strings, the elements are written as CSV rows
        without a header. Otherwise, the elements are treated as mappings: the union of their
        string keys, in order of first appearance, becomes the header row and each element
        becomes one row, with missing keys rendered as empty fields.

        Raises a `ValueError` if `table` is not a list. Returns the encoded CSV document as
        produced by `MemoryFile.getvalue`.
        """
        quote = self.args.quote.decode(self.codec)
        delim = self.args.delim.decode(self.codec)

        if not isinstance(table, list):
            raise ValueError('Input must be a JSON list.')

        out = MemoryFile()

        # First attempt: a plain list of string lists. The loop is abandoned as soon as one row
        # does not fit; whatever was written so far is discarded with this buffer.
        with io.TextIOWrapper(out, self.codec, newline='') as stream:
            writer = _csv.writer(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
            for row in table:
                if not isinstance(row, list):
                    break
                if not all(isinstance(item, str) for item in row):
                    break
                writer.writerow(row)
            else:
                # The text wrapper buffers encoded output; push it into the underlying buffer
                # before reading that buffer, rather than relying on the flush at close.
                stream.flush()
                return out.getvalue()

        keys = {}
        # A dictionary is used here over a set because dictionaries remember insertion order.
        # When feeding the unit a sequence of JSON objects, the user would likely expect the
        # column order in the resulting CSV to derive from the entry order in the JSON data.

        for row in table:
            for key in row:
                if not isinstance(key, str):
                    continue
                keys[key] = None

        keys = list(keys)
        out = MemoryFile()

        with io.TextIOWrapper(out, self.codec, newline='') as stream:
            writer = _csv.writer(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
            writer.writerow(keys)
            for row in table:
                writer.writerow([str(row.get(key, '')) for key in keys])
            # See above: flush pending text before the underlying buffer is read.
            stream.flush()
            return out.getvalue()

    def reverse(self, data: bytearray):
        """
        Parse `data` as JSON and convert it to CSV via `json_to_csv`. When the input is not a
        single JSON document, each non-blank line is parsed as a JSON document of its own.
        """
        try:
            table: List[Dict[str, Any]] = json.loads(data)
        except ValueError:
            # Both json.JSONDecodeError and UnicodeDecodeError derive from ValueError. Blank
            # lines are skipped so that a trailing newline does not break line-delimited input.
            table: List[Dict[str, Any]] = [
                json.loads(line) for line in data.splitlines() if line.strip()]
        return self.json_to_csv(table)

    def process(self, data):
        """
        Read `data` as CSV, using the first row as the header, and yield one encoded JSON
        object per subsequent row. Nothing is yielded when the input contains no rows.
        """
        quote = self.args.quote.decode(self.codec)
        delim = self.args.delim.decode(self.codec)

        def convert(field: str):
            # str.isdecimal is used instead of str.isdigit because the latter also accepts
            # characters like superscript digits, for which int() raises a ValueError.
            # Fields with a leading zero are kept as strings.
            if field.isdecimal() and not field.startswith('0'):
                return int(field)
            date = isodate(field)
            if date is not None:
                return date.isoformat(' ', 'seconds')
            return field

        # The csv module requires newline='' on its text stream; without it, line breaks that
        # are embedded in quoted fields are altered by universal newline translation.
        with io.TextIOWrapper(MemoryFile(data), self.codec, newline='') as stream:
            rows = _csv.reader(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
            # A bare next() would raise StopIteration on empty input, which turns into a
            # RuntimeError inside a generator.
            keys = next(rows, None)
            if keys is None:
                return
            for row in rows:
                out = {key: convert(value) for key, value in zip(keys, row)}
                yield json.dumps(out, indent=4).encode(self.codec)

Classes

class csv (quote=b'"', delim=b',')

Extracts the rows of a CSV document with header and converts them into JSON chunks.

Expand source code Browse git
class csv(Unit):
    """
    Extracts the rows of a CSV document with header and converts them into JSON chunks.
    The reverse operation converts a JSON list of rows, or of objects, into a CSV document.
    """
    def __init__(
        self,
        quote: Unit.Arg('-q', help='Specify the quote character, the default is a double quote.') = B'"',
        delim: Unit.Arg('-d', help='Specify the delimiter, the default is a single comma.') = B','
    ):
        super().__init__(quote=quote, delim=delim)

    def json_to_csv(self, table: List[Any]):
        """
        Serialize decoded JSON data as a CSV document.

        If every element of `table` is a list of strings, the elements are written as CSV rows
        without a header. Otherwise, the elements are treated as mappings: the union of their
        string keys, in order of first appearance, becomes the header row and each element
        becomes one row, with missing keys rendered as empty fields.

        Raises a `ValueError` if `table` is not a list. Returns the encoded CSV document as
        produced by `MemoryFile.getvalue`.
        """
        quote = self.args.quote.decode(self.codec)
        delim = self.args.delim.decode(self.codec)

        if not isinstance(table, list):
            raise ValueError('Input must be a JSON list.')

        out = MemoryFile()

        # First attempt: a plain list of string lists. The loop is abandoned as soon as one row
        # does not fit; whatever was written so far is discarded with this buffer.
        with io.TextIOWrapper(out, self.codec, newline='') as stream:
            writer = _csv.writer(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
            for row in table:
                if not isinstance(row, list):
                    break
                if not all(isinstance(item, str) for item in row):
                    break
                writer.writerow(row)
            else:
                # The text wrapper buffers encoded output; push it into the underlying buffer
                # before reading that buffer, rather than relying on the flush at close.
                stream.flush()
                return out.getvalue()

        keys = {}
        # A dictionary is used here over a set because dictionaries remember insertion order.
        # When feeding the unit a sequence of JSON objects, the user would likely expect the
        # column order in the resulting CSV to derive from the entry order in the JSON data.

        for row in table:
            for key in row:
                if not isinstance(key, str):
                    continue
                keys[key] = None

        keys = list(keys)
        out = MemoryFile()

        with io.TextIOWrapper(out, self.codec, newline='') as stream:
            writer = _csv.writer(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
            writer.writerow(keys)
            for row in table:
                writer.writerow([str(row.get(key, '')) for key in keys])
            # See above: flush pending text before the underlying buffer is read.
            stream.flush()
            return out.getvalue()

    def reverse(self, data: bytearray):
        """
        Parse `data` as JSON and convert it to CSV via `json_to_csv`. When the input is not a
        single JSON document, each non-blank line is parsed as a JSON document of its own.
        """
        try:
            table: List[Dict[str, Any]] = json.loads(data)
        except ValueError:
            # Both json.JSONDecodeError and UnicodeDecodeError derive from ValueError. Blank
            # lines are skipped so that a trailing newline does not break line-delimited input.
            table: List[Dict[str, Any]] = [
                json.loads(line) for line in data.splitlines() if line.strip()]
        return self.json_to_csv(table)

    def process(self, data):
        """
        Read `data` as CSV, using the first row as the header, and yield one encoded JSON
        object per subsequent row. Nothing is yielded when the input contains no rows.
        """
        quote = self.args.quote.decode(self.codec)
        delim = self.args.delim.decode(self.codec)

        def convert(field: str):
            # str.isdecimal is used instead of str.isdigit because the latter also accepts
            # characters like superscript digits, for which int() raises a ValueError.
            # Fields with a leading zero are kept as strings.
            if field.isdecimal() and not field.startswith('0'):
                return int(field)
            date = isodate(field)
            if date is not None:
                return date.isoformat(' ', 'seconds')
            return field

        # The csv module requires newline='' on its text stream; without it, line breaks that
        # are embedded in quoted fields are altered by universal newline translation.
        with io.TextIOWrapper(MemoryFile(data), self.codec, newline='') as stream:
            rows = _csv.reader(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
            # A bare next() would raise StopIteration on empty input, which turns into a
            # RuntimeError inside a generator.
            keys = next(rows, None)
            if keys is None:
                return
            for row in rows:
                out = {key: convert(value) for key, value in zip(keys, row)}
                yield json.dumps(out, indent=4).encode(self.codec)

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def json_to_csv(self, table)
Expand source code Browse git
def json_to_csv(self, table: List[Any]):
    """
    Serialize decoded JSON data as a CSV document.

    If every element of `table` is a list of strings, the elements are written as CSV rows
    without a header. Otherwise, the elements are treated as mappings: the union of their
    string keys, in order of first appearance, becomes the header row and each element
    becomes one row, with missing keys rendered as empty fields.

    Raises a `ValueError` if `table` is not a list. Returns the encoded CSV document as
    produced by `MemoryFile.getvalue`.
    """
    quote = self.args.quote.decode(self.codec)
    delim = self.args.delim.decode(self.codec)

    if not isinstance(table, list):
        raise ValueError('Input must be a JSON list.')

    out = MemoryFile()

    # First attempt: a plain list of string lists. The loop is abandoned as soon as one row
    # does not fit; whatever was written so far is discarded with this buffer.
    with io.TextIOWrapper(out, self.codec, newline='') as stream:
        writer = _csv.writer(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
        for row in table:
            if not isinstance(row, list):
                break
            if not all(isinstance(item, str) for item in row):
                break
            writer.writerow(row)
        else:
            # The text wrapper buffers encoded output; push it into the underlying buffer
            # before reading that buffer, rather than relying on the flush at close.
            stream.flush()
            return out.getvalue()

    keys = {}
    # A dictionary is used here over a set because dictionaries remember insertion order.
    # When feeding the unit a sequence of JSON objects, the user would likely expect the
    # column order in the resulting CSV to derive from the entry order in the JSON data.

    for row in table:
        for key in row:
            if not isinstance(key, str):
                continue
            keys[key] = None

    keys = list(keys)
    out = MemoryFile()

    with io.TextIOWrapper(out, self.codec, newline='') as stream:
        writer = _csv.writer(stream, quotechar=quote, delimiter=delim, skipinitialspace=True)
        writer.writerow(keys)
        for row in table:
            writer.writerow([str(row.get(key, '')) for key in keys])
        # See above: flush pending text before the underlying buffer is read.
        stream.flush()
        return out.getvalue()

Inherited members