Module refinery.units.obfuscation

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import abc
import re
import io

from typing import ByteString
from zlib import crc32

from .. import arg, Unit, RefineryPartialResult
from ...lib.decorators import unicoded


# Public names of this module. `unicoded` is not defined here; it is imported
# from the decorators library above and re-exported.
__all__ = [
    'Deobfuscator',
    'IterativeDeobfuscator',
    'outside',
    'unicoded',
]


class AutoDeobfuscationTimeout(RefineryPartialResult):
    """
    Partial-result error raised by `IterativeDeobfuscator.process` when the
    iteration limit is used up while the data was still changing. The data
    computed so far is passed on as the partial result.
    """

    def __init__(self, partial):
        message = 'The deobfuscation timeout was reached before the data stabilized.'
        super().__init__(message, partial=partial)


def outside(*exceptions):
    """
    Build a decorator that restricts a string transformation to those parts of
    the input where none of the given regular expressions match. Here, this is
    mostly used to apply deobfuscations only to code outside of strings.

    Each argument is a regular expression pattern string. The patterns are
    joined into a single alternation and searched with `re.DOTALL`. The
    decorated method must have the signature `method(self, data: str) -> str`.
    The wrapper calls it once for every stretch of input between two matches,
    including the stretch before the first and the one after the last match,
    and copies the matched text to the output unchanged.

    When no patterns are given, nothing is excluded and the method is applied
    to the entire input in a single call.
    """
    # Without this guard the joined pattern would be the empty string, which
    # matches at every position and would chop the input into single
    # characters before handing them to the method.
    if exceptions:
        exclusion = '|'.join(F'(?:{e})' for e in exceptions)
    else:
        exclusion = None

    def excluded(method):
        def wrapper(self, data):
            if exclusion is None:
                return method(self, data)
            with io.StringIO() as out:
                # Offset of the first character not yet written to the output.
                cursor = 0
                for m in re.finditer(exclusion, data, re.DOTALL):
                    out.write(method(self, data[cursor:m.start()]))
                    # Excluded text is emitted verbatim.
                    out.write(m[0])
                    cursor = m.end()
                out.write(method(self, data[cursor:]))
                return out.getvalue()
        return wrapper

    return excluded


class Deobfuscator(Unit, abstract=True):
    """
    Abstract base for units that deobfuscate text. `process` is wrapped by the
    `unicoded` decorator and hands the string it receives to `deobfuscate`.
    """

    def __init__(self):
        super().__init__()

    @unicoded
    def process(self, data: str) -> str:
        """
        Forward the input string to `deobfuscate` and return its result.
        """
        deobfuscated = self.deobfuscate(data)
        return deobfuscated

    @abc.abstractmethod
    def deobfuscate(self, data: str) -> str:
        """
        Transform the given text and return the result. Declared abstract;
        this base implementation returns its input unchanged.
        """
        return data


class IterativeDeobfuscator(Deobfuscator, abstract=True):
    """
    A deobfuscator whose transformation is applied repeatedly until the CRC32
    checksum of the data no longer changes or the iteration limit is reached.
    """

    def __init__(self, timeout: arg('-t', help='Maximum number of iterations; the default is 100.') = 100):
        # Reject limits below one before the base class is initialized.
        if timeout < 1:
            raise ValueError('The timeout must be at least 1.')
        super().__init__()
        self.args.timeout = timeout

    def process(self, data: ByteString) -> ByteString:
        """
        Run the inherited `process` up to `self.args.timeout` times and return
        the data as soon as one round leaves its CRC32 checksum unchanged.

        Raises `AutoDeobfuscationTimeout` when every round changed the
        checksum, and `RefineryPartialResult` when a round is interrupted by
        a `KeyboardInterrupt`; both carry the most recent data.
        """
        last_checksum = crc32(data)
        for _ in range(self.args.timeout):
            try:
                data = super().process(data)
            except KeyboardInterrupt:
                # `data` still holds the result of the last completed round.
                raise RefineryPartialResult('Returning partially deobfuscated data', partial=data)
            current_checksum = crc32(data)
            if current_checksum == last_checksum:
                return data
            last_checksum = current_checksum
        # All rounds were used and the checksum changed every time.
        raise AutoDeobfuscationTimeout(data)

Sub-modules

refinery.units.obfuscation.js

Deobfuscation of JavaScript documents.

refinery.units.obfuscation.ps1
refinery.units.obfuscation.vba

A package containing deobfuscators for Visual Basic for Applications (VBA).

Functions

def outside(*exceptions)

A decorator that applies the transformation only to those areas of the input where none of the given regular expressions match. Here, this is mostly used to apply deobfuscations only to code outside of strings.

