Module refinery.units.meta.max

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Iterable
from refinery.units import Arg, Unit, Chunk
from refinery.lib.meta import metavars


class max_(Unit):
    """
    Picks the maximum of all elements in the current `refinery.lib.frame`.
    """

    def __init__(
        self,
        key: Arg('key', type=str, help='A meta variable expression to sort by instead of sorting the content.') = None,
    ):
        super().__init__(key=key)

    def filter(self, chunks: Iterable[Chunk]):
        def get_value(chunk: Chunk):
            if key is None:
                return chunk
            return metavars(chunk).get(key)

        key = self.args.key
        it = iter(chunks)

        for max_chunk in it:
            if not max_chunk.visible:
                yield max_chunk
            else:
                max_index = 0
                max_value = get_value(max_chunk)
                break
        else:
            return

        for index, chunk in enumerate(chunks, 1):
            if not chunk.visible:
                yield chunk
                continue
            value = get_value(chunk)
            try:
                is_max = value > max_value
            except TypeError:
                if max_value is None:
                    self.log_info(
                        F'Discarding chunk {max_index} in favor of {index} because {key} was not '
                        F'set on the former; new maximum is {value!r}.')
                    is_max = True
                else:
                    self.log_info(
                        F'Discarding chunk {index} because {key} had value {value!r}; it could not '
                        F'be compared to the current maximum {max_value!r} on chunk {max_index}.')
                    is_max = False
            if is_max:
                max_value = value
                max_chunk = chunk
                max_index = index

        yield max_chunk

Classes

class max_ (key=None)

Picks the maximum of all elements in the current refinery.lib.frame.

Expand source code Browse git
class max_(Unit):
    """
    Picks the maximum of all elements in the current `refinery.lib.frame`.
    """

    def __init__(
        self,
        key: Arg('key', type=str, help='A meta variable expression to sort by instead of sorting the content.') = None,
    ):
        super().__init__(key=key)

    def filter(self, chunks: Iterable[Chunk]):
        def get_value(chunk: Chunk):
            if key is None:
                return chunk
            return metavars(chunk).get(key)

        key = self.args.key
        it = iter(chunks)

        for max_chunk in it:
            if not max_chunk.visible:
                yield max_chunk
            else:
                max_index = 0
                max_value = get_value(max_chunk)
                break
        else:
            return

        for index, chunk in enumerate(chunks, 1):
            if not chunk.visible:
                yield chunk
                continue
            value = get_value(chunk)
            try:
                is_max = value > max_value
            except TypeError:
                if max_value is None:
                    self.log_info(
                        F'Discarding chunk {max_index} in favor of {index} because {key} was not '
                        F'set on the former; new maximum is {value!r}.')
                    is_max = True
                else:
                    self.log_info(
                        F'Discarding chunk {index} because {key} had value {value!r}; it could not '
                        F'be compared to the current maximum {max_value!r} on chunk {max_index}.')
                    is_max = False
            if is_max:
                max_value = value
                max_chunk = chunk
                max_index = index

        yield max_chunk

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members