Module refinery.lib.scripts.js.deobfuscation.cff
Recover sequential code from control-flow-flattened dispatchers. The obfuscator replaces a sequence of statements with a dispatcher loop:
var _order = ['2', '0', '4', '1', '3'];
var _idx = 0;
while (true) {
switch (_order[_idx++]) {
case '0': /* block 0 */; continue;
case '1': /* block 1 */; continue;
...
}
break;
}
Recovery reads the order sequence and emits the case bodies in that order, replacing the entire dispatcher span with the re-sequenced statements.
Expand source code Browse git
"""
Recover sequential code from control-flow-flattened dispatchers. The obfuscator replaces a sequence
of statements with a dispatcher loop:
var _order = ['2', '0', '4', '1', '3'];
var _idx = 0;
while (true) {
switch (_order[_idx++]) {
case '0': /* block 0 */; continue;
case '1': /* block 1 */; continue;
...
}
break;
}
Recovery reads the order sequence and emits the case bodies in that order, replacing the entire
dispatcher span with the re-sequenced statements.
"""
from __future__ import annotations
from refinery.lib.scripts import Node, Statement
from refinery.lib.scripts.js.deobfuscation.helpers import BodyProcessingTransformer, string_value
from refinery.lib.scripts.js.model import (
JsArrayExpression,
JsBlockStatement,
JsBooleanLiteral,
JsBreakStatement,
JsCallExpression,
JsContinueStatement,
JsIdentifier,
JsMemberExpression,
JsNumericLiteral,
JsStringLiteral,
JsSwitchCase,
JsSwitchStatement,
JsUnaryExpression,
JsUpdateExpression,
JsVariableDeclaration,
JsVariableDeclarator,
JsWhileStatement,
)
def _is_while_true(node: JsWhileStatement) -> bool:
"""
Check whether the while-loop condition is `true`, `!![]`, or `!0` — the forms the obfuscator
uses for infinite loops.
"""
test = node.test
if isinstance(test, JsBooleanLiteral) and test.value is True:
return True
if not isinstance(test, JsUnaryExpression) or test.operator != '!':
return False
inner = test.operand
if isinstance(inner, JsNumericLiteral) and inner.value == 0:
return True
if isinstance(inner, JsUnaryExpression) and inner.operator == '!':
return True
return False
def _strip_trailing_flow(stmts: list[Statement]) -> list[Statement]:
"""
Return a copy of the statement list with a trailing `continue` or `break` removed.
"""
if not stmts:
return []
last = stmts[-1]
if isinstance(last, (JsContinueStatement, JsBreakStatement)):
return stmts[:-1]
return stmts
def _match_dispatcher(
while_node: JsWhileStatement,
) -> tuple[str, str, dict[str, list[Statement]]] | None:
"""
Test whether *while_node* is a CFF dispatcher. Returns a tuple
(order_var, counter_var, case_map)
or `None`.
Expected structure::
while (true) {
switch (ORDER[COUNTER++]) {
case 'N': ...; continue;
...
}
break;
}
"""
if not _is_while_true(while_node):
return None
body = while_node.body
if not isinstance(body, JsBlockStatement):
return None
stmts = body.body
if len(stmts) != 2:
return None
switch = stmts[0]
tail = stmts[1]
if not isinstance(switch, JsSwitchStatement):
return None
if not isinstance(tail, JsBreakStatement):
return None
disc = switch.discriminant
if not isinstance(disc, JsMemberExpression) or not disc.computed:
return None
if not isinstance(disc.object, JsIdentifier):
return None
order_var = disc.object.name
prop = disc.property
if not isinstance(prop, JsUpdateExpression):
return None
if prop.operator != '++' or prop.prefix:
return None
if not isinstance(prop.argument, JsIdentifier):
return None
counter_var = prop.argument.name
case_map: dict[str, list[Statement]] = {}
for case in switch.cases:
if not isinstance(case, JsSwitchCase) or case.test is None:
return None
label = string_value(case.test)
if label is None:
if isinstance(case.test, JsNumericLiteral):
label = str(int(case.test.value))
else:
return None
case_map[label] = _strip_trailing_flow(case.consequent)
return order_var, counter_var, case_map
def _extract_order_sequence(node: Node | None) -> list[str] | None:
"""
Extract the dispatch sequence from the order variable's initializer. Accepts an array literal
of constants (the canonical form after simplification) or a `'...'['split']('|')` call (the
raw obfuscator output, as fallback).
"""
if isinstance(node, JsArrayExpression):
result: list[str] = []
for el in node.elements:
s = string_value(el)
if s is not None:
result.append(s)
elif isinstance(el, JsNumericLiteral):
result.append(str(int(el.value)))
else:
return None
return result
if not isinstance(node, JsCallExpression) or len(node.arguments) != 1:
return None
sep = string_value(node.arguments[0])
if sep != '|':
return None
callee = node.callee
if not isinstance(callee, JsMemberExpression):
return None
method = callee.property
if isinstance(method, JsStringLiteral):
if method.value != 'split':
return None
elif isinstance(method, JsIdentifier):
if method.name != 'split':
return None
else:
return None
obj = string_value(callee.object)
if obj is None:
return None
return obj.split('|')
def _find_order_sequence(
body: list[Statement],
end_idx: int,
order_var: str,
counter_var: str,
) -> tuple[list[str], int, int] | None:
"""
Scan backwards from *end_idx* in *body* to find the initialization of the order array and
the counter variable. Returns a tuple
(order_sequence, first_init_idx, last_init_idx)
or `None`.
"""
order_init_idx: int | None = None
counter_init_idx: int | None = None
order_sequence: list[str] | None = None
for i in range(end_idx - 1, -1, -1):
stmt = body[i]
if not isinstance(stmt, JsVariableDeclaration):
continue
if len(stmt.declarations) != 1:
continue
decl = stmt.declarations[0]
if not isinstance(decl, JsVariableDeclarator) or not isinstance(decl.id, JsIdentifier):
continue
name = decl.id.name
if name == counter_var and counter_init_idx is None:
if isinstance(decl.init, JsNumericLiteral) and decl.init.value == 0:
counter_init_idx = i
elif name == order_var and order_init_idx is None:
seq = _extract_order_sequence(decl.init)
if seq is not None:
order_sequence = seq
order_init_idx = i
if order_init_idx is not None and counter_init_idx is not None:
break
if order_sequence is None or order_init_idx is None or counter_init_idx is None:
return None
first = min(order_init_idx, counter_init_idx)
last = max(order_init_idx, counter_init_idx)
return order_sequence, first, last
class JsControlFlowUnflattening(BodyProcessingTransformer):
"""
Detect and recover CFF dispatchers in function bodies and script-level code.
"""
def _process_body(self, parent: Node, body: list[Statement]) -> None:
i = 0
while i < len(body):
stmt = body[i]
if not isinstance(stmt, JsWhileStatement):
i += 1
continue
match = _match_dispatcher(stmt)
if match is None:
i += 1
continue
order_var, counter_var, case_map = match
order_info = _find_order_sequence(body, i, order_var, counter_var)
if order_info is None:
i += 1
continue
order_sequence, first_init, _ = order_info
if not all(label in case_map for label in order_sequence):
i += 1
continue
recovered: list[Statement] = []
for label in order_sequence:
recovered.extend(case_map[label])
remove_start = first_init
remove_end = i
replacement = body[:remove_start] + recovered + body[remove_end + 1:]
self._replace_body(parent, body, replacement)
i = remove_start + len(recovered)
Classes
class JsControlFlowUnflattening-
Detect and recover CFF dispatchers in function bodies and script-level code.
Expand source code Browse git
class JsControlFlowUnflattening(BodyProcessingTransformer): """ Detect and recover CFF dispatchers in function bodies and script-level code. """ def _process_body(self, parent: Node, body: list[Statement]) -> None: i = 0 while i < len(body): stmt = body[i] if not isinstance(stmt, JsWhileStatement): i += 1 continue match = _match_dispatcher(stmt) if match is None: i += 1 continue order_var, counter_var, case_map = match order_info = _find_order_sequence(body, i, order_var, counter_var) if order_info is None: i += 1 continue order_sequence, first_init, _ = order_info if not all(label in case_map for label in order_sequence): i += 1 continue recovered: list[Statement] = [] for label in order_sequence: recovered.extend(case_map[label]) remove_start = first_init remove_end = i replacement = body[:remove_start] + recovered + body[remove_end + 1:] self._replace_body(parent, body, replacement) i = remove_start + len(recovered)Ancestors