Module refinery.units.meta.loop
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import RefineryPartialResult
from refinery.units.pattern import Arg, RegexUnit
from refinery.lib.argformats import regexp, DelayedBinaryArgument
from refinery.lib.tools import exception_to_string
class loop(RegexUnit):
"""
Applies a given multibin suffix to the input chunk repeatedly. For example, the following
command would carve the largest base64-encoded buffer from the input, decode it, and then
decompress the result 20 times:
emit data | loop 20 csd[b64]:zl
Notably, the argument after the count is a suffix, which means that handlers are applied
from left to right (not from right to left). The loop is aborted and the previous result
returned if the newly computed result is empty. If the an error occurs while computing
the suffix and the unit is lenient (i.e. the `-L` switch is set), the last known result
is returned.
"""
def __init__(
self,
count: Arg.Number(metavar='count', help='The number of repeated applications of the suffix.'),
suffix: Arg(type=str, help='A multibin expression suffix.'),
do_while: Arg('-w', '--while', type=regexp, metavar='RE',
help='Halt when the given regular expression does not match the data.'),
do_until: Arg('-u', '--until', type=regexp, metavar='RE',
help='Halt when the given regular expression matches the data.'),
fullmatch=False, multiline=False, ignorecase=False,
):
super().__init__(
count=count,
suffix=suffix,
do_while=do_while,
do_until=do_until,
fullmatch=fullmatch,
multiline=multiline,
ignorecase=ignorecase,
)
def process(self, data):
_count = self.args.count
_width = len(str(_count))
_while = self._while
_until = self._until
for k in range(_count):
if _while and not _while(data):
self.log_info(F'step {k:0{_width}}: stopping, while-condition violated')
break
if _until and _until(data):
self.log_info(F'step {k:0{_width}}: stopping, until-condition satisfied')
break
try:
out = DelayedBinaryArgument(
self.args.suffix, reverse=True, seed=data)(data)
except Exception as error:
self.log_info(F'step {k:0{_width}}: error;', exception_to_string(error))
msg = F'Stopped after {k} steps, increase verbosity for additional details.'
raise RefineryPartialResult(msg, data) from error
if not out:
self.log_info(F'step {k:0{_width}}: stopping after empty result')
break
data[:] = out
self.log_debug(F'step {k:0{_width}}: data =', data, clip=True)
return data
@property
def _while(self):
return self._make_matcher(self.args.do_while)
@property
def _until(self):
return self._make_matcher(self.args.do_until)
Classes
class loop (count, suffix, do_while, do_until, fullmatch=False, multiline=False, ignorecase=False)
-
Applies a given multibin suffix to the input chunk repeatedly. For example, the following command would carve the largest base64-encoded buffer from the input, decode it, and then decompress the result 20 times:
emit data | loop 20 csd[b64]:zl
Notably, the argument after the count is a suffix, which means that handlers are applied from left to right (not from right to left). The loop is aborted and the previous result returned if the newly computed result is empty. If the an error occurs while computing the suffix and the unit is lenient (i.e. the
-L
switch is set), the last known result is returned.Expand source code Browse git
class loop(RegexUnit): """ Applies a given multibin suffix to the input chunk repeatedly. For example, the following command would carve the largest base64-encoded buffer from the input, decode it, and then decompress the result 20 times: emit data | loop 20 csd[b64]:zl Notably, the argument after the count is a suffix, which means that handlers are applied from left to right (not from right to left). The loop is aborted and the previous result returned if the newly computed result is empty. If the an error occurs while computing the suffix and the unit is lenient (i.e. the `-L` switch is set), the last known result is returned. """ def __init__( self, count: Arg.Number(metavar='count', help='The number of repeated applications of the suffix.'), suffix: Arg(type=str, help='A multibin expression suffix.'), do_while: Arg('-w', '--while', type=regexp, metavar='RE', help='Halt when the given regular expression does not match the data.'), do_until: Arg('-u', '--until', type=regexp, metavar='RE', help='Halt when the given regular expression matches the data.'), fullmatch=False, multiline=False, ignorecase=False, ): super().__init__( count=count, suffix=suffix, do_while=do_while, do_until=do_until, fullmatch=fullmatch, multiline=multiline, ignorecase=ignorecase, ) def process(self, data): _count = self.args.count _width = len(str(_count)) _while = self._while _until = self._until for k in range(_count): if _while and not _while(data): self.log_info(F'step {k:0{_width}}: stopping, while-condition violated') break if _until and _until(data): self.log_info(F'step {k:0{_width}}: stopping, until-condition satisfied') break try: out = DelayedBinaryArgument( self.args.suffix, reverse=True, seed=data)(data) except Exception as error: self.log_info(F'step {k:0{_width}}: error;', exception_to_string(error)) msg = F'Stopped after {k} steps, increase verbosity for additional details.' raise RefineryPartialResult(msg, data) from error if not out: self.log_info(F'step {k:0{_width}}: stopping after empty result') break data[:] = out self.log_debug(F'step {k:0{_width}}: data =', data, clip=True) return data @property def _while(self): return self._make_matcher(self.args.do_while) @property def _until(self): return self._make_matcher(self.args.do_until)
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members