Module refinery.units.obfuscation

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import abc
import re
import io

from functools import wraps
from typing import ByteString, Callable
from zlib import crc32

from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.decorators import unicoded


__all__ = [
    'Deobfuscator',
    'IterativeDeobfuscator',
    'outside',
    'unicoded',
]


class AutoDeobfuscationTimeout(RefineryPartialResult):
    """
    Partial-result error raised when an iterative deobfuscation uses up its
    iteration budget while the data is still changing. The data produced so
    far is passed on as the partial result.
    """

    def __init__(self, partial):
        message = 'The deobfuscation timeout was reached before the data stabilized.'
        super().__init__(message, partial=partial)


def outside(*exceptions):
    """
    Build a decorator that applies a string transformation only to those areas
    of the input where none of the given regular expressions match. Here, this
    is mostly used to apply deobfuscations only to code outside of strings.

    Each positional argument is a regular expression (as a string); matching
    is performed with `re.DOTALL`. The decorated method must accept `self` and
    a string, and return a string. Matched areas are copied to the output
    unchanged; the method is called once for each (possibly empty) piece of
    text before, between and after the matches.

    When no patterns are given, nothing is excluded and the method is returned
    as it is, so that it operates on the entire input.
    """

    if not exceptions:
        # An empty alternation would match the empty string at every offset and
        # thereby chop the input into single characters before transforming it.
        def unrestricted(method):
            return method
        return unrestricted

    exclusion = '|'.join(F'(?:{e})' for e in exceptions)

    def excluded(method):
        # Keep the name and docstring of the decorated method intact.
        @wraps(method)
        def wrapper(self, data):
            with io.StringIO() as out:
                cursor = 0
                for m in re.finditer(exclusion, data, re.DOTALL):
                    # Transform the text leading up to the excluded area, then
                    # emit the excluded area itself verbatim.
                    out.write(method(self, data[cursor:m.start()]))
                    out.write(m[0])
                    cursor = m.end()
                # Transform whatever follows the last excluded area.
                out.write(method(self, data[cursor:]))
                return out.getvalue()
        return wrapper

    return excluded


class Deobfuscator(Unit, abstract=True):
    """
    Abstract base class for units that deobfuscate text. Subclasses implement
    `deobfuscate`, which works on strings; `process` is wrapped with the
    `unicoded` decorator and forwards the decoded input to `deobfuscate`.
    """

    def __init__(self): super().__init__()

    @unicoded
    def process(self, data: str) -> str:
        """
        Pass the decoded input to `deobfuscate` and return its result.
        """
        return self.deobfuscate(data)

    @abc.abstractmethod
    def deobfuscate(self, data: str) -> str:
        """
        Deobfuscate the given text and return the result. This is the method
        that subclasses have to override; the implementation here returns the
        input unchanged.
        """
        return data


class IterativeDeobfuscator(Deobfuscator, abstract=True):
    """
    A deobfuscator which is applied to its own output over and over again until
    a round no longer changes the CRC32 checksum of the data, or until the
    configured number of rounds has been used up.
    """

    def __init__(self, timeout: Arg('-t', help='Maximum number of iterations; the default is 100.') = 100):
        if timeout < 1:
            raise ValueError('The timeout must be at least 1.')
        super().__init__()
        self.args.timeout = timeout

    def process(self, data: ByteString) -> ByteString:
        """
        Run the parent's `process` repeatedly and return the data as soon as one
        round leaves its checksum unchanged. If every permitted round changed the
        data, `AutoDeobfuscationTimeout` is raised with the latest data. A
        `KeyboardInterrupt` during a round is turned into a `RefineryPartialResult`
        that carries the data from before the interrupted round.
        """
        last_checksum = crc32(data)
        for _ in range(self.args.timeout):
            try:
                data = super().process(data)
            except KeyboardInterrupt:
                raise RefineryPartialResult('Returning partially deobfuscated data', partial=data)
            current_checksum = crc32(data)
            if current_checksum == last_checksum:
                # The data has stabilized, nothing left to do.
                return data
            last_checksum = current_checksum
        # Only reached when no round left the data unchanged.
        raise AutoDeobfuscationTimeout(data)


