Module refinery.units.meta.eat

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import Arg, Unit, Chunk
from refinery.lib.meta import check_variable_name, metavars
from refinery.lib.tools import isbuffer


class eat(Unit):
    """
    Consume a meta variable and replace the contents of the current chunk with it. If the variable
    contains a string, it is encoded with the default codec. If the variable cannot be converted to
    a byte string, the data is lost and an empty chunk is returned.
    """
    def __init__(
        self,
        name: Arg(help='The name of the variable to be used.', type=str),
    ):
        super().__init__(name=check_variable_name(name))

    def process(self, data: Chunk):
        def invalid_type():
            return F'variable {name} is of type "{type}", unable to convert to byte string - data is lost'
        name = self.args.name
        meta = metavars(data)
        data = meta.pop(name)
        type = data.__class__.__name__
        if isinstance(data, int):
            self.log_info(F'variable {name} is an integer, converting to string.')
            data = str(data).encode(self.codec)
        if isinstance(data, str):
            self.log_info(F'variable {name} is a string, encoding as {self.codec}')
            data = data.encode(self.codec)
        elif not isbuffer(data):
            try:
                wrapped = bytearray(data)
            except Exception:
                self.log_warn(invalid_type())
                data = None
            else:
                data = wrapped
        return data

Classes

class eat (name)

Consume a meta variable and replace the contents of the current chunk with it. If the variable contains a string, it is encoded with the default codec. If the variable cannot be converted to a byte string, the data is lost and an empty chunk is returned.

Expand source code Browse git
class eat(Unit):
    """
    Consume a meta variable and replace the contents of the current chunk with it. If the variable
    contains a string, it is encoded with the default codec. If the variable cannot be converted to
    a byte string, the data is lost and an empty chunk is returned.
    """
    def __init__(
        self,
        name: Arg(help='The name of the variable to be used.', type=str),
    ):
        super().__init__(name=check_variable_name(name))

    def process(self, data: Chunk):
        def invalid_type():
            return F'variable {name} is of type "{type}", unable to convert to byte string - data is lost'
        name = self.args.name
        meta = metavars(data)
        data = meta.pop(name)
        type = data.__class__.__name__
        if isinstance(data, int):
            self.log_info(F'variable {name} is an integer, converting to string.')
            data = str(data).encode(self.codec)
        if isinstance(data, str):
            self.log_info(F'variable {name} is a string, encoding as {self.codec}')
            data = data.encode(self.codec)
        elif not isbuffer(data):
            try:
                wrapped = bytearray(data)
            except Exception:
                self.log_warn(invalid_type())
                data = None
            else:
                data = wrapped
        return data

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members