Module refinery.units.scripting
Expand source code Browse git
from __future__ import annotations
import abc
import codecs
from refinery.lib.scripts import Node
from refinery.lib.types import Param, buf
from refinery.units import Arg, Chunk, RefineryPartialResult, Unit
class AutoDeobfuscationTimeout(RefineryPartialResult):
    """
    Partial-result exception raised when the iteration limit of a deobfuscator is
    used up while the transformation was still reporting changes.
    """

    def __init__(self, partial):
        """
        Build the exception with a fixed message; `partial` is forwarded unchanged
        to the parent class as its `partial` keyword argument.
        """
        message = 'The deobfuscation timeout was reached before the data stabilized.'
        super().__init__(message, partial=partial)
# Abstract base for units that deobfuscate a script in repeated passes. The input
# is decoded and parsed once, then `transform` is applied to the resulting tree
# until it returns a falsy value or the configured number of passes is used up.
# Subclasses supply `parse`, `transform` and `synthesize`.
class IterativeDeobfuscator(Unit, abstract=True):

    def __init__(
        self,
        timeout: Param[int, Arg.Number(
            '-t', help='Maximum number of iterations; the default is {default}.')] = 100,
    ):
        super().__init__(timeout=timeout)

    @abc.abstractmethod
    def parse(self, data: str) -> Node:
        """
        Turn the decoded script text into a syntax tree.
        """
        ...

    @abc.abstractmethod
    def transform(self, ast: Node) -> bool:
        """
        Run one pass over the tree. A truthy return value makes `process` schedule
        another pass; a falsy one ends the iteration.
        """
        ...

    @abc.abstractmethod
    def synthesize(self, ast: Node) -> str:
        """
        Render the syntax tree back into script text.
        """
        ...

    def process(self, data: Chunk) -> buf:
        """
        Decode the chunk, parse it, and apply `transform` up to `timeout` times.

        Returns the encoded, synthesized script once `transform` reports a falsy
        result. Raises `AutoDeobfuscationTimeout` when every permitted pass was
        used without that happening, and `RefineryPartialResult` when a pass is
        interrupted by `KeyboardInterrupt`; both carry the current state of the
        tree as their partial result.
        """
        def render(tree: Node) -> buf:
            # Synthesize first, then encode with the same codec and error handler
            # that were used for decoding the input.
            script = self.synthesize(tree)
            return codecs.encode(script, self.codec, errors='surrogateescape')

        source = codecs.decode(data, self.codec, errors='surrogateescape')
        tree = self.parse(source)
        stabilized = False

        for _ in range(self.args.timeout):
            try:
                keep_going = bool(self.transform(tree))
            except KeyboardInterrupt:
                # The user aborted mid-pass: hand back whatever the tree looks
                # like right now instead of discarding the work done so far.
                raise RefineryPartialResult(
                    'Returning partially deobfuscated data',
                    partial=render(tree))
            if not keep_going:
                stabilized = True
                break

        if not stabilized:
            # All passes were consumed and the last one still asked for more.
            raise AutoDeobfuscationTimeout(render(tree))

        return render(tree)
Sub-modules
- refinery.units.scripting.js
- refinery.units.scripting.ps1
- refinery.units.scripting.vba
Classes
class AutoDeobfuscationTimeout (partial)-
This exception indicates that a partial result is available.
Expand source code Browse git
class AutoDeobfuscationTimeout(RefineryPartialResult): def __init__(self, partial): super().__init__( 'The deobfuscation timeout was reached before the data stabilized.', partial=partial, )Ancestors
- RefineryPartialResult
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class IterativeDeobfuscator (timeout=100)-
Expand source code Browse git
class IterativeDeobfuscator(Unit, abstract=True): def __init__( self, timeout: Param[int, Arg.Number( '-t', help='Maximum number of iterations; the default is {default}.')] = 100, ): super().__init__(timeout=timeout) @abc.abstractmethod def parse(self, data: str) -> Node: ... @abc.abstractmethod def transform(self, ast: Node) -> bool: ... @abc.abstractmethod def synthesize(self, ast: Node) -> str: ... def process(self, data: Chunk) -> buf: text = codecs.decode(data, self.codec, errors='surrogateescape') ast = self.parse(text) for _ in range(self.args.timeout): try: if not self.transform(ast): break except KeyboardInterrupt: result = self.synthesize(ast) raise RefineryPartialResult( 'Returning partially deobfuscated data', partial=codecs.encode(result, self.codec, errors='surrogateescape')) else: result = self.synthesize(ast) raise AutoDeobfuscationTimeout( codecs.encode(result, self.codec, errors='surrogateescape')) return codecs.encode(self.synthesize(ast), self.codec, errors='surrogateescape')Ancestors
Subclasses
Methods
def parse(self, data)-
Expand source code Browse git
@abc.abstractmethod def parse(self, data: str) -> Node: ... def transform(self, ast)-
Expand source code Browse git
@abc.abstractmethod def transform(self, ast: Node) -> bool: ... def synthesize(self, ast)-
Expand source code Browse git
@abc.abstractmethod def synthesize(self, ast: Node) -> str: ...
Inherited members