class StringLiterals:
    """
    Keeps track of where the matches of a string literal pattern are located
    inside a text. The spans are stored in `ranges` as `(start, end)` tuples in
    the order in which `re.finditer` reports them.
    """

    def __init__(self, pattern: str, data: str):
        self.pattern = str(pattern)
        self.update(data)

    def update(self, data):
        """
        Store `data` as the current text and recompute all literal spans in it.
        """
        self.data = data
        self.ranges = [literal.span() for literal in re.finditer(self.pattern, data)]

    def shift(self, by, start=0):
        """
        Move every stored span with list index `start` or higher by `by`.
        """
        spans = self.ranges
        for position in range(start, len(spans)):
            lower, upper = spans[position]
            spans[position] = (lower + by, upper + by)

    def outside(self, function: Callable[[re.Match], str]) -> Callable[[re.Match], str]:
        """
        Wrap a substitution callback so that matches which partially overlap a
        string literal are left untouched. Matches that do not touch a literal,
        or that enclose a literal completely, are handed to `function`. When
        `function` returns `None`, the matched text is kept as well.
        """
        @wraps(function)
        def wrapper(match: re.Match) -> str:
            text = match.string
            if text != self.data:
                # The callback is being used on a different text than the one
                # that was analyzed last; recompute the literal spans.
                self.update(text)
            a, b = match.span()
            for x, y in self.ranges:
                if x > b:
                    # The spans are in ascending order; no later one can overlap.
                    break
                touches = a in range(x, y) or x in range(a, b)
                if touches and (x < a or y > b):
                    return match[0]
            replacement = function(match)
            return match[0] if replacement is None else replacement
        return wrapper

    def __contains__(self, index):
        for span in self.ranges:
            if index in range(*span):
                return True
        return False

    def get_container(self, offset):
        """
        Return the list index of the first span that contains `offset`, or
        `None` when the offset is not inside any span.
        """
        hits = (k for k, span in enumerate(self.ranges) if offset in range(*span))
        return next(hits, None)

Sub-modules

refinery.units.obfuscation.js

Deobfuscation of JavaScript documents.

refinery.units.obfuscation.ps1
refinery.units.obfuscation.vba

A package containing deobfuscators for Visual Basic for Applications (VBA).

Functions

def outside(*exceptions)

A decorator that applies the transformation only to areas where none of the given regular expressions match. Here, this is mostly used to apply deobfuscations only to code outside of strings.

Expand source code Browse git
def outside(*exceptions):
    """
    Build a decorator that applies a string transformation only to those areas
    of the input where none of the given regular expressions match. Here, this
    is mostly used to apply deobfuscations only to code outside of strings.

    Each positional argument is a regular expression (as a string); matching
    is performed with `re.DOTALL`. The decorated method must accept `self` and
    a string, and return a string. Matched areas are copied to the output
    unchanged; the method is called once for each (possibly empty) piece of
    text before, between and after the matches.

    When no patterns are given, nothing is excluded and the method is returned
    as it is, so that it operates on the entire input.
    """

    if not exceptions:
        # An empty alternation would match the empty string at every offset and
        # thereby chop the input into single characters before transforming it.
        def unrestricted(method):
            return method
        return unrestricted

    exclusion = '|'.join(F'(?:{e})' for e in exceptions)

    def excluded(method):
        # Keep the name and docstring of the decorated method intact.
        @wraps(method)
        def wrapper(self, data):
            with io.StringIO() as out:
                cursor = 0
                for m in re.finditer(exclusion, data, re.DOTALL):
                    # Transform the text leading up to the excluded area, then
                    # emit the excluded area itself verbatim.
                    out.write(method(self, data[cursor:m.start()]))
                    out.write(m[0])
                    cursor = m.end()
                # Transform whatever follows the last excluded area.
                out.write(method(self, data[cursor:]))
                return out.getvalue()
        return wrapper

    return excluded
def unicoded(method)

Can be used to decorate a Unit.process() routine that takes a string argument and also returns one. The resulting routine takes a binary buffer as input and attempts to decode it as unicode text. If certain characters cannot be decoded, then these ranges are skipped and the decorated routine is called once for each string patch that was successfully decoded.

