Module refinery.lib.scripts.pipeline
Dependency-tree-based deobfuscation scheduler.
Transformers are organized into groups of co-dependent transforms that iterate internally until stable. Groups form a DAG: a group only runs once all of its declared dependencies are stable. When any group makes changes, all other groups are marked unstable.
Expand source code Browse git
"""
Dependency-tree-based deobfuscation scheduler.
Transformers are organized into groups of co-dependent transforms that iterate internally until
stable. Groups form a DAG: a group only runs once all of its declared dependencies are stable. When
any group makes changes, all other groups are marked unstable.
"""
from __future__ import annotations
from refinery.lib.scripts import Node, Transformer
class DeobfuscationTimeout(Exception):
"""
Raised when the pipeline exceeds the maximum number of transformation steps.
"""
class TransformerGroup:
"""
A named set of co-dependent transformers that iterate until stable.
"""
def __init__(self, name: str, *transformers: type[Transformer]):
self.name = name
self.transformers = transformers
def run(self, ast: Node, steps: int = 0, max_steps: int = 0) -> tuple[bool, int]:
"""
Run all transformers in a loop until none report changes. Returns (changed, steps) where
changed indicates whether any transformation was applied and steps is the updated step
counter.
"""
changed = False
active = set(range(len(self.transformers)))
while True:
round_changed = False
for i, cls in enumerate(self.transformers):
if i not in active:
continue
t = cls()
t.visit(ast)
if t.changed:
steps += 1
round_changed = True
active = set(range(len(self.transformers)))
if max_steps and steps >= max_steps:
raise DeobfuscationTimeout
else:
active.discard(i)
if not round_changed:
break
changed = True
return changed, steps
class DeobfuscationPipeline:
"""
Scheduler that runs transformer groups respecting a dependency DAG.
Groups are run in declaration order, skipping any whose dependencies are not yet stable. When a
group makes changes, all other groups are invalidated unless a selective invalidation set is
configured for that group. The pipeline terminates when every group is stable.
"""
def __init__(
self,
groups: list[TransformerGroup],
dependencies: dict[str, set[str]] | None = None,
invalidates: dict[str, set[str]] | None = None,
):
self._groups = {g.name: g for g in groups}
self._order = [g.name for g in groups]
self._deps = dependencies or {}
self._invalidates = invalidates or {}
all_names = set(self._order)
for name, deps in self._deps.items():
if name not in all_names:
raise ValueError(F'unknown group in dependencies: {name!r}')
unknown = deps - all_names
if unknown:
raise ValueError(
F'group {name!r} depends on unknown groups: {unknown}')
for name, targets in self._invalidates.items():
if name not in all_names:
raise ValueError(F'unknown group in invalidates: {name!r}')
unknown = targets - all_names
if unknown:
raise ValueError(
F'group {name!r} invalidates unknown groups: {unknown}')
def run(self, ast: Node, max_steps: int = 0) -> int:
"""
Execute the pipeline. Returns the number of individual transformer invocations that
resulted in a change. A return value of 0 means the entire pipeline was already stable.
"""
stable: set[str] = set()
steps = 0
while True:
progress = False
for name in self._order:
if name in stable:
continue
deps = self._deps.get(name, set())
if not deps <= stable:
continue
group = self._groups[name]
changed, steps = group.run(ast, steps, max_steps)
stable.add(name)
if changed:
targets = self._invalidates.get(name)
if targets is None:
stable = {name}
else:
stable -= targets
progress = True
break
progress = True
if not progress:
break
return steps
Classes
class DeobfuscationTimeout (*args, **kwargs)-
Raised when the pipeline exceeds the maximum number of transformation steps.
Expand source code Browse git
class DeobfuscationTimeout(Exception): """ Raised when the pipeline exceeds the maximum number of transformation steps. """Ancestors
- builtins.Exception
- builtins.BaseException
class TransformerGroup (name, *transformers)-
A named set of co-dependent transformers that iterate until stable.
Expand source code Browse git
class TransformerGroup: """ A named set of co-dependent transformers that iterate until stable. """ def __init__(self, name: str, *transformers: type[Transformer]): self.name = name self.transformers = transformers def run(self, ast: Node, steps: int = 0, max_steps: int = 0) -> tuple[bool, int]: """ Run all transformers in a loop until none report changes. Returns (changed, steps) where changed indicates whether any transformation was applied and steps is the updated step counter. """ changed = False active = set(range(len(self.transformers))) while True: round_changed = False for i, cls in enumerate(self.transformers): if i not in active: continue t = cls() t.visit(ast) if t.changed: steps += 1 round_changed = True active = set(range(len(self.transformers))) if max_steps and steps >= max_steps: raise DeobfuscationTimeout else: active.discard(i) if not round_changed: break changed = True return changed, stepsMethods
def run(self, ast, steps=0, max_steps=0)-
Run all transformers in a loop until none report changes. Returns (changed, steps) where changed indicates whether any transformation was applied and steps is the updated step counter.
Expand source code Browse git
def run(self, ast: Node, steps: int = 0, max_steps: int = 0) -> tuple[bool, int]: """ Run all transformers in a loop until none report changes. Returns (changed, steps) where changed indicates whether any transformation was applied and steps is the updated step counter. """ changed = False active = set(range(len(self.transformers))) while True: round_changed = False for i, cls in enumerate(self.transformers): if i not in active: continue t = cls() t.visit(ast) if t.changed: steps += 1 round_changed = True active = set(range(len(self.transformers))) if max_steps and steps >= max_steps: raise DeobfuscationTimeout else: active.discard(i) if not round_changed: break changed = True return changed, steps
class DeobfuscationPipeline (groups, dependencies=None, invalidates=None)-
Scheduler that runs transformer groups respecting a dependency DAG.
Groups are run in declaration order, skipping any whose dependencies are not yet stable. When a group makes changes, all other groups are invalidated unless a selective invalidation set is configured for that group. The pipeline terminates when every group is stable.
Expand source code Browse git
class DeobfuscationPipeline: """ Scheduler that runs transformer groups respecting a dependency DAG. Groups are run in declaration order, skipping any whose dependencies are not yet stable. When a group makes changes, all other groups are invalidated unless a selective invalidation set is configured for that group. The pipeline terminates when every group is stable. """ def __init__( self, groups: list[TransformerGroup], dependencies: dict[str, set[str]] | None = None, invalidates: dict[str, set[str]] | None = None, ): self._groups = {g.name: g for g in groups} self._order = [g.name for g in groups] self._deps = dependencies or {} self._invalidates = invalidates or {} all_names = set(self._order) for name, deps in self._deps.items(): if name not in all_names: raise ValueError(F'unknown group in dependencies: {name!r}') unknown = deps - all_names if unknown: raise ValueError( F'group {name!r} depends on unknown groups: {unknown}') for name, targets in self._invalidates.items(): if name not in all_names: raise ValueError(F'unknown group in invalidates: {name!r}') unknown = targets - all_names if unknown: raise ValueError( F'group {name!r} invalidates unknown groups: {unknown}') def run(self, ast: Node, max_steps: int = 0) -> int: """ Execute the pipeline. Returns the number of individual transformer invocations that resulted in a change. A return value of 0 means the entire pipeline was already stable. """ stable: set[str] = set() steps = 0 while True: progress = False for name in self._order: if name in stable: continue deps = self._deps.get(name, set()) if not deps <= stable: continue group = self._groups[name] changed, steps = group.run(ast, steps, max_steps) stable.add(name) if changed: targets = self._invalidates.get(name) if targets is None: stable = {name} else: stable -= targets progress = True break progress = True if not progress: break return stepsMethods
def run(self, ast, max_steps=0)-
Execute the pipeline. Returns the number of individual transformer invocations that resulted in a change. A return value of 0 means the entire pipeline was already stable.
Expand source code Browse git
def run(self, ast: Node, max_steps: int = 0) -> int: """ Execute the pipeline. Returns the number of individual transformer invocations that resulted in a change. A return value of 0 means the entire pipeline was already stable. """ stable: set[str] = set() steps = 0 while True: progress = False for name in self._order: if name in stable: continue deps = self._deps.get(name, set()) if not deps <= stable: continue group = self._groups[name] changed, steps = group.run(ast, steps, max_steps) stable.add(name) if changed: targets = self._invalidates.get(name) if targets is None: stable = {name} else: stable -= targets progress = True break progress = True if not progress: break return steps