Module refinery.lib.scripts.ps1.deobfuscation.securestring

PowerShell SecureString decryption transformer.

Expand source code Browse git
"""
PowerShell SecureString decryption transformer.
"""
from __future__ import annotations

from refinery.lib.scripts import Transformer
from refinery.lib.scripts.ps1.deobfuscation._helpers import (
    _get_command_name,
    _make_string_literal,
)
from refinery.lib.scripts.ps1.model import (
    Ps1ArrayLiteral,
    Ps1CommandArgument,
    Ps1CommandArgumentKind,
    Ps1CommandInvocation,
    Ps1IntegerLiteral,
    Ps1ParenExpression,
    Ps1Pipeline,
    Ps1PipelineElement,
    Ps1RangeExpression,
    Ps1StringLiteral,
)


def _collect_key_bytes(node) -> bytes | None:
    if isinstance(node, Ps1ParenExpression) and node.expression is not None:
        return _collect_key_bytes(node.expression)
    if isinstance(node, Ps1RangeExpression):
        if isinstance(node.start, Ps1IntegerLiteral) and isinstance(node.end, Ps1IntegerLiteral):
            a = node.start.value
            b = node.end.value
            r = range(min(a, b), max(a, b) + 1)
            if a > b:
                r = reversed(r)
            try:
                return bytes(bytearray(r))
            except (ValueError, OverflowError):
                return None
    if isinstance(node, Ps1ArrayLiteral):
        values = []
        for elem in node.elements:
            if not isinstance(elem, Ps1IntegerLiteral):
                return None
            values.append(elem.value)
        try:
            return bytes(bytearray(values))
        except (ValueError, OverflowError):
            return None
    return None


def _find_key_argument(cmd: Ps1CommandInvocation) -> bytes | None:
    for arg in cmd.arguments:
        if not isinstance(arg, Ps1CommandArgument):
            continue
        if arg.kind == Ps1CommandArgumentKind.NAMED:
            if arg.name.lower().startswith('ke') and arg.value is not None:
                return _collect_key_bytes(arg.value)
    for i, arg in enumerate(cmd.arguments):
        if not isinstance(arg, Ps1CommandArgument):
            continue
        if arg.kind != Ps1CommandArgumentKind.SWITCH:
            continue
        if not arg.name.lower().startswith('ke'):
            continue
        if i + 1 < len(cmd.arguments):
            next_arg = cmd.arguments[i + 1]
            if isinstance(next_arg, Ps1CommandArgument):
                if next_arg.kind == Ps1CommandArgumentKind.POSITIONAL and next_arg.value is not None:
                    return _collect_key_bytes(next_arg.value)
    return None


class Ps1SecureStringDecryptor(Transformer):

    def visit_Ps1Pipeline(self, node: Ps1Pipeline):
        self.generic_visit(node)
        if len(node.elements) < 2:
            return None
        k = 0
        while k < len(node.elements) - 1:
            lhs = node.elements[k]
            rhs = node.elements[k + 1]
            if not isinstance(lhs.expression, Ps1StringLiteral):
                k += 1
                continue
            if not isinstance(rhs.expression, Ps1CommandInvocation):
                k += 1
                continue
            cmd = rhs.expression
            cmd_name = _get_command_name(cmd)
            if cmd_name is None or cmd_name.lower() != 'convertto-securestring':
                k += 1
                continue
            ciphertext = lhs.expression.value
            key = _find_key_argument(cmd)
            if key is None:
                k += 1
                continue
            try:
                from refinery.units.crypto.cipher.secstr import secstr
                unit = secstr(key=key)
                decrypted = unit(ciphertext.encode('utf-8'))
                plaintext = decrypted.decode('utf-8')
            except Exception:
                k += 1
                continue
            replacement = _make_string_literal(plaintext)
            new_element = Ps1PipelineElement(expression=replacement)
            new_element.parent = node
            replacement.parent = new_element
            node.elements = node.elements[:k] + [new_element] + node.elements[k + 2:]
            self.mark_changed()
        return None

Classes

class Ps1SecureStringDecryptor

In-place tree rewriter. Each visit method may return a replacement node or None to keep the original. Tracks whether any transformation was applied via the changed flag.

Expand source code Browse git
class Ps1SecureStringDecryptor(Transformer):

    def visit_Ps1Pipeline(self, node: Ps1Pipeline):
        self.generic_visit(node)
        if len(node.elements) < 2:
            return None
        k = 0
        while k < len(node.elements) - 1:
            lhs = node.elements[k]
            rhs = node.elements[k + 1]
            if not isinstance(lhs.expression, Ps1StringLiteral):
                k += 1
                continue
            if not isinstance(rhs.expression, Ps1CommandInvocation):
                k += 1
                continue
            cmd = rhs.expression
            cmd_name = _get_command_name(cmd)
            if cmd_name is None or cmd_name.lower() != 'convertto-securestring':
                k += 1
                continue
            ciphertext = lhs.expression.value
            key = _find_key_argument(cmd)
            if key is None:
                k += 1
                continue
            try:
                from refinery.units.crypto.cipher.secstr import secstr
                unit = secstr(key=key)
                decrypted = unit(ciphertext.encode('utf-8'))
                plaintext = decrypted.decode('utf-8')
            except Exception:
                k += 1
                continue
            replacement = _make_string_literal(plaintext)
            new_element = Ps1PipelineElement(expression=replacement)
            new_element.parent = node
            replacement.parent = new_element
            node.elements = node.elements[:k] + [new_element] + node.elements[k + 2:]
            self.mark_changed()
        return None

Ancestors

Methods

def visit_Ps1Pipeline(self, node)
Expand source code Browse git
def visit_Ps1Pipeline(self, node: Ps1Pipeline):
    self.generic_visit(node)
    if len(node.elements) < 2:
        return None
    k = 0
    while k < len(node.elements) - 1:
        lhs = node.elements[k]
        rhs = node.elements[k + 1]
        if not isinstance(lhs.expression, Ps1StringLiteral):
            k += 1
            continue
        if not isinstance(rhs.expression, Ps1CommandInvocation):
            k += 1
            continue
        cmd = rhs.expression
        cmd_name = _get_command_name(cmd)
        if cmd_name is None or cmd_name.lower() != 'convertto-securestring':
            k += 1
            continue
        ciphertext = lhs.expression.value
        key = _find_key_argument(cmd)
        if key is None:
            k += 1
            continue
        try:
            from refinery.units.crypto.cipher.secstr import secstr
            unit = secstr(key=key)
            decrypted = unit(ciphertext.encode('utf-8'))
            plaintext = decrypted.decode('utf-8')
        except Exception:
            k += 1
            continue
        replacement = _make_string_literal(plaintext)
        new_element = Ps1PipelineElement(expression=replacement)
        new_element.parent = node
        replacement.parent = new_element
        node.elements = node.elements[:k] + [new_element] + node.elements[k + 2:]
        self.mark_changed()
    return None