Expand source code Browse git
def unicoded(method: Callable[[Unit, str], Optional[str]]) -> Callable[[Unit, bytes], bytes]:
    """
    Can be used to decorate a `refinery.units.Unit.process` routine that takes a
    string argument and also returns one. The resulting routine takes a binary buffer
    as input and attempts to decode it as unicode text. If certain characters cannot
    be decoded, then these ranges are skipped and the decorated routine is called
    once for each string patch that was successfully decoded.
    """
    @wraps_without_annotations(method)
    def method_wrapper(self: Unit, data: bytes) -> bytes:
        # The unit's own codec is used unless every byte at an even offset is zero
        # (which includes empty input); in that case the input is read as UTF-16LE.
        # NOTE(review): ASCII text encoded as UTF-16LE has its zero bytes at the odd
        # offsets, so this check may be looking at the wrong half - confirm intent.
        input_codec = self.codec if any(data[::2]) else 'UTF-16LE'
        # Undecodable bytes become lone surrogates. Splitting on a capturing group
        # yields a list where even entries are decoded text and odd entries are the
        # surrogate runs in between.
        partial = re.split(R'([\uDC80-\uDCFF]+)',  # surrogate escape range
            codecs.decode(data, input_codec, errors='surrogateescape'))
        # Replace each non-empty text piece by the result of the decorated method;
        # empty pieces and falsy results (such as None) become the empty string.
        partial[::2] = (method(self, p) or '' if p else '' for p in itertools.islice(iter(partial), 0, None, 2))
        # NOTE(review): because of the `or ''` fallback above, it looks like no entry
        # can be None at this point, which would make the handling below unreachable
        # - confirm before relying on the None return value.
        nones = sum(1 for p in partial if p is None)
        if nones == len(partial):
            return None
        if nones >= 1:
            for k, p in enumerate(partial):
                if p is None:
                    partial[k] = ''
        # The output is always encoded with the unit's codec, regardless of which
        # codec was chosen for decoding; the surrogate runs turn back into the
        # original undecodable bytes.
        return codecs.encode(''.join(partial), self.codec, errors='surrogateescape')
    return method_wrapper

Classes

class Deobfuscator
Expand source code Browse git
class Deobfuscator(Unit, abstract=True):
    """
    Abstract base class for units that deobfuscate text. Subclasses implement
    `deobfuscate`, which works on strings; `process` is wrapped with the
    `unicoded` decorator and forwards the decoded input to `deobfuscate`.
    """

    def __init__(self): super().__init__()

    @unicoded
    def process(self, data: str) -> str:
        """
        Pass the decoded input to `deobfuscate` and return its result.
        """
        return self.deobfuscate(data)

    @abc.abstractmethod
    def deobfuscate(self, data: str) -> str:
        """
        Deobfuscate the given text and return the result. This is the method
        that subclasses have to override; the implementation here returns the
        input unchanged.
        """
        return data

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies

Methods

def deobfuscate(self, data)
Expand source code Browse git
@abc.abstractmethod
def deobfuscate(self, data: str) -> str:
    """
    Deobfuscate the given text and return the result. This is the method
    that subclasses have to override; the implementation here returns the
    input unchanged.
    """
    return data

Inherited members

class IterativeDeobfuscator (timeout=100)
Expand source code Browse git
class IterativeDeobfuscator(Deobfuscator, abstract=True):
    """
    A deobfuscator which is applied to its own output over and over again until
    a round no longer changes the CRC32 checksum of the data, or until the
    configured number of rounds has been used up.
    """

    def __init__(self, timeout: Arg('-t', help='Maximum number of iterations; the default is 100.') = 100):
        if timeout < 1:
            raise ValueError('The timeout must be at least 1.')
        super().__init__()
        self.args.timeout = timeout

    def process(self, data: ByteString) -> ByteString:
        """
        Run the parent's `process` repeatedly and return the data as soon as one
        round leaves its checksum unchanged. If every permitted round changed the
        data, `AutoDeobfuscationTimeout` is raised with the latest data. A
        `KeyboardInterrupt` during a round is turned into a `RefineryPartialResult`
        that carries the data from before the interrupted round.
        """
        last_checksum = crc32(data)
        for _ in range(self.args.timeout):
            try:
                data = super().process(data)
            except KeyboardInterrupt:
                raise RefineryPartialResult('Returning partially deobfuscated data', partial=data)
            current_checksum = crc32(data)
            if current_checksum == last_checksum:
                # The data has stabilized, nothing left to do.
                return data
            last_checksum = current_checksum
        # Only reached when no round left the data unchanged.
        raise AutoDeobfuscationTimeout(data)

Ancestors

Subclasses

Class variables

var required_dependencies
var optional_dependencies

Inherited members