Module refinery.units.sinks

This module contains units who are primarily intended to sit at the end of a pipeline where they perform a final operation such as visualizing the incoming data in some way, or e.g. dumping it to disk.

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module contains units who are primarily intended to sit at the end of a pipeline where they
perform a final operation such as visualizing the incoming data in some way, or e.g. dumping it to
disk.
"""
import re
import io
import dataclasses

from typing import ByteString, Iterable, Optional

from refinery.units import Arg, Unit
from refinery.lib.tools import get_terminal_size, lookahead
from refinery.lib import chunks


@dataclasses.dataclass
class HexDumpMetrics:
    hex_columns: int = 0
    address_width: int = 0
    line_count: int = 0
    block_size: int = 1
    big_endian: bool = True
    padding: int = 0
    expand: bool = False
    max_width: int = 0
    txt_separator: str = '  '
    hex_char_prefix: str = ''
    hex_char_spacer: str = ' '
    hex_addr_spacer: str = ': '

    @property
    def hex_char_format(self):
        return F'{self.hex_char_prefix}{{:0{2 * self.block_size}X}}'

    @property
    def hex_column_width(self):
        return len(self.hex_char_format.format(0)) + len(self.hex_char_spacer)

    def get_max_width(self):
        width = self.max_width
        if not width:
            width = get_terminal_size()
            width = width and width or 75
            self.max_width = width
        return width

    def fit_to_width(self, width: int = 0, allow_increase: bool = False):
        padding = self.padding + len(self.txt_separator)
        if self.address_width:
            padding += self.address_width + len(self.hex_addr_spacer)
        width_max = width or self.get_max_width()
        width_available_for_hexdump = width_max - padding
        width_required_per_column = self.hex_column_width + self.block_size
        limit, r = divmod(width_available_for_hexdump, width_required_per_column)
        if r + len(self.hex_char_spacer) >= width_required_per_column:
            limit += 1
        if allow_increase or not self.hex_columns or limit < self.hex_columns:
            self.hex_columns = limit
        if self.address_width:
            gap = width_max - self.hexdump_width
            self.address_width += gap

    @property
    def hexdump_width(self):
        width = (self.hex_columns * (self.hex_column_width + self.block_size))
        width -= len(self.hex_char_spacer)
        width += len(self.txt_separator)
        if self.address_width:
            width += self.address_width + len(self.hex_addr_spacer)
        return width


def hexdump(data: ByteString, metrics: HexDumpMetrics, colorize=False) -> Iterable[str]:
    separator = metrics.hex_char_spacer
    hex_width = metrics.hex_column_width
    addr_width = metrics.address_width
    columns = metrics.hex_columns
    hexformat = metrics.hex_char_format
    printable = range(0x21, 0x7F)

    from colorama import Fore as FG
    color_reset = FG.RESET

    if columns <= 0:
        raise RuntimeError('Requested width is too small.')

    view = memoryview(data)
    step = columns * metrics.block_size
    previous = None
    repetitions = 0
    skipped = 0

    for last, (lno, offset) in lookahead(enumerate(range(0, len(data), step))):
        chunk = view[offset:offset + step]
        if not metrics.expand:
            if chunk == previous and not last:
                repetitions += 1
                continue
            elif repetitions > 0:
                format = ' {} repetitions'
                message = format.format(repetitions)
                pad = (hex_width * columns - len(format) + 1) // 2
                pad = pad - len(message) + len(format)
                line = ' ' * pad + message
                if colorize:
                    line = F'{FG.LIGHTBLACK_EX}{line}{color_reset}'
                if addr_width:
                    line = F'{".":.>{addr_width}}{metrics.hex_addr_spacer}{line}'
                yield line
                skipped += repetitions - 1
                repetitions = 0

        if metrics.line_count and lno - skipped >= metrics.line_count:
            break

        blocks = chunks.unpack(chunk, metrics.block_size, metrics.big_endian)

        if not colorize:
            color_prefix = ''
            dump = separator.join(hexformat.format(b) for b in blocks)
            ascii_preview = re.sub(B'[^!-~]', B'.', chunk).decode('ascii')
            line = (
                F'{dump:<{hex_width * columns - len(separator)}}'
                F'{metrics.txt_separator}{ascii_preview:<{columns}}'
            )
        else:
            def byte_color(value: int):
                if not value:
                    return FG.LIGHTBLACK_EX
                elif value in B'\x20\t\n\r':
                    return FG.CYAN
                elif value not in printable:
                    return FG.LIGHTRED_EX
                else:
                    return color_reset
            color_prefix = current_color = color_reset
            with io.StringIO() as _hex, io.StringIO() as _asc:
                block_size = metrics.block_size
                prefix = metrics.hex_char_prefix
                remaining_hex_width = hex_width * columns - len(separator)
                for k, b in enumerate(chunk):
                    if k % block_size == 0:
                        if k != 0:
                            _hex.write(separator)
                            remaining_hex_width -= len(separator)
                        if prefix:
                            _hex.write(prefix)
                            remaining_hex_width -= len(prefix)
                    color = byte_color(b)
                    if color != current_color:
                        _hex.write(color)
                        _asc.write(color)
                        current_color = color
                    _hex.write(F'{b:02X}')
                    remaining_hex_width -= 2
                    _asc.write(chr(b) if b in printable else '.')
                _hex.write(color_reset)
                _hex.write(' ' * remaining_hex_width)
                _asc.write(color_reset)
                line = F'{_hex.getvalue()}{metrics.txt_separator}{_asc.getvalue():<{columns}}'

        if addr_width:
            line = F'{color_prefix}{lno * columns:0{addr_width}X}: {line}'

        yield line

        if not metrics.expand:
            previous = chunk


class HexViewer(Unit, abstract=True):

    def __init__(
        self,
        blocks  : Arg.Number('-B', help='Group hexadecimal bytes in blocks of the given size; default is {default}.') = 1,
        dense   : Arg.Switch('-D', help='Do not insert spaces in hexdump.') = False,
        expand  : Arg.Switch('-E', help='Do not compress sequences of identical lines in hexdump') = False,
        narrow  : Arg.Switch('-N', help='Do not show addresses in hexdump') = False,
        width   : Arg.Number('-W', help='Specify the number of hexadecimal characters to use in preview.') = 0,
        **kwargs
    ):
        super().__init__(
            blocks=blocks,
            dense=dense,
            expand=expand,
            narrow=narrow,
            width=width,
            **kwargs
        )

    def _get_metrics(self, data_size: int, line_count: Optional[int] = None, padding: int = 0) -> HexDumpMetrics:
        blocks = self.args.blocks
        metrics = HexDumpMetrics(
            self.args.width,
            line_count=line_count,
            padding=padding,
            expand=self.args.expand,
            block_size=blocks,
        )
        if not self.args.narrow:
            metrics.address_width = len(F'{data_size:X}')
        if self.args.dense:
            metrics.hex_char_spacer = ''
        if not metrics.hex_columns:
            metrics.fit_to_width()
        return metrics

    def hexdump(self, data: ByteString, metrics: Optional[HexDumpMetrics] = None, colorize=False):
        metrics = metrics or self._get_metrics(len(data))
        yield from hexdump(data, metrics, colorize)

Sub-modules

refinery.units.sinks.asm
refinery.units.sinks.dump
refinery.units.sinks.iemap
refinery.units.sinks.peek
refinery.units.sinks.ppjscript
refinery.units.sinks.ppjson
refinery.units.sinks.ppxml

Functions

def hexdump(data, metrics, colorize=False)
Expand source code Browse git
def hexdump(data: ByteString, metrics: HexDumpMetrics, colorize=False) -> Iterable[str]:
    separator = metrics.hex_char_spacer
    hex_width = metrics.hex_column_width
    addr_width = metrics.address_width
    columns = metrics.hex_columns
    hexformat = metrics.hex_char_format
    printable = range(0x21, 0x7F)

    from colorama import Fore as FG
    color_reset = FG.RESET

    if columns <= 0:
        raise RuntimeError('Requested width is too small.')

    view = memoryview(data)
    step = columns * metrics.block_size
    previous = None
    repetitions = 0
    skipped = 0

    for last, (lno, offset) in lookahead(enumerate(range(0, len(data), step))):
        chunk = view[offset:offset + step]
        if not metrics.expand:
            if chunk == previous and not last:
                repetitions += 1
                continue
            elif repetitions > 0:
                format = ' {} repetitions'
                message = format.format(repetitions)
                pad = (hex_width * columns - len(format) + 1) // 2
                pad = pad - len(message) + len(format)
                line = ' ' * pad + message
                if colorize:
                    line = F'{FG.LIGHTBLACK_EX}{line}{color_reset}'
                if addr_width:
                    line = F'{".":.>{addr_width}}{metrics.hex_addr_spacer}{line}'
                yield line
                skipped += repetitions - 1
                repetitions = 0

        if metrics.line_count and lno - skipped >= metrics.line_count:
            break

        blocks = chunks.unpack(chunk, metrics.block_size, metrics.big_endian)

        if not colorize:
            color_prefix = ''
            dump = separator.join(hexformat.format(b) for b in blocks)
            ascii_preview = re.sub(B'[^!-~]', B'.', chunk).decode('ascii')
            line = (
                F'{dump:<{hex_width * columns - len(separator)}}'
                F'{metrics.txt_separator}{ascii_preview:<{columns}}'
            )
        else:
            def byte_color(value: int):
                if not value:
                    return FG.LIGHTBLACK_EX
                elif value in B'\x20\t\n\r':
                    return FG.CYAN
                elif value not in printable:
                    return FG.LIGHTRED_EX
                else:
                    return color_reset
            color_prefix = current_color = color_reset
            with io.StringIO() as _hex, io.StringIO() as _asc:
                block_size = metrics.block_size
                prefix = metrics.hex_char_prefix
                remaining_hex_width = hex_width * columns - len(separator)
                for k, b in enumerate(chunk):
                    if k % block_size == 0:
                        if k != 0:
                            _hex.write(separator)
                            remaining_hex_width -= len(separator)
                        if prefix:
                            _hex.write(prefix)
                            remaining_hex_width -= len(prefix)
                    color = byte_color(b)
                    if color != current_color:
                        _hex.write(color)
                        _asc.write(color)
                        current_color = color
                    _hex.write(F'{b:02X}')
                    remaining_hex_width -= 2
                    _asc.write(chr(b) if b in printable else '.')
                _hex.write(color_reset)
                _hex.write(' ' * remaining_hex_width)
                _asc.write(color_reset)
                line = F'{_hex.getvalue()}{metrics.txt_separator}{_asc.getvalue():<{columns}}'

        if addr_width:
            line = F'{color_prefix}{lno * columns:0{addr_width}X}: {line}'

        yield line

        if not metrics.expand:
            previous = chunk

Classes

class HexDumpMetrics (hex_columns=0, address_width=0, line_count=0, block_size=1, big_endian=True, padding=0, expand=False, max_width=0, txt_separator=' ', hex_char_prefix='', hex_char_spacer=' ', hex_addr_spacer=': ')

HexDumpMetrics(hex_columns: int = 0, address_width: int = 0, line_count: int = 0, block_size: int = 1, big_endian: bool = True, padding: int = 0, expand: bool = False, max_width: int = 0, txt_separator: str = ' ', hex_char_prefix: str = '', hex_char_spacer: str = ' ', hex_addr_spacer: str = ': ')

Expand source code Browse git
class HexDumpMetrics:
    hex_columns: int = 0
    address_width: int = 0
    line_count: int = 0
    block_size: int = 1
    big_endian: bool = True
    padding: int = 0
    expand: bool = False
    max_width: int = 0
    txt_separator: str = '  '
    hex_char_prefix: str = ''
    hex_char_spacer: str = ' '
    hex_addr_spacer: str = ': '

    @property
    def hex_char_format(self):
        return F'{self.hex_char_prefix}{{:0{2 * self.block_size}X}}'

    @property
    def hex_column_width(self):
        return len(self.hex_char_format.format(0)) + len(self.hex_char_spacer)

    def get_max_width(self):
        width = self.max_width
        if not width:
            width = get_terminal_size()
            width = width and width or 75
            self.max_width = width
        return width

    def fit_to_width(self, width: int = 0, allow_increase: bool = False):
        padding = self.padding + len(self.txt_separator)
        if self.address_width:
            padding += self.address_width + len(self.hex_addr_spacer)
        width_max = width or self.get_max_width()
        width_available_for_hexdump = width_max - padding
        width_required_per_column = self.hex_column_width + self.block_size
        limit, r = divmod(width_available_for_hexdump, width_required_per_column)
        if r + len(self.hex_char_spacer) >= width_required_per_column:
            limit += 1
        if allow_increase or not self.hex_columns or limit < self.hex_columns:
            self.hex_columns = limit
        if self.address_width:
            gap = width_max - self.hexdump_width
            self.address_width += gap

    @property
    def hexdump_width(self):
        width = (self.hex_columns * (self.hex_column_width + self.block_size))
        width -= len(self.hex_char_spacer)
        width += len(self.txt_separator)
        if self.address_width:
            width += self.address_width + len(self.hex_addr_spacer)
        return width

Class variables

var hex_columns
var address_width
var line_count
var block_size
var big_endian
var padding
var expand
var max_width
var txt_separator
var hex_char_prefix
var hex_char_spacer
var hex_addr_spacer

Instance variables

var hex_char_format
Expand source code Browse git
@property
def hex_char_format(self):
    return F'{self.hex_char_prefix}{{:0{2 * self.block_size}X}}'
var hex_column_width
Expand source code Browse git
@property
def hex_column_width(self):
    return len(self.hex_char_format.format(0)) + len(self.hex_char_spacer)
var hexdump_width
Expand source code Browse git
@property
def hexdump_width(self):
    width = (self.hex_columns * (self.hex_column_width + self.block_size))
    width -= len(self.hex_char_spacer)
    width += len(self.txt_separator)
    if self.address_width:
        width += self.address_width + len(self.hex_addr_spacer)
    return width

Methods

def get_max_width(self)
Expand source code Browse git
def get_max_width(self):
    width = self.max_width
    if not width:
        width = get_terminal_size()
        width = width and width or 75
        self.max_width = width
    return width
def fit_to_width(self, width=0, allow_increase=False)
Expand source code Browse git
def fit_to_width(self, width: int = 0, allow_increase: bool = False):
    padding = self.padding + len(self.txt_separator)
    if self.address_width:
        padding += self.address_width + len(self.hex_addr_spacer)
    width_max = width or self.get_max_width()
    width_available_for_hexdump = width_max - padding
    width_required_per_column = self.hex_column_width + self.block_size
    limit, r = divmod(width_available_for_hexdump, width_required_per_column)
    if r + len(self.hex_char_spacer) >= width_required_per_column:
        limit += 1
    if allow_increase or not self.hex_columns or limit < self.hex_columns:
        self.hex_columns = limit
    if self.address_width:
        gap = width_max - self.hexdump_width
        self.address_width += gap
class HexViewer (blocks=1, dense=False, expand=False, narrow=False, width=0, **kwargs)
Expand source code Browse git
class HexViewer(Unit, abstract=True):

    def __init__(
        self,
        blocks  : Arg.Number('-B', help='Group hexadecimal bytes in blocks of the given size; default is {default}.') = 1,
        dense   : Arg.Switch('-D', help='Do not insert spaces in hexdump.') = False,
        expand  : Arg.Switch('-E', help='Do not compress sequences of identical lines in hexdump') = False,
        narrow  : Arg.Switch('-N', help='Do not show addresses in hexdump') = False,
        width   : Arg.Number('-W', help='Specify the number of hexadecimal characters to use in preview.') = 0,
        **kwargs
    ):
        super().__init__(
            blocks=blocks,
            dense=dense,
            expand=expand,
            narrow=narrow,
            width=width,
            **kwargs
        )

    def _get_metrics(self, data_size: int, line_count: Optional[int] = None, padding: int = 0) -> HexDumpMetrics:
        blocks = self.args.blocks
        metrics = HexDumpMetrics(
            self.args.width,
            line_count=line_count,
            padding=padding,
            expand=self.args.expand,
            block_size=blocks,
        )
        if not self.args.narrow:
            metrics.address_width = len(F'{data_size:X}')
        if self.args.dense:
            metrics.hex_char_spacer = ''
        if not metrics.hex_columns:
            metrics.fit_to_width()
        return metrics

    def hexdump(self, data: ByteString, metrics: Optional[HexDumpMetrics] = None, colorize=False):
        metrics = metrics or self._get_metrics(len(data))
        yield from hexdump(data, metrics, colorize)

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies

Methods

def hexdump(self, data, metrics=None, colorize=False)
Expand source code Browse git
def hexdump(self, data: ByteString, metrics: Optional[HexDumpMetrics] = None, colorize=False):
    metrics = metrics or self._get_metrics(len(data))
    yield from hexdump(data, metrics, colorize)

Inherited members