Module refinery.units.scripting

Expand source code Browse git
from __future__ import annotations

import abc
import codecs
import sys

from refinery.lib.id import guess_text_encoding
from refinery.lib.scripts import Node
from refinery.lib.types import Param, buf
from refinery.units import Arg, Chunk, RefineryPartialResult, Unit


class AutoDeobfuscationTimeout(RefineryPartialResult):
    """
    Partial-result exception raised by the iterative deobfuscation loop when the
    permitted number of rounds is used up while transformations were still being
    applied. The `partial` argument is forwarded unchanged to the base class.
    """

    def __init__(self, partial):
        message = 'The deobfuscation timeout was reached before the data stabilized.'
        super().__init__(message, partial=partial)


class IterativeDeobfuscator(Unit, abstract=True):
    # Abstract base for units that deobfuscate a script by repeatedly rewriting its
    # syntax tree. Subclasses supply parse, transform and synthesize; process decodes
    # the input, parses it once, and then calls transform until it reports no further
    # change or the configured number of rounds has been used up.
    # NOTE(review): written as a comment rather than a class docstring in case the
    # framework reads unit docstrings at runtime - confirm before converting.

    def __init__(
        self,
        timeout: Param[int, Arg.Number(
            '-t', help='Maximum number of iterations; the default is {default}.')] = 500,
    ):
        # The value is read back as self.args.timeout by the transformation loop.
        super().__init__(timeout=timeout)

    @abc.abstractmethod
    def parse(self, data: str) -> Node:
        """
        Convert the decoded script text into a syntax tree. The returned node is the
        object that is later handed to `transform` and `synthesize`.
        """

    @abc.abstractmethod
    def transform(self, ast: Node) -> int:
        """
        Run one deobfuscation round on the given tree. The caller only tests the
        return value for truthiness: a truthy result requests another round, a falsy
        one ends the loop. The same tree object is reused for every round and for
        the final synthesis, so changes have to be made to it in place.
        """

    @abc.abstractmethod
    def synthesize(self, ast: Node) -> str:
        """
        Convert the tree back into script text. The returned string is encoded with
        the codec of this unit to form the output.
        """

    def process(self, data: Chunk) -> buf:
        """
        Run the deobfuscation with the interpreter recursion limit raised to at least
        10000 and restore the previous limit afterwards, also when an exception
        propagates.
        """
        saved_limit = sys.getrecursionlimit()
        raised_limit = max(saved_limit, 10000)
        sys.setrecursionlimit(raised_limit)
        try:
            return self._process(data)
        finally:
            sys.setrecursionlimit(saved_limit)

    def _process(self, data: Chunk) -> buf:
        """
        Decode the input, parse it, and apply `transform` until it returns a falsy
        value. If that does not happen within `self.args.timeout` rounds, an
        `AutoDeobfuscationTimeout` carrying the output synthesized from the current
        tree is raised instead.
        """
        self.log_info('parsing input data')
        raw = memoryview(data)

        detected = guess_text_encoding(data)
        if detected:
            # A text encoding was recognized: decode with it and skip the number of
            # leading bytes reported in its bom attribute.
            source_codec = detected.codec
            raw = raw[detected.bom:]
        else:
            source_codec = self.codec

        text = codecs.decode(raw, source_codec, errors='surrogateescape')
        tree = self.parse(text)

        def render():
            # The output is always encoded with the codec of the unit, independent of
            # which codec was used to decode the input.
            script = self.synthesize(tree)
            return codecs.encode(script, self.codec, errors='surrogateescape')

        for round_index in range(self.args.timeout):
            self.log_info(F'starting round {round_index}')
            if not self.transform(tree):
                return render()

        # Only reached when every permitted round still reported a change.
        raise AutoDeobfuscationTimeout(render())

Sub-modules

refinery.units.scripting.bat
refinery.units.scripting.cmdarg
refinery.units.scripting.deobfuscate
refinery.units.scripting.js
refinery.units.scripting.ps1
refinery.units.scripting.vba

Classes

class AutoDeobfuscationTimeout (partial)

This exception indicates that a partial result is available.

