Module refinery.units.sinks.peek
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Generator, Optional
import sys
import os
import textwrap
import codecs
import itertools
import collections
from refinery.units.sinks import Arg, HexViewer
from refinery.lib.meta import ByteStringWrapper, metavars, CustomStringRepresentation, SizeInt
from refinery.lib.types import INF
from refinery.lib.tools import get_terminal_size, isbuffer
from refinery.lib.environment import environment
class peek(HexViewer):
    """
    The unit extracts preview information of the input data and displays it on the standard error stream. If the standard
    output of this unit is connected by a pipe, the incoming data is forwarded. However, if the unit outputs to a terminal,
    the data is discarded instead.
    """

    def __init__(
        self,
        lines  : Arg.Number('-l', group='SIZE', help='Specify number N of lines in the preview, default is 10.') = 10,
        all    : Arg.Switch('-a', group='SIZE', help='Output all possible preview lines without restriction') = False,
        brief  : Arg.Switch('-b', group='SIZE', help='One line peek, implies --lines=1.') = False,
        decode : Arg.Counts('-d', group='MODE', help=(
            'Attempt to decode and display printable data. Specify twice to enable line wrapping.')) = 0,
        escape : Arg.Switch('-e', group='MODE', help='Always peek data as string, escape characters if necessary.') = False,
        bare   : Arg.Switch('-r', group='META', help='Only peek the data itself, do not show a metadata preview.') = False,
        meta   : Arg.Counts('-m', group='META', help=(
            'Show more auto-derivable metadata. Specify multiple times to populate more variables.')) = 0,
        gray   : Arg.Switch('-g', help='Do not colorize the output.') = False,
        index  : Arg.Switch('-i', help='Display the index of each chunk within the current frame.') = False,
        stdout : Arg.Switch('-2', help='Print the peek to STDOUT rather than STDERR; the input data is lost.') = False,
        narrow=False, blocks=1, dense=False, expand=False, width=0
    ):
        # Decoding and escaping are two different ways of rendering the data as text.
        if decode and escape:
            raise ValueError('The decode and esc options are exclusive.')
        # A brief peek is always rendered in narrow mode.
        if brief:
            narrow = True
        # The environment can globally disable colored output.
        if environment.colorless.value:
            gray = True
        # Precedence of the SIZE options: brief beats all, all beats an explicit line count.
        lines = 1 if brief else INF if all else lines
        super(peek, self).__init__(
            brief=brief,
            gray=gray,
            blocks=blocks,
            decode=decode,
            dense=dense,
            index=index,
            escape=escape,
            expand=expand,
            narrow=narrow,
            lines=lines,
            meta=meta,
            bare=bare,
            width=width,
            stdout=stdout,
        )

    # Dependency accessor for the colorama package; the import is deferred until first use.
    @HexViewer.Requires('colorama', 'display', 'default', 'extended')
    def _colorama():
        import colorama
        return colorama

    def process(self, data):
        """
        Render the peek for one chunk. With the stdout option, the encoded preview lines are the
        output of the unit and the input is dropped. Otherwise, the preview is printed to the
        standard error stream and the input is forwarded unless this unit writes to a terminal.
        """
        # Color codes are never emitted when the preview itself becomes the output data.
        colorize = not self.args.gray and not self.args.stdout
        lines = self._peeklines(data, colorize)

        if self.args.stdout:
            for line in lines:
                yield line.encode(self.codec)
            return

        stderr = sys.stderr

        if colorize:
            colorama = self._colorama
            if os.name == 'nt':
                stderr = colorama.AnsiToWin32(stderr).stream
            # Sequence written on abort: reset all styles and blank the current terminal line.
            _erase = ' ' * get_terminal_size()
            _reset = F'\r{colorama.Style.RESET_ALL}{_erase}\r'
        else:
            _reset = ''

        try:
            for line in lines:
                print(line, file=stderr)
        except BaseException:
            # This deliberately includes KeyboardInterrupt: the terminal must not be left in a
            # colored state when the output is aborted half way through.
            stderr.write(_reset)
            raise

        if not self.isatty:
            self.log_info('forwarding input to next unit')
            yield data

    def _peekmeta(self, linewidth, sep, meta: dict, peek=None) -> Generator[str, None, None]:
        """
        Generate the metadata section of the preview. The separator line `sep` is emitted exactly
        once, immediately before the first line of output; nothing at all is generated when there
        is neither a `peek` summary nor any meta variable. Lines longer than `linewidth` are
        truncated and terminated with three dots.
        """
        if not meta and not peek:
            return
        width = max((len(name) for name in meta), default=0)
        # A one-shot iterator: the first "yield from" emits the separator, all later ones emit nothing.
        separators = iter([sep])
        if peek is not None:
            if len(peek) > linewidth:
                peek = peek[:linewidth - 3] + '...'
            yield from separators
            yield peek
        for name in sorted(meta):
            value = meta[name]
            if value is None:
                continue
            if isinstance(value, CustomStringRepresentation):
                value = repr(value).strip()
            elif isbuffer(value):
                value = repr(ByteStringWrapper(value))
            elif isinstance(value, int):
                # Small integers are displayed in decimal, everything else in hexadecimal.
                if value in range(-999, 1000):
                    value = str(value)
                elif value > 0:
                    value = F'0x{value:X}'
                else:
                    value = F'-0x{-value:X}'
            elif isinstance(value, float):
                value = F'{value:.4f}'
            metavar = F'{name:>{width + 2}} = {value!s}'
            if len(metavar) > linewidth:
                metavar = metavar[:linewidth - 3] + '...'
            yield from separators
            yield metavar

    def _trydecode(self, data, codec: Optional[str], width: int, linecount: int) -> Optional[list]:
        """
        Attempt to render `data` as text and return the preview as a list of lines.

        When `codec` is `None`, the data is escaped with the `esc` unit and cut into pieces of
        `width` characters. Otherwise, the data is decoded strictly with the given codec; the
        return value is `None` if decoding fails, if the decoded text is empty, or if fewer than
        80% of its characters are printable. Decoded text is either clipped to `width` characters
        per line or, if the decode option was specified more than once, wrapped at `width`.
        """
        wrap = self.args.decode > 1

        if codec is None:
            from refinery.units.encoding.esc import esc
            decoded = data[:abs(width * linecount)]
            decoded = str(decoded | -esc(bare=True))
            limit = abs(min(linecount * width, len(decoded)))
            return [decoded[k:k + width] for k in range(0, limit, width)]

        import unicodedata
        unprintable = {'Cc', 'Cf', 'Co', 'Cs'}

        try:
            self.log_info(F'trying to decode as {codec}.')
            decoded = codecs.decode(data, codec, errors='strict')
        except UnicodeDecodeError as DE:
            self.log_info('decoding failed:', DE.reason)
            return None
        except ValueError as V:
            self.log_info('decoding failed:', V)
            return None

        if not decoded:
            # Non-empty input can decode to an empty string, for example when it consists only of
            # a byte order mark that the codec removes. There is nothing to display in this case,
            # and the ratio computation below would divide by zero.
            self.log_info('decoding failed:', 'decoded string is empty')
            return None

        count = sum(unicodedata.category(c) not in unprintable for c in decoded)
        ratio = count / len(decoded)

        if ratio < 0.8:
            self.log_info(F'data contains {ratio * 100:.2f}% printable characters, this is too low.')
            return None

        decoded = decoded.splitlines(False)

        if not wrap:
            # Only the lines that are going to be displayed have to be clipped.
            decoded = decoded[:abs(linecount)]
            marker = None
            for k, line in enumerate(decoded):
                # The length is measured with tabs expanded; lines that fit are kept unmodified.
                line = line.replace('\t', '\x20' * 4)
                if len(line) <= width:
                    continue
                if marker is None:
                    # The marker is built lazily because colorama is an optional dependency. Like
                    # in process, no color codes are emitted when the preview goes to STDOUT.
                    if self.args.gray or self.args.stdout:
                        marker = '...'
                    else:
                        colorama = self._colorama
                        marker = F'{colorama.Fore.LIGHTRED_EX}...{colorama.Style.RESET_ALL}'
                clipped = line[:width - 3]
                decoded[k] = F'{clipped}{marker}'
            return decoded

        remaining = linecount
        result = []

        for paragraph in decoded:
            if not remaining:
                break
            wrapped = [
                line for chunk in textwrap.wrap(
                    paragraph,
                    width,
                    break_long_words=True,
                    break_on_hyphens=False,
                    drop_whitespace=False,
                    expand_tabs=True,
                    max_lines=abs(remaining + 1),
                    replace_whitespace=False,
                    tabsize=4,
                )
                for line in chunk.splitlines(keepends=False)
            ]
            remaining -= len(wrapped)
            result.extend(wrapped)

        return result[:abs(linecount)]

    def _peeklines(self, data: bytearray, colorize: bool) -> Generator[str, None, None]:
        """
        Generate all lines of the preview for the chunk `data`: the metadata section, a separator,
        and the data preview, which is escaped text, decoded text, or a hex dump. In brief mode,
        a single line is generated instead. A closing separator is added after the chunk whose
        `temp` attribute is set, which `filter` does for the last chunk it passes on.
        """
        meta = metavars(data)
        codec = None
        lines = None
        final = data.temp or False
        empty = True

        if not self.args.index:
            meta.discard('index')
            index = None
        else:
            index = meta.get('index', None)

        # In brief mode, the single line is prefixed with the data size and optionally with the
        # chunk index; the hex dump has to leave room for these prefixes.
        if not self.args.brief:
            padding = 0
        else:
            padding = SizeInt.width + 2
            if index is not None:
                padding += 6

        metrics = self._get_metrics(len(data), self.args.lines, padding)

        if self.args.brief:
            metrics.address_width = 0
            metrics.fit_to_width(allow_increase=True)

        sepsize = metrics.hexdump_width
        txtsize = self.args.width or sepsize

        if self.args.lines and data:
            if self.args.escape:
                lines = self._trydecode(data, None, txtsize, metrics.line_count)
            if self.args.decode > 0:
                for candidate in ('utf8', 'utf-16le', 'utf-16', 'utf-16be'):
                    lines = self._trydecode(data, candidate, txtsize, metrics.line_count)
                    if lines:
                        # Remember the first codec that worked; it becomes the separator title.
                        codec = candidate
                        break
            if lines is None:
                lines = list(self.hexdump(data, metrics, colorize))
            else:
                # Text previews use the text width for separators rather than the hex dump width.
                sepsize = txtsize

        def separator(title=None):
            # A horizontal rule of sepsize dashes; the title is embedded near the right end of
            # the rule if there is enough room for it.
            if title is None or sepsize <= len(title) + 8:
                return sepsize * '-'
            return '-' * (sepsize - len(title) - 5) + F'[{title}]---'

        if self.args.brief:
            final = False
        elif not self.args.bare:
            peek = repr(meta.size)
            line = separator()
            # The entropy is only part of the summary for data of at most 5,000,000 bytes.
            if len(data) <= 5_000_000:
                peek = F'{peek}; {meta.entropy!r} entropy'
            peek = F'{peek}; {meta.magic!s}'
            if self.args.lines == 0:
                peek = None
            elif not data:
                peek = None
                line = separator('empty chunk')
            if self.args.meta > 0:
                # The summary values are listed as regular meta variables instead.
                meta.derive('size')
                meta.derive('magic')
                meta.derive('entropy')
                peek = None
            if self.args.meta > 1:
                meta.derive('crc32')
                meta.derive('sha256')
            if self.args.meta > 2:
                # Accessing a variable populates it.
                for name in meta.derivations:
                    meta[name]
            for line in self._peekmeta(metrics.hexdump_width, line, meta, peek=peek):
                empty = False
                yield line

        if lines:
            empty = False
            if not self.args.brief:
                yield separator(codec or None)
                yield from lines
            else:
                brief = next(iter(lines))
                brief = F'{SizeInt(len(data))!r}: {brief}'
                if index is not None:
                    brief = F'#{index:03d}: {brief}'
                yield brief

        if final and (self.args.bare or not empty):
            yield separator()

    def filter(self, chunks):
        """
        Pass on the incoming chunks and set the `temp` attribute of each one to indicate whether
        it is the last chunk. When the unit outputs to a terminal, invisible chunks are discarded
        and a warning with their total number is logged.
        """
        try:
            self._colorama.init(wrap=False)
        except ImportError:
            pass
        discarded = 0
        it = iter(chunks)
        # The buffer holds up to two chunks with the oldest one on the right; the lookahead is
        # required to know whether the chunk being passed on is the last one.
        buffer = collections.deque(itertools.islice(it, 0, 2))
        buffer.reverse()
        while buffer:
            if self.isatty and not buffer[0].visible:
                # Drop an invisible lookahead chunk so that it cannot hide the end of the stream.
                buffer.popleft()
                discarded += 1
            else:
                item = buffer.pop()
                last = not bool(buffer)
                item.temp = last
                if not item.visible and self.isatty:
                    discarded += 1
                else:
                    yield item
            try:
                buffer.appendleft(next(it))
            except StopIteration:
                pass
        if discarded:
            self.log_warn(F'discarded {discarded} invisible chunks to prevent them from leaking into the terminal.')