Expand source code Browse git
def outside(*exceptions):
    """
    Build a decorator that restricts a string transformation to those parts of
    the input where none of the given regular expressions match. Here, this is
    mostly used to apply deobfuscations only to code outside of strings.

    Each argument is a regular expression pattern string. The patterns are
    joined into a single alternation and searched with `re.DOTALL`. The
    decorated method must have the signature `method(self, data: str) -> str`.
    The wrapper calls it once for every stretch of input between two matches,
    including the stretch before the first and the one after the last match,
    and copies the matched text to the output unchanged.

    When no patterns are given, nothing is excluded and the method is applied
    to the entire input in a single call.
    """
    # Without this guard the joined pattern would be the empty string, which
    # matches at every position and would chop the input into single
    # characters before handing them to the method.
    if exceptions:
        exclusion = '|'.join(F'(?:{e})' for e in exceptions)
    else:
        exclusion = None

    def excluded(method):
        def wrapper(self, data):
            if exclusion is None:
                return method(self, data)
            with io.StringIO() as out:
                # Offset of the first character not yet written to the output.
                cursor = 0
                for m in re.finditer(exclusion, data, re.DOTALL):
                    out.write(method(self, data[cursor:m.start()]))
                    # Excluded text is emitted verbatim.
                    out.write(m[0])
                    cursor = m.end()
                out.write(method(self, data[cursor:]))
                return out.getvalue()
        return wrapper

    return excluded
def unicoded(method)

Can be used to decorate a Unit.process() routine that takes a string argument and also returns one. The resulting routine takes a binary buffer as input and attempts to decode it as unicode text. If certain characters cannot be decoded, then these ranges are skipped and the decorated routine is called once for each string patch that was successfully decoded.

Expand source code Browse git
def unicoded(method: Callable[[Unit, str], str]) -> Callable[[Unit, bytes], bytes]:
    """
    Decorator for a `refinery.units.Unit.process` routine that accepts a string
    and returns a string. The decorated routine accepts a binary buffer instead
    and decodes it as text with the `surrogateescape` error handler. Runs of
    bytes that could not be decoded are left untouched, and the wrapped routine
    is called once for each non-empty piece of text between such runs. The
    joined result is encoded with `self.codec`.
    """
    @wraps_without_annotations(method)
    def method_wrapper(self, data: bytes) -> bytes:
        # NOTE(review): this inspects the bytes at even offsets, whereas
        # UTF-16LE-encoded ASCII text has its zero bytes at odd offsets;
        # confirm that this heuristic is intended.
        if any(data[::2]):
            input_codec = self.codec
        else:
            input_codec = 'UTF-16LE'
        decoded = codecs.decode(data, input_codec, errors='surrogateescape')
        # The capturing group makes re.split keep the separators, so the list
        # alternates between decoded text (even indices) and escaped
        # undecodable runs (odd indices).
        pieces = re.split(R'([\uDC80-\uDCFF]+)', decoded)  # surrogate escape range
        for index in range(0, len(pieces), 2):
            piece = pieces[index]
            pieces[index] = method(self, piece) if piece else ''
        return codecs.encode(''.join(pieces), self.codec, errors='surrogateescape')
    return method_wrapper

Classes

class Deobfuscator
Expand source code Browse git
class Deobfuscator(Unit, abstract=True):
    """
    Abstract base for units that deobfuscate text. `process` is wrapped by the
    `unicoded` decorator and hands the string it receives to `deobfuscate`.
    """

    def __init__(self):
        super().__init__()

    @unicoded
    def process(self, data: str) -> str:
        """
        Forward the input string to `deobfuscate` and return its result.
        """
        deobfuscated = self.deobfuscate(data)
        return deobfuscated

    @abc.abstractmethod
    def deobfuscate(self, data: str) -> str:
        """
        Transform the given text and return the result. Declared abstract;
        this base implementation returns its input unchanged.
        """
        return data

Ancestors

Subclasses

Methods

def deobfuscate(self, data)
Expand source code Browse git
@abc.abstractmethod
def deobfuscate(self, data: str) -> str:
    """
    Transform the given text and return the result. Declared abstract; this
    base implementation returns its input unchanged.
    """
    return data

Inherited members

class IterativeDeobfuscator (timeout=100)
Expand source code Browse git
class IterativeDeobfuscator(Deobfuscator, abstract=True):
    """
    A deobfuscator whose transformation is applied repeatedly until the CRC32
    checksum of the data no longer changes or the iteration limit is reached.
    """

    def __init__(self, timeout: arg('-t', help='Maximum number of iterations; the default is 100.') = 100):
        # Reject limits below one before the base class is initialized.
        if timeout < 1:
            raise ValueError('The timeout must be at least 1.')
        super().__init__()
        self.args.timeout = timeout

    def process(self, data: ByteString) -> ByteString:
        """
        Run the inherited `process` up to `self.args.timeout` times and return
        the data as soon as one round leaves its CRC32 checksum unchanged.

        Raises `AutoDeobfuscationTimeout` when every round changed the
        checksum, and `RefineryPartialResult` when a round is interrupted by
        a `KeyboardInterrupt`; both carry the most recent data.
        """
        last_checksum = crc32(data)
        for _ in range(self.args.timeout):
            try:
                data = super().process(data)
            except KeyboardInterrupt:
                # `data` still holds the result of the last completed round.
                raise RefineryPartialResult('Returning partially deobfuscated data', partial=data)
            current_checksum = crc32(data)
            if current_checksum == last_checksum:
                return data
            last_checksum = current_checksum
        # All rounds were used and the checksum changed every time.
        raise AutoDeobfuscationTimeout(data)

Ancestors

Subclasses

Inherited members