Module refinery.units.meta.xfcc
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from collections import defaultdict
from typing import Iterable, Dict
from refinery.units import Arg, Unit, Chunk
class xfcc(Unit):
"""
The cross frame chunk count unit! It computes the number of times a chunk occurs across several frames
of input. It consumes all frames at its current level of the frame tree and counts the number of times
each item occurs in each of them. It converts a frame tree of depth 2 into a new frame tree of depth 2
where the parent of every leaf has this leaf as its only child. The leaves of this tree have been
enriched with a meta variable containing the number of times the corresponding chunk has occurred in
the input frame tree. This unit can be used to compute set intersections across frames as follows:
(1) [| (2) [| dedup | xfcc -r t | iff t==1 ]| (3) ]
A sequence of chunks is emitted at (1), each of which has chunks extracted at (2). It is then important
to use dedup before calling xfcc, since xfcc performs an absolute count. The frame at (3) contains the
intersection of all datasets that were extracted at (2).
"""
def __init__(
self,
variable: Arg(help='The variable which is used as the accumulator') = 'count',
relative: Arg.Switch('-r', help='Normalize the accumulator to a number between 0 and 1.') = False
):
super().__init__(variable=variable, relative=relative)
self._trunk = None
self._store: Dict[Chunk, int] = defaultdict(int)
def finish(self):
vn = self.args.variable
rc = self.args.relative
if rc and self._store:
maximum = max(self._store.values())
for index, (chunk, count) in enumerate(self._store.items()):
if rc:
count /= maximum
chunk.path[-2] = 0
chunk.path[-1] = index
chunk.meta[vn] = count
yield chunk
self._store.clear()
def _getcount(self, chunk):
try:
count = int(chunk.meta[self.args.variable])
except (AttributeError, KeyError, TypeError):
return 1
else:
return count
def filter(self, chunks: Iterable[Chunk]):
it = iter(chunks)
try:
head = next(it)
except StopIteration:
return
if len(head.path) < 2:
self.log_warn(F'the current frame is nested {len(head.path)} layers deep, at least two layers are required.')
yield head
yield from it
return
trunk = head.path[:-2]
store = self._store
if trunk != self._trunk:
yield from self.finish()
self._trunk = trunk
store[head] += self._getcount(head)
for chunk in it:
store[chunk] += self._getcount(chunk)
Classes
class xfcc (variable='count', relative=False)
-
The cross frame chunk count unit! It computes the number of times a chunk occurs across several frames of input. It consumes all frames at its current level of the frame tree and counts the number of times each item occurs in each of them. It converts a frame tree of depth 2 into a new frame tree of depth 2 where the parent of every leaf has this leaf as its only child. The leaves of this tree have been enriched with a meta variable containing the number of times the corresponding chunk has occurred in the input frame tree. This unit can be used to compute set intersections across frames as follows:
(1) [| (2) [| dedup | xfcc -r t | iff t==1 ]| (3) ]
A sequence of chunks is emitted at (1), each of which has chunks extracted at (2). It is then important to use dedup before calling xfcc, since xfcc performs an absolute count. The frame at (3) contains the intersection of all datasets that were extracted at (2).
Expand source code Browse git
class xfcc(Unit): """ The cross frame chunk count unit! It computes the number of times a chunk occurs across several frames of input. It consumes all frames at its current level of the frame tree and counts the number of times each item occurs in each of them. It converts a frame tree of depth 2 into a new frame tree of depth 2 where the parent of every leaf has this leaf as its only child. The leaves of this tree have been enriched with a meta variable containing the number of times the corresponding chunk has occurred in the input frame tree. This unit can be used to compute set intersections across frames as follows: (1) [| (2) [| dedup | xfcc -r t | iff t==1 ]| (3) ] A sequence of chunks is emitted at (1), each of which has chunks extracted at (2). It is then important to use dedup before calling xfcc, since xfcc performs an absolute count. The frame at (3) contains the intersection of all datasets that were extracted at (2). """ def __init__( self, variable: Arg(help='The variable which is used as the accumulator') = 'count', relative: Arg.Switch('-r', help='Normalize the accumulator to a number between 0 and 1.') = False ): super().__init__(variable=variable, relative=relative) self._trunk = None self._store: Dict[Chunk, int] = defaultdict(int) def finish(self): vn = self.args.variable rc = self.args.relative if rc and self._store: maximum = max(self._store.values()) for index, (chunk, count) in enumerate(self._store.items()): if rc: count /= maximum chunk.path[-2] = 0 chunk.path[-1] = index chunk.meta[vn] = count yield chunk self._store.clear() def _getcount(self, chunk): try: count = int(chunk.meta[self.args.variable]) except (AttributeError, KeyError, TypeError): return 1 else: return count def filter(self, chunks: Iterable[Chunk]): it = iter(chunks) try: head = next(it) except StopIteration: return if len(head.path) < 2: self.log_warn(F'the current frame is nested {len(head.path)} layers deep, at least two layers are required.') yield head yield from it return trunk = head.path[:-2] store = self._store if trunk != self._trunk: yield from self.finish() self._trunk = trunk store[head] += self._getcount(head) for chunk in it: store[chunk] += self._getcount(chunk)
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members