Module refinery.units.blockwise.alu
Expand source code Browse git
from __future__ import annotations
from refinery.lib.argformats import PythonExpression
from refinery.lib.meta import metavars
from refinery.lib.types import INF, Param
from refinery.units.blockwise import Arg, ArithmeticUnit, FastBlockError
class IndexCounter:
mask: int
index: int
def init(self, mask):
self.mask = mask
self.index = -1
def __iter__(self):
return self
def __next__(self):
self.index = index = self.index + 1 & self.mask
return index
class alu(ArithmeticUnit):
"""
General arithmetic-logic, blockwise operations on the input data. Useful for simple, homebrew
encoding and encryption schemes.
The unit allows you to specify a custom Python expression where the following variables can be
used (rotation operations are interpreted as shifts when arbitrary precision is used):
- `A`: same as `V[0]`
- `B`: current block (already updated in epilogue)
- `E`: block value of encoded input (not changed after update)
- `N`: number of bytes in the input
- `K`: current index in the input
- `S`: the internal state value
- `V`: the vector of arguments
- `I`: function that casts to a signed int in current precision
- `U`: function that casts to unsigned int in current precision
- `R`: function; `R(x,3)` rotates x by 3 to the right
- `L`: function; `L(x,3)` rotates x by 3 to the left
- `M`: function; `M(x,8)` picks the lower 8 bits of x
- `X`: function that negates the bits of the input
Each block of the input is replaced by the value of this expression. It is possible to specify
prologue and epilogue expressions which are used to update the state variable `S` before and
after the update of each block, respectively.
"""
@staticmethod
def _parse_op(definition, default=None):
if definition:
return definition
elif not default:
raise ValueError('No definition given')
else:
return default
def __init__(
self,
operator: Param[str, Arg.String(help='A Python expression defining the operation.')],
*argument,
seed: Param[int | str, Arg.String('-s', help=(
'Optional seed value for the state variable S. The default is zero. This can be an expression '
'involving the variable N.'))] = 0,
skip: Param[int, Arg.Number('-k', help=(
'Optional skip value for the state machine. When specified, prologue and epilogue are exeucted '
'N times before operation on the input buffer begins: the expressions may not depend on block '
'values or indices.'))] = 0,
prologue: Param[str, Arg.String('-p', metavar='E', help=(
'Optional expression with which the state variable S is updated before a block is operated on.'))] = '',
epilogue: Param[str, Arg.String('-e', metavar='E', group='EPI', help=(
'Optional expression with which the state variable S is updated after a block was operated on.'))] = '',
inc: Param[bool, Arg.Switch('-I', group='EPI', help='equivalent to --epilogue=S+1')] = False,
dec: Param[bool, Arg.Switch('-D', group='EPI', help='equivalent to --epilogue=S-1')] = False,
cbc: Param[bool, Arg.Switch('-X', group='EPI', help='equivalent to --epilogue=(B)')] = False,
ctr: Param[bool, Arg.Switch('-C', group='EPI', help='equivalent to --epilogue=S+B')] = False,
lcg: Param[None | slice, Arg.Bounds('-G', group='EPI', metavar='a:b[:m]', help=(
'equivalent to --epilogue=(a*S+b)%%m'))] = None,
bigendian=False, blocksize=1, precision=-1
):
for flag, flag_is_set, expression in [
('--cbc', cbc, '(B)'),
('--inc', inc, 'S+1'),
('--dec', dec, 'S-1'),
('--ctr', ctr, 'S+B'),
('--lcg', lcg, 'S'),
]:
if not flag_is_set:
continue
if not epilogue:
epilogue = expression
continue
raise ValueError(
F'Ambiguous specification; epilogue was already set to {epilogue} '
F'when {flag} was parsed.')
self._index = IndexCounter()
super().__init__(
self._index,
*argument,
bigendian=bigendian,
blocksize=blocksize,
precision=precision,
seed=seed,
skip=skip,
lcg=lcg,
operator=self._parse_op(operator),
prologue=self._parse_op(prologue, 'S'),
epilogue=self._parse_op(epilogue, 'S'),
)
@property
def _is_ecb(self):
return not self.args.epilogue and not self.args.prologue
def _fastblock(self, data):
raise FastBlockError
def process(self, data):
context = dict(metavars(data))
seed = self.args.seed
skip = self.args.skip
fbits = self.fbits
fmask = self.fmask
self._index.init(self.fmask)
def _expression(definition: str):
return PythonExpression(definition, *'IBEASMNVRLX', all_variables_allowed=True, mask=fmask)
_prologue: str = self.args.prologue
_epilogue: str = self.args.epilogue
_operator: str = self.args.operator
lcg: slice[int, int, int | None] | None = self.args.lcg
if lcg is not None:
a = lcg.start
b = lcg.stop
if a is None or b is None:
raise ValueError('Invalid LCG specification. Both a multiplier and an increment must be specified.')
if a != 1:
_epilogue = F'{_epilogue}*{a}'
if b != 0:
_epilogue = F'{_epilogue}+{b}'
if m := lcg.step:
_epilogue = F'({_epilogue})%{m}'
prologue = _expression(_prologue).expression
epilogue = _expression(_epilogue).expression
operator = _expression(_operator).expression
def cast_unsigned(n) -> int:
return int(n) & fmask
def cast_signed(n) -> int:
n = int(n) & fmask
if n >> (fbits - 1):
return -((~n + 1) & fmask)
else:
return n
if fbits is INF:
rotate_r = int.__rshift__
rotate_l = int.__lshift__
else:
def rotate_r(n: int, k: int, /):
return (n >> k) | (n << (fbits - k)) & fmask
def rotate_l(n: int, k: int, /):
return (n << k) | (n >> (fbits - k)) & fmask
def negate_bits(n):
return n ^ fmask
def mask_to_bits(x, b):
return x & ((1 << b) - 1)
context.update(
N=len(data),
I=cast_signed,
U=cast_unsigned,
R=rotate_r,
L=rotate_l,
X=negate_bits,
M=mask_to_bits,
)
if isinstance(seed, str):
seed = PythonExpression(seed, 'IAMNVRLX', constants=context, mask=fmask)
if callable(seed):
seed = seed(context, N=len(data))
self._index.init(self.fmask)
context.update(S=seed)
for _ in range(skip):
context['S'] = eval(prologue, None, context)
context['S'] = eval(epilogue, None, context)
def operate(block, index, *args):
context.update(K=index, B=block, E=block, V=args)
if args:
context['A'] = args[0]
context['S'] = eval(prologue, None, context)
context['B'] = eval(operator, None, context)
context['S'] = eval(epilogue, None, context)
return context['B']
placeholder = self.operate
self.operate = operate
try:
result = super().process(data)
finally:
self.operate = placeholder
return result
@staticmethod
def operate(block, index, *args):
raise RuntimeError('This operate method cannot be called.')
def inplace(self, block, *args) -> None:
super().inplace(block, *args)
Classes
class IndexCounter-
Expand source code Browse git
class IndexCounter: mask: int index: int def init(self, mask): self.mask = mask self.index = -1 def __iter__(self): return self def __next__(self): self.index = index = self.index + 1 & self.mask return indexClass variables
var mask-
The type of the None singleton.
var index-
The type of the None singleton.
Methods
def init(self, mask)-
Expand source code Browse git
def init(self, mask): self.mask = mask self.index = -1
class alu (operator, *argument, seed=0, skip=0, prologue='', epilogue='', inc=False, dec=False, cbc=False, ctr=False, lcg=None, bigendian=False, blocksize=1, precision=-1)-
General arithmetic-logic, blockwise operations on the input data. Useful for simple, homebrew encoding and encryption schemes.
The unit allows you to specify a custom Python expression where the following variables can be used (rotation operations are interpreted as shifts when arbitrary precision is used):
A: same asV[0]B: current block (already updated in epilogue)E: block value of encoded input (not changed after update)N: number of bytes in the inputK: current index in the inputS: the internal state valueV: the vector of argumentsI: function that casts to a signed int in current precisionU: function that casts to unsigned int in current precisionR: function;R(x,3)rotates x by 3 to the rightL: function;L(x,3)rotates x by 3 to the leftM: function;M(x,8)picks the lower 8 bits of xX: function that negates the bits of the input
Each block of the input is replaced by the value of this expression. It is possible to specify prologue and epilogue expressions which are used to update the state variable
Sbefore and after the update of each block, respectively.Expand source code Browse git
class alu(ArithmeticUnit): """ General arithmetic-logic, blockwise operations on the input data. Useful for simple, homebrew encoding and encryption schemes. The unit allows you to specify a custom Python expression where the following variables can be used (rotation operations are interpreted as shifts when arbitrary precision is used): - `A`: same as `V[0]` - `B`: current block (already updated in epilogue) - `E`: block value of encoded input (not changed after update) - `N`: number of bytes in the input - `K`: current index in the input - `S`: the internal state value - `V`: the vector of arguments - `I`: function that casts to a signed int in current precision - `U`: function that casts to unsigned int in current precision - `R`: function; `R(x,3)` rotates x by 3 to the right - `L`: function; `L(x,3)` rotates x by 3 to the left - `M`: function; `M(x,8)` picks the lower 8 bits of x - `X`: function that negates the bits of the input Each block of the input is replaced by the value of this expression. It is possible to specify prologue and epilogue expressions which are used to update the state variable `S` before and after the update of each block, respectively. """ @staticmethod def _parse_op(definition, default=None): if definition: return definition elif not default: raise ValueError('No definition given') else: return default def __init__( self, operator: Param[str, Arg.String(help='A Python expression defining the operation.')], *argument, seed: Param[int | str, Arg.String('-s', help=( 'Optional seed value for the state variable S. The default is zero. This can be an expression ' 'involving the variable N.'))] = 0, skip: Param[int, Arg.Number('-k', help=( 'Optional skip value for the state machine. When specified, prologue and epilogue are exeucted ' 'N times before operation on the input buffer begins: the expressions may not depend on block ' 'values or indices.'))] = 0, prologue: Param[str, Arg.String('-p', metavar='E', help=( 'Optional expression with which the state variable S is updated before a block is operated on.'))] = '', epilogue: Param[str, Arg.String('-e', metavar='E', group='EPI', help=( 'Optional expression with which the state variable S is updated after a block was operated on.'))] = '', inc: Param[bool, Arg.Switch('-I', group='EPI', help='equivalent to --epilogue=S+1')] = False, dec: Param[bool, Arg.Switch('-D', group='EPI', help='equivalent to --epilogue=S-1')] = False, cbc: Param[bool, Arg.Switch('-X', group='EPI', help='equivalent to --epilogue=(B)')] = False, ctr: Param[bool, Arg.Switch('-C', group='EPI', help='equivalent to --epilogue=S+B')] = False, lcg: Param[None | slice, Arg.Bounds('-G', group='EPI', metavar='a:b[:m]', help=( 'equivalent to --epilogue=(a*S+b)%%m'))] = None, bigendian=False, blocksize=1, precision=-1 ): for flag, flag_is_set, expression in [ ('--cbc', cbc, '(B)'), ('--inc', inc, 'S+1'), ('--dec', dec, 'S-1'), ('--ctr', ctr, 'S+B'), ('--lcg', lcg, 'S'), ]: if not flag_is_set: continue if not epilogue: epilogue = expression continue raise ValueError( F'Ambiguous specification; epilogue was already set to {epilogue} ' F'when {flag} was parsed.') self._index = IndexCounter() super().__init__( self._index, *argument, bigendian=bigendian, blocksize=blocksize, precision=precision, seed=seed, skip=skip, lcg=lcg, operator=self._parse_op(operator), prologue=self._parse_op(prologue, 'S'), epilogue=self._parse_op(epilogue, 'S'), ) @property def _is_ecb(self): return not self.args.epilogue and not self.args.prologue def _fastblock(self, data): raise FastBlockError def process(self, data): context = dict(metavars(data)) seed = self.args.seed skip = self.args.skip fbits = self.fbits fmask = self.fmask self._index.init(self.fmask) def _expression(definition: str): return PythonExpression(definition, *'IBEASMNVRLX', all_variables_allowed=True, mask=fmask) _prologue: str = self.args.prologue _epilogue: str = self.args.epilogue _operator: str = self.args.operator lcg: slice[int, int, int | None] | None = self.args.lcg if lcg is not None: a = lcg.start b = lcg.stop if a is None or b is None: raise ValueError('Invalid LCG specification. Both a multiplier and an increment must be specified.') if a != 1: _epilogue = F'{_epilogue}*{a}' if b != 0: _epilogue = F'{_epilogue}+{b}' if m := lcg.step: _epilogue = F'({_epilogue})%{m}' prologue = _expression(_prologue).expression epilogue = _expression(_epilogue).expression operator = _expression(_operator).expression def cast_unsigned(n) -> int: return int(n) & fmask def cast_signed(n) -> int: n = int(n) & fmask if n >> (fbits - 1): return -((~n + 1) & fmask) else: return n if fbits is INF: rotate_r = int.__rshift__ rotate_l = int.__lshift__ else: def rotate_r(n: int, k: int, /): return (n >> k) | (n << (fbits - k)) & fmask def rotate_l(n: int, k: int, /): return (n << k) | (n >> (fbits - k)) & fmask def negate_bits(n): return n ^ fmask def mask_to_bits(x, b): return x & ((1 << b) - 1) context.update( N=len(data), I=cast_signed, U=cast_unsigned, R=rotate_r, L=rotate_l, X=negate_bits, M=mask_to_bits, ) if isinstance(seed, str): seed = PythonExpression(seed, 'IAMNVRLX', constants=context, mask=fmask) if callable(seed): seed = seed(context, N=len(data)) self._index.init(self.fmask) context.update(S=seed) for _ in range(skip): context['S'] = eval(prologue, None, context) context['S'] = eval(epilogue, None, context) def operate(block, index, *args): context.update(K=index, B=block, E=block, V=args) if args: context['A'] = args[0] context['S'] = eval(prologue, None, context) context['B'] = eval(operator, None, context) context['S'] = eval(epilogue, None, context) return context['B'] placeholder = self.operate self.operate = operate try: result = super().process(data) finally: self.operate = placeholder return result @staticmethod def operate(block, index, *args): raise RuntimeError('This operate method cannot be called.') def inplace(self, block, *args) -> None: super().inplace(block, *args)Ancestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Static methods
def operate(block, index, *args)-
Expand source code Browse git
@staticmethod def operate(block, index, *args): raise RuntimeError('This operate method cannot be called.')
Methods
def inplace(self, block, *args)-
Expand source code Browse git
def inplace(self, block, *args) -> None: super().inplace(block, *args)
Inherited members
ArithmeticUnit:ArithmeticUnit:FilterEverythingRequiresactassemblebytestreamchunkchunk_into_bytescodecfilterfinishhandlesis_quietis_reversibleisattylabelledleniencylog_alwayslog_debuglog_detachlog_faillog_infolog_levellog_warnloggernamenozzleprocess_blockreadread1resetrestrunsourcesuperinitunchunk
BlockTransformationBase: