Module refinery.units.sinks.iemap

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

import math
import itertools
import random

from refinery.units import Unit
from refinery.lib.tools import get_terminal_size, entropy
from refinery.lib.structures import MemoryFile
from refinery.lib.meta import metavars


class iemap(Unit):
    """
    The information entropy map displays a colored bar on the terminal visualizing the file's local
    entropy from beginning to end.
    """
    def __init__(
        self,
        legend: Unit.Arg.Switch('-l', help='Show entropy color legend.') = False,
        background: Unit.Arg.Switch('-b', help='Generate the bar by coloring the background.') = False,
        block_char: Unit.Arg('-c', '--block-char', type=str, metavar='C',
            help='Character used for filling the bar, default is {default}') = '#',
        *label: Unit.Arg(type=str, metavar='label-part', help=(
            'The remaining command line specifies a format string expression that will be printed '
            'over the heat map display of each processed chunk.'
        ))
    ):
        super().__init__(label=' '.join(label), background=background, legend=legend, block_char=block_char)

    @Unit.Requires('colorama', 'display', 'default', 'extended')
    def _colorama():
        import colorama
        return colorama

    def process(self, data):
        from sys import stderr
        from os import name as os_name
        colorama = self._colorama
        colorama.init(autoreset=False, convert=(os_name == 'nt'))

        nobg = not self.args.background
        meta = metavars(data)

        label = meta.format_str(self.args.label, self.codec, [data])
        if label:
            if not label.endswith(' '):
                label = F'{label} '
            if not label.startswith(' '):
                label = F' {label}'

        bgmap = [
            colorama.Back.BLACK,
            colorama.Back.WHITE,
            colorama.Back.YELLOW,
            colorama.Back.CYAN,
            colorama.Back.BLUE,
            colorama.Back.GREEN,
            colorama.Back.LIGHTRED_EX,
            colorama.Back.MAGENTA,
        ]
        fgmap = [
            colorama.Fore.LIGHTBLACK_EX,
            colorama.Fore.LIGHTWHITE_EX,
            colorama.Fore.LIGHTYELLOW_EX,
            colorama.Fore.LIGHTCYAN_EX,
            colorama.Fore.LIGHTBLUE_EX,
            colorama.Fore.LIGHTGREEN_EX,
            colorama.Fore.LIGHTRED_EX,
            colorama.Fore.LIGHTMAGENTA_EX,
        ]

        _reset = colorama.Back.BLACK + colorama.Fore.WHITE + colorama.Style.RESET_ALL

        clrmap = fgmap if nobg else bgmap
        header = '['
        header_length = 1
        footer_length = 4 + 7

        if self.args.legend:
            header = '[{1}{0}] {2}'.format(_reset, ''.join(F'{bg}{k}' for k, bg in enumerate(clrmap, 1)), header)
            header_length += 3 + len(clrmap)

        _tw = get_terminal_size()
        width = _tw - header_length - footer_length
        if width < 16:
            raise RuntimeError(F'computed terminal width {_tw} is too small for heatmap')

        def entropy_select(value, map):
            index = min(len(map) - 1, math.floor(value * len(map)))
            return map[index]

        view = memoryview(data)
        size = len(data)
        chunk_size = 0

        for block_size in range(1, width + 1):
            block_count = width // block_size
            chunk_size = size // block_count
            if chunk_size > 1024:
                break

        q, remainder = divmod(width, block_size)
        assert q == block_count
        indices = list(range(q))
        random.seed(sum(view[:1024]))
        random.shuffle(indices)

        block_sizes = [block_size] * q
        q, r = divmod(remainder, block_count)
        for i in indices:
            block_sizes[i] += q
        for i in indices[:r]:
            block_sizes[i] += 1
        assert sum(block_sizes) == width

        q, remainder = divmod(size, block_count)
        assert q == chunk_size
        chunk_sizes = [chunk_size] * block_count
        for i in indices[:remainder]:
            chunk_sizes[i] += 1
        assert sum(chunk_sizes) == size

        stream = MemoryFile(view)
        filler = self.args.block_char if nobg else ' '

        try:
            stderr.write(header)
            if label is not None:
                stderr.write(colorama.Fore.WHITE)
                stderr.flush()
            it = itertools.chain(itertools.repeat(filler, 3), label, itertools.cycle(filler))
            cp = None
            for chunk_size, block_size in zip(chunk_sizes, block_sizes):
                chunk = stream.read(chunk_size)
                chunk_entropy = entropy(chunk)
                pp = entropy_select(chunk_entropy, clrmap)
                string = ''.join(itertools.islice(it, block_size))
                if pp != cp:
                    string = F'{pp}{string}'
                cp = pp
                stderr.write(string)
                stderr.flush()
        except BaseException:
            eraser = ' ' * width
            stderr.write(F'\r{_reset}{eraser}\r')
            raise
        else:
            stderr.write(F'{_reset}] [---.--%]')
            te = meta['entropy']
            stderr.write('\b' * footer_length)
            stderr.write(F'] [{te!r:>7}]\n')
            stderr.flush()
        if not self.isatty:
            yield data

Classes

class iemap (legend=False, background=False, block_char='#', *label)

The information entropy map displays a colored bar on the terminal visualizing the file's local entropy from beginning to end.

Expand source code Browse git
class iemap(Unit):
    """
    The information entropy map displays a colored bar on the terminal visualizing the file's local
    entropy from beginning to end.
    """
    def __init__(
        self,
        legend: Unit.Arg.Switch('-l', help='Show entropy color legend.') = False,
        background: Unit.Arg.Switch('-b', help='Generate the bar by coloring the background.') = False,
        block_char: Unit.Arg('-c', '--block-char', type=str, metavar='C',
            help='Character used for filling the bar, default is {default}') = '#',
        *label: Unit.Arg(type=str, metavar='label-part', help=(
            'The remaining command line specifies a format string expression that will be printed '
            'over the heat map display of each processed chunk.'
        ))
    ):
        super().__init__(label=' '.join(label), background=background, legend=legend, block_char=block_char)

    @Unit.Requires('colorama', 'display', 'default', 'extended')
    def _colorama():
        import colorama
        return colorama

    def process(self, data):
        from sys import stderr
        from os import name as os_name
        colorama = self._colorama
        colorama.init(autoreset=False, convert=(os_name == 'nt'))

        nobg = not self.args.background
        meta = metavars(data)

        label = meta.format_str(self.args.label, self.codec, [data])
        if label:
            if not label.endswith(' '):
                label = F'{label} '
            if not label.startswith(' '):
                label = F' {label}'

        bgmap = [
            colorama.Back.BLACK,
            colorama.Back.WHITE,
            colorama.Back.YELLOW,
            colorama.Back.CYAN,
            colorama.Back.BLUE,
            colorama.Back.GREEN,
            colorama.Back.LIGHTRED_EX,
            colorama.Back.MAGENTA,
        ]
        fgmap = [
            colorama.Fore.LIGHTBLACK_EX,
            colorama.Fore.LIGHTWHITE_EX,
            colorama.Fore.LIGHTYELLOW_EX,
            colorama.Fore.LIGHTCYAN_EX,
            colorama.Fore.LIGHTBLUE_EX,
            colorama.Fore.LIGHTGREEN_EX,
            colorama.Fore.LIGHTRED_EX,
            colorama.Fore.LIGHTMAGENTA_EX,
        ]

        _reset = colorama.Back.BLACK + colorama.Fore.WHITE + colorama.Style.RESET_ALL

        clrmap = fgmap if nobg else bgmap
        header = '['
        header_length = 1
        footer_length = 4 + 7

        if self.args.legend:
            header = '[{1}{0}] {2}'.format(_reset, ''.join(F'{bg}{k}' for k, bg in enumerate(clrmap, 1)), header)
            header_length += 3 + len(clrmap)

        _tw = get_terminal_size()
        width = _tw - header_length - footer_length
        if width < 16:
            raise RuntimeError(F'computed terminal width {_tw} is too small for heatmap')

        def entropy_select(value, map):
            index = min(len(map) - 1, math.floor(value * len(map)))
            return map[index]

        view = memoryview(data)
        size = len(data)
        chunk_size = 0

        for block_size in range(1, width + 1):
            block_count = width // block_size
            chunk_size = size // block_count
            if chunk_size > 1024:
                break

        q, remainder = divmod(width, block_size)
        assert q == block_count
        indices = list(range(q))
        random.seed(sum(view[:1024]))
        random.shuffle(indices)

        block_sizes = [block_size] * q
        q, r = divmod(remainder, block_count)
        for i in indices:
            block_sizes[i] += q
        for i in indices[:r]:
            block_sizes[i] += 1
        assert sum(block_sizes) == width

        q, remainder = divmod(size, block_count)
        assert q == chunk_size
        chunk_sizes = [chunk_size] * block_count
        for i in indices[:remainder]:
            chunk_sizes[i] += 1
        assert sum(chunk_sizes) == size

        stream = MemoryFile(view)
        filler = self.args.block_char if nobg else ' '

        try:
            stderr.write(header)
            if label is not None:
                stderr.write(colorama.Fore.WHITE)
                stderr.flush()
            it = itertools.chain(itertools.repeat(filler, 3), label, itertools.cycle(filler))
            cp = None
            for chunk_size, block_size in zip(chunk_sizes, block_sizes):
                chunk = stream.read(chunk_size)
                chunk_entropy = entropy(chunk)
                pp = entropy_select(chunk_entropy, clrmap)
                string = ''.join(itertools.islice(it, block_size))
                if pp != cp:
                    string = F'{pp}{string}'
                cp = pp
                stderr.write(string)
                stderr.flush()
        except BaseException:
            eraser = ' ' * width
            stderr.write(F'\r{_reset}{eraser}\r')
            raise
        else:
            stderr.write(F'{_reset}] [---.--%]')
            te = meta['entropy']
            stderr.write('\b' * footer_length)
            stderr.write(F'] [{te!r:>7}]\n')
            stderr.flush()
        if not self.isatty:
            yield data

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members