Module refinery.units.sinks.peek

Expand source code Browse git
from __future__ import annotations

import codecs
import collections
import itertools
import os
import sys
import textwrap

from typing import Generator

from refinery.lib.environment import environment
from refinery.lib.meta import (
    ByteStringWrapper,
    CustomStringRepresentation,
    LazyMetaOracle,
    SizeInt,
    metavars,
)
from refinery.lib.tools import get_terminal_size, isbuffer
from refinery.lib.types import INF, Param
from refinery.units import Chunk
from refinery.units.sinks import Arg, HexViewer


class peek(HexViewer):
    """
    The unit extracts preview information of the input data and displays it on the standard error stream. If the standard
    output of this unit is connected by a pipe, the incoming data is forwarded. However, if the unit outputs to a terminal,
    the data is discarded instead.
    """

    def __init__(
        self,
        lines: Param[int, Arg.Number('-l', group='SIZE', help='Specify number N of lines in the preview, default is 10.')] = 10,
        all: Param[bool, Arg.Switch('-a', group='SIZE', help='Output all possible preview lines without restriction')] = False,
        brief: Param[bool, Arg.Switch('-b', group='SIZE', help='One line peek, implies --lines=1.')] = False,
        decode: Param[int, Arg.Counts('-d', group='MODE', help=(
            'Attempt to decode and display printable data. Specify twice to enable line wrapping.'))] = 0,
        escape: Param[bool, Arg.Switch('-e', group='MODE', help='Always peek data as string, escape characters if necessary.')] = False,
        bare: Param[bool, Arg.Switch('-r', group='META', help='Only peek the data itself, do not show a metadata preview.')] = False,
        meta: Param[int, Arg.Counts('-m', group='META', help=(
            'Show more auto-derivable metadata. Specify multiple times to populate more variables.'))] = 0,
        gray: Param[bool, Arg.Switch('-g', help='Do not colorize the output.')] = False,
        index: Param[bool, Arg.Switch('-i', help='Display the index of each chunk within the current frame.')] = False,
        stdout: Param[bool, Arg.Switch('-2', help='Print the peek to STDOUT rather than STDERR; the input data is lost.')] = False,
        narrow=False, blocks=1, dense=False, expand=False, width=0
    ):
        # The two string display modes cannot be combined.
        if decode and escape:
            raise ValueError('The decode and esc options are exclusive.')
        # A brief peek always uses the narrow layout.
        narrow = True if brief else narrow
        # The colorless environment setting overrides the command line.
        gray = True if environment.colorless.value else gray
        # Effective line limit: brief wins over all, which wins over the explicit count.
        if brief:
            lines = 1
        elif all:
            lines = INF
        super().__init__(
            brief=brief,
            gray=gray,
            blocks=blocks,
            decode=decode,
            dense=dense,
            index=index,
            escape=escape,
            expand=expand,
            narrow=narrow,
            lines=lines,
            meta=meta,
            bare=bare,
            width=width,
            stdout=stdout,
        )

    @HexViewer.Requires('colorama', ['display', 'default', 'extended'])
    def _colorama():
        """
        Import the colorama package on demand and return the module object.
        """
        import colorama as module
        return module

    def process(self, data):
        colorize = not self.args.gray and not self.args.stdout
        lines = self._peeklines(data, colorize)

        if self.args.stdout:
            for line in lines:
                yield line.encode(self.codec)
            return

        stderr = sys.stderr

        if colorize:
            colorama = self._colorama
            if os.name == 'nt':
                stderr = colorama.AnsiToWin32(stderr).stream
            _erase = ' ' * get_terminal_size()
            _reset = F'\r{colorama.Style.RESET_ALL}{_erase}\r'
        else:
            _reset = ''

        try:
            for line in lines:
                print(line, file=stderr)
        except BaseException:
            stderr.write(_reset)
            raise
        if not self.isatty():
            self.log_info('forwarding input to next unit')
            yield data

    def _peekmeta(self, linewidth, sep, meta: dict, peek=None) -> Generator[str]:
        if not meta and not peek:
            return
        width = max((len(name) for name in meta), default=0)
        separators = iter([sep])
        if peek is not None:
            if len(peek) > linewidth:
                peek = peek[:linewidth - 3] + '...'
            yield from separators
            yield peek
        for name in sorted(meta, key=lambda s: (len(s) <= 3, s)):
            if not self.args.index and name == LazyMetaOracle.IndexKey:
                continue
            value = meta[name]
            if value is None:
                continue
            if isinstance(value, CustomStringRepresentation):
                value = repr(value).strip()
            elif isbuffer(value):
                value = repr(ByteStringWrapper(value))
            elif isinstance(value, int):
                if value in range(-999, 1000):
                    value = str(value)
                elif value > 0:
                    value = F'0x{value:X}'
                else:
                    value = F'-0x{-value:X}'
            elif isinstance(value, float):
                value = F'{value:.4f}'
            metavar = F'{name:>{width + 2}} = {value!s}'
            if len(metavar) > linewidth:
                metavar = metavar[:linewidth - 3] + '...'
            yield from separators
            yield metavar

    def _trydecode(self, data, codec: str | None, width: int, linecount: int) -> str:
        remaining = linecount
        result = []
        wrap = self.args.decode > 1
        if codec is None:
            from refinery.units.encoding.esc import esc
            decoded = data[:abs(width * linecount)]
            decoded = str(decoded | -esc(bare=True))
            limit = abs(min(linecount * width, len(decoded)))
            for k in range(0, limit, width):
                result.append(decoded[k:k + width])
            return result
        try:
            import unicodedata
            unprintable = {'Cc', 'Cf', 'Co', 'Cs'}
            self.log_info(F'trying to decode as {codec}.')
            decoded = codecs.decode(data, codec, errors='strict')
            count = sum(unicodedata.category(c) not in unprintable for c in decoded)
            ratio = count / len(decoded)
        except UnicodeDecodeError as DE:
            self.log_info('decoding failed:', DE.reason)
            return None
        except ValueError as V:
            self.log_info('decoding failed:', V)
            return None
        if ratio < 0.8:
            self.log_info(F'data contains {ratio * 100:.2f}% printable characters, this is too low.')
            return None
        decoded = decoded.splitlines(False)
        if not wrap:
            for k, line in enumerate(decoded):
                line = line.replace('\t', '\x20' * 4)
                if len(line) <= width:
                    continue
                clipped = line[:width - 3]
                if self.args.gray:
                    color = ''
                    reset = ''
                else:
                    colorama = self._colorama
                    color = colorama.Fore.LIGHTRED_EX
                    reset = colorama.Style.RESET_ALL
                decoded[k] = F'{clipped}{color}...{reset}'
            return decoded[:abs(linecount)]
        for paragraph in decoded:
            if not remaining:
                break
            wrapped = [
                line for chunk in textwrap.wrap(
                    paragraph,
                    width,
                    break_long_words=True,
                    break_on_hyphens=False,
                    drop_whitespace=False,
                    expand_tabs=True,
                    max_lines=abs(remaining + 1),
                    replace_whitespace=False,
                    tabsize=4,
                )
                for line in chunk.splitlines(keepends=False)
            ]
            remaining -= len(wrapped)
            result.extend(wrapped)
        return result[:abs(linecount)]

    def _peeklines(self, data: Chunk, colorize: bool) -> Generator[str]:
        """
        Generate the text lines that make up the preview of one chunk. Unless the brief or the bare option is set, this
        starts with a metadata section. It is followed by the data preview, which is either a list of decoded or escaped
        text lines or a hex dump. A closing separator is added when the `temp` attribute of the chunk is set, except in
        brief mode. In brief mode, only a single line is produced which consists of the optional chunk index, the size of
        the data, and the first preview line.
        """
        meta = metavars(data)

        codec = None
        preview = None
        closing = data.temp or False
        nothing_shown = True

        index = meta.get('index', None) if self.args.index else None

        # In brief mode, the size prefix and the optional index prefix take up room on the line.
        padding = 0
        if self.args.brief:
            padding = SizeInt.width + 2
            if index is not None:
                padding += 6

        metrics = self._get_metrics(len(data), self.args.lines, padding)

        if self.args.brief:
            metrics.address_width = 0
            metrics.fit_to_width(allow_increase=True)

        rule_width = metrics.hexdump_width
        text_width = self.args.width or rule_width

        if self.args.lines and data:
            if self.args.escape:
                preview = self._trydecode(data, None, text_width, metrics.line_count)
            if self.args.decode > 0:
                # The first codec that produces any lines is used and later named in the separator.
                for candidate in ('utf8', 'cp1251', 'cp1252', 'utf-16le', 'utf-16', 'utf-16be'):
                    preview = self._trydecode(data, candidate, text_width, metrics.line_count)
                    if preview:
                        codec = candidate
                        break
            if preview is None:
                preview = list(self.hexdump(data, metrics, colorize))
            else:
                rule_width = text_width

        def separator(title=None):
            # A title is only embedded when the rule is wide enough for it.
            if title is not None and rule_width > len(title) + 8:
                return '-' * (rule_width - len(title) - 5) + F'[{title}]---'
            return rule_width * '-'

        if self.args.brief:
            closing = False
        elif not self.args.bare:
            summary = repr(meta.size)
            rule = separator()
            if len(data) <= 5_000_000:
                summary = F'{summary}; {meta.entropy!r} entropy'
            summary = F'{summary}; {meta.magic!s}'
            if self.args.lines == 0:
                summary = None
            elif not data:
                summary = None
                rule = separator('empty chunk')
            depth = self.args.meta
            if depth > 0:
                # The derived variables are listed individually, so the one-line summary is dropped.
                for key in ('size', 'magic', 'entropy'):
                    meta.derive(key)
                summary = None
            if depth > 1:
                for key in ('crc32', 'sha256'):
                    meta.derive(key)
            if depth > 2:
                for name in meta.derivations:
                    meta[name]
            for entry in self._peekmeta(metrics.hexdump_width, rule, meta, peek=summary):
                nothing_shown = False
                yield entry

        if preview:
            nothing_shown = False
            if self.args.brief:
                head = next(iter(preview))
                head = F'{SizeInt(len(data))!r}: {head}'
                if index is not None:
                    head = F'#{index:03d}: {head}'
                yield head
            else:
                yield separator(codec or None)
                yield from preview

        if closing and (self.args.bare or not nothing_shown):
            yield separator()

    def filter(self, chunks):
        try:
            self._colorama.init(wrap=False)
        except ImportError:
            pass

        discarded = 0

        if self.args.brief:
            for chunk in chunks:
                if not chunk.visible and self.isatty():
                    discarded += 1
                    continue
                self.log_debug(chunk)
                yield chunk
        else:
            it = iter(chunks)
            buffer = collections.deque(itertools.islice(it, 0, 2))
            buffer.reverse()
            while buffer:
                if self.isatty() and not buffer[0].visible:
                    buffer.popleft()
                    discarded += 1
                else:
                    item = buffer.pop()
                    last = not bool(buffer)
                    item.temp = last
                    if not item.visible and self.isatty():
                        discarded += 1
                    else:
                        yield item
                try:
                    buffer.appendleft(next(it))
                except StopIteration:
                    pass

        if discarded:
            self.log_warn(F'discarded {discarded} invisible chunks to prevent them from leaking into the terminal.')

