Module refinery.units.meta.put
Expand source code Browse git
from __future__ import annotations
import itertools
from refinery.units import Arg, Unit, Chunk
from refinery.lib.tools import isbuffer, typename
from refinery.lib.meta import check_variable_name
_EMPTY = object()
class put(Unit):
"""
Can be used to add a meta variable to the processed chunk. Note that meta variables
cease to exist outside a frame.
"""
def __init__(
self,
name : Arg.String(help='The name of the variable to be used.'),
value: Arg.NumSeq(check=False, help=(
'The value for the variable. If no value is given, the entire current chunk is stored.'
)) = _EMPTY
):
super().__init__(name=check_variable_name(name), value=value)
def process(self, data: Chunk):
value = self.args.value
if value is _EMPTY:
value = data
if not isinstance(value, (int, float)) and not isbuffer(value):
try:
len(value)
except TypeError:
if isinstance(value, itertools.repeat):
value = next(value)
if not isinstance(value, (int, float)):
raise NotImplementedError(F'put does not support {value.__class__.__name__} values.')
else:
if not isinstance(value, (dict, list)):
value = list(value)
self.log_debug(F'storing {typename(value)}:', value, clip=True)
data.meta[self.args.name] = value
return data
Classes
class put (name, value=<object object>)
-
Can be used to add a meta variable to the processed chunk. Note that meta variables cease to exist outside a frame.
Expand source code Browse git
class put(Unit): """ Can be used to add a meta variable to the processed chunk. Note that meta variables cease to exist outside a frame. """ def __init__( self, name : Arg.String(help='The name of the variable to be used.'), value: Arg.NumSeq(check=False, help=( 'The value for the variable. If no value is given, the entire current chunk is stored.' )) = _EMPTY ): super().__init__(name=check_variable_name(name), value=value) def process(self, data: Chunk): value = self.args.value if value is _EMPTY: value = data if not isinstance(value, (int, float)) and not isbuffer(value): try: len(value) except TypeError: if isinstance(value, itertools.repeat): value = next(value) if not isinstance(value, (int, float)): raise NotImplementedError(F'put does not support {value.__class__.__name__} values.') else: if not isinstance(value, (dict, list)): value = list(value) self.log_debug(F'storing {typename(value)}:', value, clip=True) data.meta[self.args.name] = value return data
Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Inherited members