Expand source code Browse git
class AutoDeobfuscationTimeout(RefineryPartialResult):
    """
    Partial-result exception raised by the iterative deobfuscation loop when the
    permitted number of rounds is used up while transformations were still being
    applied. The `partial` argument is forwarded unchanged to the base class.
    """

    def __init__(self, partial):
        message = 'The deobfuscation timeout was reached before the data stabilized.'
        super().__init__(message, partial=partial)

Ancestors

class IterativeDeobfuscator (timeout=500)
Expand source code Browse git
class IterativeDeobfuscator(Unit, abstract=True):
    # Abstract base for units that deobfuscate a script by repeatedly rewriting its
    # syntax tree. Subclasses supply parse, transform and synthesize; process decodes
    # the input, parses it once, and then calls transform until it reports no further
    # change or the configured number of rounds has been used up.
    # NOTE(review): written as a comment rather than a class docstring in case the
    # framework reads unit docstrings at runtime - confirm before converting.

    def __init__(
        self,
        timeout: Param[int, Arg.Number(
            '-t', help='Maximum number of iterations; the default is {default}.')] = 500,
    ):
        # The value is read back as self.args.timeout by the transformation loop.
        super().__init__(timeout=timeout)

    @abc.abstractmethod
    def parse(self, data: str) -> Node:
        """
        Convert the decoded script text into a syntax tree. The returned node is the
        object that is later handed to `transform` and `synthesize`.
        """

    @abc.abstractmethod
    def transform(self, ast: Node) -> int:
        """
        Run one deobfuscation round on the given tree. The caller only tests the
        return value for truthiness: a truthy result requests another round, a falsy
        one ends the loop. The same tree object is reused for every round and for
        the final synthesis, so changes have to be made to it in place.
        """

    @abc.abstractmethod
    def synthesize(self, ast: Node) -> str:
        """
        Convert the tree back into script text. The returned string is encoded with
        the codec of this unit to form the output.
        """

    def process(self, data: Chunk) -> buf:
        """
        Run the deobfuscation with the interpreter recursion limit raised to at least
        10000 and restore the previous limit afterwards, also when an exception
        propagates.
        """
        saved_limit = sys.getrecursionlimit()
        raised_limit = max(saved_limit, 10000)
        sys.setrecursionlimit(raised_limit)
        try:
            return self._process(data)
        finally:
            sys.setrecursionlimit(saved_limit)

    def _process(self, data: Chunk) -> buf:
        """
        Decode the input, parse it, and apply `transform` until it returns a falsy
        value. If that does not happen within `self.args.timeout` rounds, an
        `AutoDeobfuscationTimeout` carrying the output synthesized from the current
        tree is raised instead.
        """
        self.log_info('parsing input data')
        raw = memoryview(data)

        detected = guess_text_encoding(data)
        if detected:
            # A text encoding was recognized: decode with it and skip the number of
            # leading bytes reported in its bom attribute.
            source_codec = detected.codec
            raw = raw[detected.bom:]
        else:
            source_codec = self.codec

        text = codecs.decode(raw, source_codec, errors='surrogateescape')
        tree = self.parse(text)

        def render():
            # The output is always encoded with the codec of the unit, independent of
            # which codec was used to decode the input.
            script = self.synthesize(tree)
            return codecs.encode(script, self.codec, errors='surrogateescape')

        for round_index in range(self.args.timeout):
            self.log_info(F'starting round {round_index}')
            if not self.transform(tree):
                return render()

        # Only reached when every permitted round still reported a change.
        raise AutoDeobfuscationTimeout(render())

Ancestors

Subclasses

Methods

def parse(self, data)
Expand source code Browse git
@abc.abstractmethod
def parse(self, data: str) -> Node:
    """
    Convert the decoded script text into a syntax tree. The returned node is the
    object that is later handed to `transform` and `synthesize`.
    """
def transform(self, ast)
Expand source code Browse git
@abc.abstractmethod
def transform(self, ast: Node) -> int:
    """
    Run one deobfuscation round on the given tree. The caller only tests the return
    value for truthiness: a truthy result requests another round, a falsy one ends
    the loop. The same tree object is reused for every round and for the final
    synthesis, so changes have to be made to it in place.
    """
def synthesize(self, ast)
Expand source code Browse git
@abc.abstractmethod
def synthesize(self, ast: Node) -> str:
    """
    Convert the tree back into script text. The returned string is encoded with the
    codec of the unit to form the output.
    """

Inherited members