Classes

class peek (lines=10, all=False, brief=False, decode=0, escape=False, bare=False, meta=0, gray=False, index=False, stdout=False, narrow=False, blocks=1, dense=False, expand=False, width=0)

The unit extracts preview information of the input data and displays it on the standard error stream. If the standard output of this unit is connected by a pipe, the incoming data is forwarded. However, if the unit outputs to a terminal, the data is discarded instead.

Expand source code Browse git
class peek(HexViewer):
    """
    The unit extracts preview information of the input data and displays it on the standard error stream. If the standard
    output of this unit is connected by a pipe, the incoming data is forwarded. However, if the unit outputs to a terminal,
    the data is discarded instead.
    """

    def __init__(
        self,
        lines: Param[int, Arg.Number('-l', group='SIZE', help='Specify number N of lines in the preview, default is 10.')] = 10,
        all: Param[bool, Arg.Switch('-a', group='SIZE', help='Output all possible preview lines without restriction')] = False,
        brief: Param[bool, Arg.Switch('-b', group='SIZE', help='One line peek, implies --lines=1.')] = False,
        decode: Param[int, Arg.Counts('-d', group='MODE', help=(
            'Attempt to decode and display printable data. Specify twice to enable line wrapping.'))] = 0,
        escape: Param[bool, Arg.Switch('-e', group='MODE', help='Always peek data as string, escape characters if necessary.')] = False,
        bare: Param[bool, Arg.Switch('-r', group='META', help='Only peek the data itself, do not show a metadata preview.')] = False,
        meta: Param[int, Arg.Counts('-m', group='META', help=(
            'Show more auto-derivable metadata. Specify multiple times to populate more variables.'))] = 0,
        gray: Param[bool, Arg.Switch('-g', help='Do not colorize the output.')] = False,
        index: Param[bool, Arg.Switch('-i', help='Display the index of each chunk within the current frame.')] = False,
        stdout: Param[bool, Arg.Switch('-2', help='Print the peek to STDOUT rather than STDERR; the input data is lost.')] = False,
        narrow=False, blocks=1, dense=False, expand=False, width=0
    ):
        # The two string display modes cannot be combined.
        if decode and escape:
            raise ValueError('The decode and esc options are exclusive.')
        # A brief peek always uses the narrow layout.
        narrow = True if brief else narrow
        # The colorless environment setting overrides the command line.
        gray = True if environment.colorless.value else gray
        # Effective line limit: brief wins over all, which wins over the explicit count.
        if brief:
            lines = 1
        elif all:
            lines = INF
        super().__init__(
            brief=brief,
            gray=gray,
            blocks=blocks,
            decode=decode,
            dense=dense,
            index=index,
            escape=escape,
            expand=expand,
            narrow=narrow,
            lines=lines,
            meta=meta,
            bare=bare,
            width=width,
            stdout=stdout,
        )

    @HexViewer.Requires('colorama', ['display', 'default', 'extended'])
    def _colorama():
        """
        Import the colorama package on demand and return the module object.
        """
        import colorama as module
        return module

    def process(self, data):
        colorize = not self.args.gray and not self.args.stdout
        lines = self._peeklines(data, colorize)

        if self.args.stdout:
            for line in lines:
                yield line.encode(self.codec)
            return

        stderr = sys.stderr

        if colorize:
            colorama = self._colorama
            if os.name == 'nt':
                stderr = colorama.AnsiToWin32(stderr).stream
            _erase = ' ' * get_terminal_size()
            _reset = F'\r{colorama.Style.RESET_ALL}{_erase}\r'
        else:
            _reset = ''

        try:
            for line in lines:
                print(line, file=stderr)
        except BaseException:
            stderr.write(_reset)
            raise
        if not self.isatty():
            self.log_info('forwarding input to next unit')
            yield data

    def _peekmeta(self, linewidth, sep, meta: dict, peek=None) -> Generator[str]:
        if not meta and not peek:
            return
        width = max((len(name) for name in meta), default=0)
        separators = iter([sep])
        if peek is not None:
            if len(peek) > linewidth:
                peek = peek[:linewidth - 3] + '...'
            yield from separators
            yield peek
        for name in sorted(meta, key=lambda s: (len(s) <= 3, s)):
            if not self.args.index and name == LazyMetaOracle.IndexKey:
                continue
            value = meta[name]
            if value is None:
                continue
            if isinstance(value, CustomStringRepresentation):
                value = repr(value).strip()
            elif isbuffer(value):
                value = repr(ByteStringWrapper(value))
            elif isinstance(value, int):
                if value in range(-999, 1000):
                    value = str(value)
                elif value > 0:
                    value = F'0x{value:X}'
                else:
                    value = F'-0x{-value:X}'
            elif isinstance(value, float):
                value = F'{value:.4f}'
            metavar = F'{name:>{width + 2}} = {value!s}'
            if len(metavar) > linewidth:
                metavar = metavar[:linewidth - 3] + '...'
            yield from separators
            yield metavar

    def _trydecode(self, data, codec: str | None, width: int, linecount: int) -> str:
        remaining = linecount
        result = []
        wrap = self.args.decode > 1
        if codec is None:
            from refinery.units.encoding.esc import esc
            decoded = data[:abs(width * linecount)]
            decoded = str(decoded | -esc(bare=True))
            limit = abs(min(linecount * width, len(decoded)))
            for k in range(0, limit, width):
                result.append(decoded[k:k + width])
            return result
        try:
            import unicodedata
            unprintable = {'Cc', 'Cf', 'Co', 'Cs'}
            self.log_info(F'trying to decode as {codec}.')
            decoded = codecs.decode(data, codec, errors='strict')
            count = sum(unicodedata.category(c) not in unprintable for c in decoded)
            ratio = count / len(decoded)
        except UnicodeDecodeError as DE:
            self.log_info('decoding failed:', DE.reason)
            return None
        except ValueError as V:
            self.log_info('decoding failed:', V)
            return None
        if ratio < 0.8:
            self.log_info(F'data contains {ratio * 100:.2f}% printable characters, this is too low.')
            return None
        decoded = decoded.splitlines(False)
        if not wrap:
            for k, line in enumerate(decoded):
                line = line.replace('\t', '\x20' * 4)
                if len(line) <= width:
                    continue
                clipped = line[:width - 3]
                if self.args.gray:
                    color = ''
                    reset = ''
                else:
                    colorama = self._colorama
                    color = colorama.Fore.LIGHTRED_EX
                    reset = colorama.Style.RESET_ALL
                decoded[k] = F'{clipped}{color}...{reset}'
            return decoded[:abs(linecount)]
        for paragraph in decoded:
            if not remaining:
                break
            wrapped = [
                line for chunk in textwrap.wrap(
                    paragraph,
                    width,
                    break_long_words=True,
                    break_on_hyphens=False,
                    drop_whitespace=False,
                    expand_tabs=True,
                    max_lines=abs(remaining + 1),
                    replace_whitespace=False,
                    tabsize=4,
                )
                for line in chunk.splitlines(keepends=False)
            ]
            remaining -= len(wrapped)
            result.extend(wrapped)
        return result[:abs(linecount)]

    def _peeklines(self, data: Chunk, colorize: bool) -> Generator[str]:
        """
        Generate the text lines that make up the preview of one chunk. Unless the brief or the bare option is set, this
        starts with a metadata section. It is followed by the data preview, which is either a list of decoded or escaped
        text lines or a hex dump. A closing separator is added when the `temp` attribute of the chunk is set, except in
        brief mode. In brief mode, only a single line is produced which consists of the optional chunk index, the size of
        the data, and the first preview line.
        """
        meta = metavars(data)

        codec = None
        preview = None
        closing = data.temp or False
        nothing_shown = True

        index = meta.get('index', None) if self.args.index else None

        # In brief mode, the size prefix and the optional index prefix take up room on the line.
        padding = 0
        if self.args.brief:
            padding = SizeInt.width + 2
            if index is not None:
                padding += 6

        metrics = self._get_metrics(len(data), self.args.lines, padding)

        if self.args.brief:
            metrics.address_width = 0
            metrics.fit_to_width(allow_increase=True)

        rule_width = metrics.hexdump_width
        text_width = self.args.width or rule_width

        if self.args.lines and data:
            if self.args.escape:
                preview = self._trydecode(data, None, text_width, metrics.line_count)
            if self.args.decode > 0:
                # The first codec that produces any lines is used and later named in the separator.
                for candidate in ('utf8', 'cp1251', 'cp1252', 'utf-16le', 'utf-16', 'utf-16be'):
                    preview = self._trydecode(data, candidate, text_width, metrics.line_count)
                    if preview:
                        codec = candidate
                        break
            if preview is None:
                preview = list(self.hexdump(data, metrics, colorize))
            else:
                rule_width = text_width

        def separator(title=None):
            # A title is only embedded when the rule is wide enough for it.
            if title is not None and rule_width > len(title) + 8:
                return '-' * (rule_width - len(title) - 5) + F'[{title}]---'
            return rule_width * '-'

        if self.args.brief:
            closing = False
        elif not self.args.bare:
            summary = repr(meta.size)
            rule = separator()
            if len(data) <= 5_000_000:
                summary = F'{summary}; {meta.entropy!r} entropy'
            summary = F'{summary}; {meta.magic!s}'
            if self.args.lines == 0:
                summary = None
            elif not data:
                summary = None
                rule = separator('empty chunk')
            depth = self.args.meta
            if depth > 0:
                # The derived variables are listed individually, so the one-line summary is dropped.
                for key in ('size', 'magic', 'entropy'):
                    meta.derive(key)
                summary = None
            if depth > 1:
                for key in ('crc32', 'sha256'):
                    meta.derive(key)
            if depth > 2:
                for name in meta.derivations:
                    meta[name]
            for entry in self._peekmeta(metrics.hexdump_width, rule, meta, peek=summary):
                nothing_shown = False
                yield entry

        if preview:
            nothing_shown = False
            if self.args.brief:
                head = next(iter(preview))
                head = F'{SizeInt(len(data))!r}: {head}'
                if index is not None:
                    head = F'#{index:03d}: {head}'
                yield head
            else:
                yield separator(codec or None)
                yield from preview

        if closing and (self.args.bare or not nothing_shown):
            yield separator()

    def filter(self, chunks):
        try:
            self._colorama.init(wrap=False)
        except ImportError:
            pass

        discarded = 0

        if self.args.brief:
            for chunk in chunks:
                if not chunk.visible and self.isatty():
                    discarded += 1
                    continue
                self.log_debug(chunk)
                yield chunk
        else:
            it = iter(chunks)
            buffer = collections.deque(itertools.islice(it, 0, 2))
            buffer.reverse()
            while buffer:
                if self.isatty() and not buffer[0].visible:
                    buffer.popleft()
                    discarded += 1
                else:
                    item = buffer.pop()
                    last = not bool(buffer)
                    item.temp = last
                    if not item.visible and self.isatty():
                        discarded += 1
                    else:
                        yield item
                try:
                    buffer.appendleft(next(it))
                except StopIteration:
                    pass

        if discarded:
            self.log_warn(F'discarded {discarded} invisible chunks to prevent them from leaking into the terminal.')

Ancestors

Subclasses

Class variables

var required_dependencies
var console
var reverse
var optional_dependencies

Inherited members