Module refinery.units.meta.dedup

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import Unit, Arg
from refinery.lib.tools import isbuffer
from refinery.lib.meta import metavars
from refinery.lib.argformats import PythonExpression

from hashlib import md5


class dedup(Unit):
    """
    Deduplicates a sequence of multiple inputs. The deduplication is limited to the current `refinery.lib.frame`.
    """
    def __init__(
        self,
        key: Arg('key', type=str, help='An optional meta variable expression to deduplicate.') = None,
        count: Arg.Switch('-c', help='Store the count of each deduplicated chunk.') = False
    ):
        super().__init__(key=key, count=count)

    def filter(self, chunks):
        """
        Yield each visible chunk at most once per distinct identity; chunks that are not
        visible are always passed through immediately and never take part in deduplication.

        Without a key expression, the identity of a chunk is the MD5 digest of its contents.
        With a key expression, the identity is the value of that expression evaluated against
        the chunk's meta variables; buffer-like values are reduced to their MD5 digest.

        When counting is enabled, visible chunks are withheld until the input is exhausted.
        The first chunk seen for each identity is then emitted, labelled with the number of
        visible chunks that shared this identity.
        """
        expression = self.args.key

        def identify(chunk):
            # Maps a chunk to the value that decides whether it is a duplicate.
            if expression is None:
                return md5(chunk).digest()
            value = PythonExpression.Evaluate(expression, metavars(chunk))
            return md5(value).digest() if isbuffer(value) else value

        if not self.args.count:
            # Streaming mode: forward the first occurrence right away and drop repeats.
            seen = set()
            for chunk in chunks:
                if not chunk.visible:
                    yield chunk
                    continue
                uid = identify(chunk)
                if uid not in seen:
                    seen.add(uid)
                    yield chunk
            return

        # Counting mode: totals are only known once every chunk has been inspected, so the
        # first representative of each identity is kept back until the end.
        tally = {}
        representatives = {}
        for chunk in chunks:
            if not chunk.visible:
                yield chunk
                continue
            uid = identify(chunk)
            tally[uid] = tally.get(uid, 0) + 1
            representatives.setdefault(uid, chunk)

        for uid, chunk in representatives.items():
            yield self.labelled(chunk, count=tally[uid])

Classes

class dedup (key=None, count=False)

Deduplicates a sequence of multiple inputs. The deduplication is limited to the current refinery.lib.frame.

Expand source code Browse git
class dedup(Unit):
    """
    Deduplicates a sequence of multiple inputs. The deduplication is limited to the current `refinery.lib.frame`.
    """
    def __init__(
        self,
        key: Arg('key', type=str, help='An optional meta variable expression to deduplicate.') = None,
        count: Arg.Switch('-c', help='Store the count of each deduplicated chunk.') = False
    ):
        super().__init__(key=key, count=count)

    def filter(self, chunks):
        """
        Yield each visible chunk at most once per distinct identity; chunks that are not
        visible are always passed through immediately and never take part in deduplication.

        Without a key expression, the identity of a chunk is the MD5 digest of its contents.
        With a key expression, the identity is the value of that expression evaluated against
        the chunk's meta variables; buffer-like values are reduced to their MD5 digest.

        When counting is enabled, visible chunks are withheld until the input is exhausted.
        The first chunk seen for each identity is then emitted, labelled with the number of
        visible chunks that shared this identity.
        """
        expression = self.args.key

        def identify(chunk):
            # Maps a chunk to the value that decides whether it is a duplicate.
            if expression is None:
                return md5(chunk).digest()
            value = PythonExpression.Evaluate(expression, metavars(chunk))
            return md5(value).digest() if isbuffer(value) else value

        if not self.args.count:
            # Streaming mode: forward the first occurrence right away and drop repeats.
            seen = set()
            for chunk in chunks:
                if not chunk.visible:
                    yield chunk
                    continue
                uid = identify(chunk)
                if uid not in seen:
                    seen.add(uid)
                    yield chunk
            return

        # Counting mode: totals are only known once every chunk has been inspected, so the
        # first representative of each identity is kept back until the end.
        tally = {}
        representatives = {}
        for chunk in chunks:
            if not chunk.visible:
                yield chunk
                continue
            uid = identify(chunk)
            tally[uid] = tally.get(uid, 0) + 1
            representatives.setdefault(uid, chunk)

        for uid, chunk in representatives.items():
            yield self.labelled(chunk, count=tally[uid])

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members