Classes
class peek (lines=10, all=False, brief=False, decode=0, escape=False, bare=False, meta=0, gray=False, index=False, stdout=False, narrow=False, blocks=1, dense=False, expand=False, width=0)
-
The unit extracts preview information of the input data and displays it on the standard error stream. If the standard output of this unit is connected by a pipe, the incoming data is forwarded. However, if the unit outputs to a terminal, the data is discarded instead.
Expand source code Browse git
class peek(HexViewer): """ The unit extracts preview information of the input data and displays it on the standard error stream. If the standard output of this unit is connected by a pipe, the incoming data is forwarded. However, if the unit outputs to a terminal, the data is discarded instead. """ def __init__( self, lines : Arg.Number('-l', group='SIZE', help='Specify number N of lines in the preview, default is 10.') = 10, all : Arg.Switch('-a', group='SIZE', help='Output all possible preview lines without restriction') = False, brief : Arg.Switch('-b', group='SIZE', help='One line peek, implies --lines=1.') = False, decode : Arg.Counts('-d', group='MODE', help=( 'Attempt to decode and display printable data. Specify twice to enable line wrapping.')) = 0, escape : Arg.Switch('-e', group='MODE', help='Always peek data as string, escape characters if necessary.') = False, bare : Arg.Switch('-r', group='META', help='Only peek the data itself, do not show a metadata preview.') = False, meta : Arg.Counts('-m', group='META', help=( 'Show more auto-derivable metadata. 
Specify multiple times to populate more variables.')) = 0, gray : Arg.Switch('-g', help='Do not colorize the output.') = False, index : Arg.Switch('-i', help='Display the index of each chunk within the current frame.') = False, stdout : Arg.Switch('-2', help='Print the peek to STDOUT rather than STDERR; the input data is lost.') = False, narrow=False, blocks=1, dense=False, expand=False, width=0 ): if decode and escape: raise ValueError('The decode and esc options are exclusive.') if brief: narrow = True if environment.colorless.value: gray = True lines = 1 if brief else INF if all else lines super(peek, self).__init__( brief=brief, gray=gray, blocks=blocks, decode=decode, dense=dense, index=index, escape=escape, expand=expand, narrow=narrow, lines=lines, meta=meta, bare=bare, width=width, stdout=stdout, ) @HexViewer.Requires('colorama', 'display', 'default', 'extended') def _colorama(): import colorama return colorama def process(self, data): colorize = not self.args.gray and not self.args.stdout lines = self._peeklines(data, colorize) if self.args.stdout: for line in lines: yield line.encode(self.codec) return stderr = sys.stderr if colorize: colorama = self._colorama if os.name == 'nt': stderr = colorama.AnsiToWin32(stderr).stream _erase = ' ' * get_terminal_size() _reset = F'\r{colorama.Style.RESET_ALL}{_erase}\r' else: _reset = '' try: for line in lines: print(line, file=stderr) except BaseException: stderr.write(_reset) raise if not self.isatty: self.log_info('forwarding input to next unit') yield data def _peekmeta(self, linewidth, sep, meta: dict, peek=None) -> Generator[str, None, None]: if not meta and not peek: return width = max((len(name) for name in meta), default=0) separators = iter([sep]) if peek is not None: if len(peek) > linewidth: peek = peek[:linewidth - 3] + '...' 
yield from separators yield peek for name in sorted(meta): value = meta[name] if value is None: continue if isinstance(value, CustomStringRepresentation): value = repr(value).strip() elif isbuffer(value): value = repr(ByteStringWrapper(value)) elif isinstance(value, int): if value in range(-999, 1000): value = str(value) elif value > 0: value = F'0x{value:X}' else: value = F'-0x{-value:X}' elif isinstance(value, float): value = F'{value:.4f}' metavar = F'{name:>{width + 2}} = {value!s}' if len(metavar) > linewidth: metavar = metavar[:linewidth - 3] + '...' yield from separators yield metavar def _trydecode(self, data, codec: Optional[str], width: int, linecount: int) -> str: remaining = linecount result = [] wrap = self.args.decode > 1 if codec is None: from refinery.units.encoding.esc import esc decoded = data[:abs(width * linecount)] decoded = str(decoded | -esc(bare=True)) limit = abs(min(linecount * width, len(decoded))) for k in range(0, limit, width): result.append(decoded[k:k + width]) return result try: import unicodedata unprintable = {'Cc', 'Cf', 'Co', 'Cs'} self.log_info(F'trying to decode as {codec}.') decoded = codecs.decode(data, codec, errors='strict') count = sum(unicodedata.category(c) not in unprintable for c in decoded) ratio = count / len(decoded) except UnicodeDecodeError as DE: self.log_info('decoding failed:', DE.reason) return None except ValueError as V: self.log_info('decoding failed:', V) return None if ratio < 0.8: self.log_info(F'data contains {ratio * 100:.2f}% printable characters, this is too low.') return None decoded = decoded.splitlines(False) if not wrap: for k, line in enumerate(decoded): line = line.replace('\t', '\x20' * 4) if len(line) <= width: continue clipped = line[:width - 3] if self.args.gray: color = '' reset = '' else: colorama = self._colorama color = colorama.Fore.LIGHTRED_EX reset = colorama.Style.RESET_ALL decoded[k] = F'{clipped}{color}...{reset}' return decoded[:abs(linecount)] for paragraph in decoded: if not 
remaining: break wrapped = [ line for chunk in textwrap.wrap( paragraph, width, break_long_words=True, break_on_hyphens=False, drop_whitespace=False, expand_tabs=True, max_lines=abs(remaining + 1), replace_whitespace=False, tabsize=4, ) for line in chunk.splitlines(keepends=False) ] remaining -= len(wrapped) result.extend(wrapped) return result[:abs(linecount)] def _peeklines(self, data: bytearray, colorize: bool) -> Generator[str, None, None]: meta = metavars(data) codec = None lines = None final = data.temp or False empty = True if not self.args.index: meta.discard('index') index = None else: index = meta.get('index', None) if not self.args.brief: padding = 0 else: padding = SizeInt.width + 2 if index is not None: padding += 6 metrics = self._get_metrics(len(data), self.args.lines, padding) if self.args.brief: metrics.address_width = 0 metrics.fit_to_width(allow_increase=True) sepsize = metrics.hexdump_width txtsize = self.args.width or sepsize if self.args.lines and data: if self.args.escape: lines = self._trydecode(data, None, txtsize, metrics.line_count) if self.args.decode > 0: for codec in ('utf8', 'utf-16le', 'utf-16', 'utf-16be'): lines = self._trydecode(data, codec, txtsize, metrics.line_count) if lines: codec = codec break else: codec = None if lines is None: lines = list(self.hexdump(data, metrics, colorize)) else: sepsize = txtsize def separator(title=None): if title is None or sepsize <= len(title) + 8: return sepsize * '-' return '-' * (sepsize - len(title) - 5) + F'[{title}]---' if self.args.brief: final = False elif not self.args.bare: peek = repr(meta.size) line = separator() if len(data) <= 5_000_000: peek = F'{peek}; {meta.entropy!r} entropy' peek = F'{peek}; {meta.magic!s}' if self.args.lines == 0: peek = None elif not data: peek = None line = separator('empty chunk') if self.args.meta > 0: meta.derive('size') meta.derive('magic') meta.derive('entropy') peek = None if self.args.meta > 1: meta.derive('crc32') meta.derive('sha256') if 
self.args.meta > 2: for name in meta.derivations: meta[name] for line in self._peekmeta(metrics.hexdump_width, line, meta, peek=peek): empty = False yield line if lines: empty = False if not self.args.brief: yield separator(codec or None) yield from lines else: brief = next(iter(lines)) brief = F'{SizeInt(len(data))!r}: {brief}' if index is not None: brief = F'#{index:03d}: {brief}' yield brief if final and (self.args.bare or not empty): yield separator() def filter(self, chunks): try: self._colorama.init(wrap=False) except ImportError: pass discarded = 0 it = iter(chunks) buffer = collections.deque(itertools.islice(it, 0, 2)) buffer.reverse() while buffer: if self.isatty and not buffer[0].visible: buffer.popleft() discarded += 1 else: item = buffer.pop() last = not bool(buffer) item.temp = last if not item.visible and self.isatty: discarded += 1 else: yield item try: buffer.appendleft(next(it)) except StopIteration: pass if discarded: self.log_warn(F'discarded {discarded} invisible chunks to prevent them from leaking into the terminal.')
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members