Module refinery.units.scripting
Expand source code Browse git
from __future__ import annotations
import abc
import codecs
import sys
from refinery.lib.id import guess_text_encoding
from refinery.lib.scripts import Node
from refinery.lib.types import Param, buf
from refinery.units import Arg, Chunk, RefineryPartialResult, Unit
class AutoDeobfuscationTimeout(RefineryPartialResult):
    """
    Raised when the iteration budget is used up while the deobfuscation rounds are still
    reporting changes. The output synthesized so far is attached as the partial result.
    """

    def __init__(self, partial):
        # The message is fixed; only the partial output varies between instances.
        message = 'The deobfuscation timeout was reached before the data stabilized.'
        super().__init__(message, partial=partial)
class IterativeDeobfuscator(Unit, abstract=True):
    """
    Abstract base for units that parse a script into a tree, rewrite that tree round by round
    until a round reports no further change, and then emit the synthesized script text.
    """

    def __init__(
        self,
        timeout: Param[int, Arg.Number(
            '-t', help='Maximum number of iterations; the default is {default}.')] = 500,
    ):
        super().__init__(timeout=timeout)

    @abc.abstractmethod
    def parse(self, data: str) -> Node:
        """
        Turn the decoded script text into the tree that `transform` and `synthesize` operate on.
        """
        ...

    @abc.abstractmethod
    def transform(self, ast: Node) -> int:
        """
        Run one deobfuscation round on the tree. A falsy return value ends the iteration; any
        truthy value requests another round (presumably a count of applied changes; confirm
        against the implementations).
        """
        ...

    @abc.abstractmethod
    def synthesize(self, ast: Node) -> str:
        """
        Render the tree back into script text.
        """
        ...

    def process(self, data: Chunk) -> buf:
        """
        Run the deobfuscation with the interpreter recursion limit raised to at least 10000 and
        restore the previous limit afterwards, whether or not the run succeeds.
        """
        previous_limit = sys.getrecursionlimit()
        # Only ever raise the limit: a caller that already configured more keeps its value.
        raised_limit = max(previous_limit, 10000)
        sys.setrecursionlimit(raised_limit)
        try:
            return self._process(data)
        finally:
            sys.setrecursionlimit(previous_limit)

    def _process(self, data: Chunk) -> buf:
        """
        Decode the input, parse it, and apply `transform` for at most `self.args.timeout` rounds.
        Returns the encoded synthesized script as soon as a round reports no change; raises
        `AutoDeobfuscationTimeout` carrying the current output when the rounds are exhausted.
        """
        self.log_info('parsing input data')
        source_view = memoryview(data)
        detected = guess_text_encoding(data)
        if detected:
            # Use the detected codec and skip the leading `bom` bytes
            # (presumably the byte order mark length; confirm against guess_text_encoding).
            input_codec = detected.codec
            source_view = source_view[detected.bom:]
        else:
            input_codec = self.codec
        text = codecs.decode(source_view, input_codec, errors='surrogateescape')
        tree = self.parse(text)

        def _encoded_output():
            # The output is always encoded with the unit's own codec, not the detected one.
            script = self.synthesize(tree)
            return codecs.encode(script, self.codec, errors='surrogateescape')

        for round_index in range(self.args.timeout):
            self.log_info(F'starting round {round_index}')
            if not self.transform(tree):
                return _encoded_output()
        # Every permitted round still changed the tree: hand back what we have as a partial result.
        raise AutoDeobfuscationTimeout(_encoded_output())
Sub-modules
- refinery.units.scripting.bat
- refinery.units.scripting.cmdarg
- refinery.units.scripting.deobfuscate
- refinery.units.scripting.js
- refinery.units.scripting.ps1
- refinery.units.scripting.vba
Classes
class AutoDeobfuscationTimeout (partial)-
This exception indicates that a partial result is available.
Expand source code Browse git
class AutoDeobfuscationTimeout(RefineryPartialResult): def __init__(self, partial): super().__init__( 'The deobfuscation timeout was reached before the data stabilized.', partial=partial, )
Ancestors
- RefineryPartialResult
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class IterativeDeobfuscator (timeout=500)-
Expand source code Browse git
class IterativeDeobfuscator(Unit, abstract=True): def __init__( self, timeout: Param[int, Arg.Number( '-t', help='Maximum number of iterations; the default is {default}.')] = 500, ): super().__init__(timeout=timeout) @abc.abstractmethod def parse(self, data: str) -> Node: ... @abc.abstractmethod def transform(self, ast: Node) -> int: ... @abc.abstractmethod def synthesize(self, ast: Node) -> str: ... def process(self, data: Chunk) -> buf: old_limit = sys.getrecursionlimit() sys.setrecursionlimit(max(old_limit, 10000)) try: return self._process(data) finally: sys.setrecursionlimit(old_limit) def _process(self, data: Chunk) -> buf: def _result(): return codecs.encode( self.synthesize(ast), self.codec, errors='surrogateescape') self.log_info('parsing input data') view = memoryview(data) if format := guess_text_encoding(data): codec = format.codec view = view[format.bom:] else: codec = self.codec txt = codecs.decode(view, codec, errors='surrogateescape') ast = self.parse(txt) for k in range(self.args.timeout): self.log_info(F'starting round {k}') if self.transform(ast): continue return _result() else: raise AutoDeobfuscationTimeout(_result())
Ancestors
Subclasses
Methods
def parse(self, data)-
Expand source code Browse git
@abc.abstractmethod def parse(self, data: str) -> Node: ...
def transform(self, ast)-
Expand source code Browse git
@abc.abstractmethod def transform(self, ast: Node) -> int: ...
def synthesize(self, ast)-
Expand source code Browse git
@abc.abstractmethod def synthesize(self, ast: Node) -> str: ...
Inherited members