Module refinery.lib.crypto
Primitives used in custom cryptographic implementations.
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Primitives used in custom cryptographic implementations.
"""
from __future__ import annotations
from typing import Callable, ClassVar, Container, Generator, Optional, Type, Union, Dict
from abc import ABC, abstractmethod
from enum import Enum
BufferType = Union[bytearray, bytes, memoryview]
CIPHER_MODES: Dict[str, CipherMode] = {}
def strxor(a: bytes, b: bytes):
"""
Return the XOR of the two byte strings `a` and `b`. The shorter of the two strings defines the
length of the output.
"""
return bytes(a ^ b for a, b in zip(a, b))
def _register_cipher_mode(cls: Type[CipherMode]):
cls._identifier = len(CIPHER_MODES)
CIPHER_MODES[cls.__name__] = cls
return cls
def rotl128(x: int, c: int):
"""
Rotate the 128-bit integer `x` by `c` positions to the left.
"""
return ((x << c) | (x >> (0x80 - c))) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
def rotl64(x: int, c: int):
"""
Rotate the 64-bit integer `x` by `c` positions to the left.
"""
return ((x << c) | (x >> (0x40 - c))) & 0xFFFFFFFFFFFFFFFF
def rotl32(x: int, c: int):
"""
Rotate the 32-bit integer `x` by `c` positions to the left.
"""
return ((x << c) | (x >> (0x20 - c))) & 0xFFFFFFFF
def rotl16(x: int, c: int):
"""
Rotate the 16-bit integer `x` by `c` positions to the left.
"""
return ((x << c) | (x >> (0x10 - c))) & 0xFFFF
def rotl8(x: int, c: int):
"""
Rotate the byte `x` by `c` positions to the left.
"""
return ((x << c) | (x >> (0x08 - c))) & 0xFF
def rotr128(x: int, c: int):
"""
Rotate the 128-bit integer `x` by `c` positions to the right.
"""
return (x << (0x80 - c) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) | (x >> c)
def rotr64(x: int, c: int):
"""
Rotate the 64-bit integer `x` by `c` positions to the right.
"""
return (x << (0x40 - c) & 0xFFFFFFFFFFFFFFFF) | (x >> c)
def rotr32(x: int, c: int):
"""
Rotate the 32-bit integer `x` by `c` positions to the right.
"""
return (x << (0x20 - c) & 0xFFFFFFFF) | (x >> c)
def rotr16(x: int, c: int):
"""
Rotate the 16-bit integer `x` by `c` positions to the right.
"""
return (x << (0x10 - c) & 0xFFFF) | (x >> c)
def rotr8(x: int, c: int):
"""
Rotate the byte `x` by `c` positions to the right.
"""
return (x << (0x08 - c) & 0xFF) | (x >> c)
def rotr(n: int, x: int, c: int) -> int:
"""
Rotate the `n`-bit integer `x` by `c` positions to the right. If `n` is among the common bit
sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be
used instead.
"""
mask = (1 << n) - 1
c %= n
return (x >> c) | (x << (n - c) & mask)
def rotl(n: int, x: int, c: int) -> int:
"""
Rotate the `n`-bit integer `x` by `c` positions to the left. If `n` is among the common bit
sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be
used instead.
"""
mask = (1 << n) - 1
c %= n
return (x >> (n - c)) | (x << c & mask)
class Operation(str, Enum):
"""
Specifies whether data is currently being encrypted or decrypted.
"""
Encrypt = 'encrypt'
Decrypt = 'decrypt'
class CipherMode(ABC):
"""
Abstract base class for a cipher mode of operation.
"""
encrypt_block: Callable[[memoryview], memoryview]
decrypt_block: Callable[[memoryview], memoryview]
aligned: bool = True
_identifier: ClassVar[int]
@abstractmethod
def encrypt(self) -> Generator[memoryview, memoryview, None]:
"""
Implements data encryption according to the current cipher mode and underlying cipher.
"""
raise NotImplementedError
@abstractmethod
def decrypt(self) -> Generator[memoryview, memoryview, None]:
"""
Implements data decryption according to the current cipher mode and underlying cipher.
"""
raise NotImplementedError
def apply(
self,
operation: Operation,
dst: memoryview,
src: memoryview,
encrypt_block: Callable[[memoryview], memoryview],
decrypt_block: Callable[[memoryview], memoryview],
blocksize: int,
) -> memoryview:
"""
This method is used to perform a cryptographic `refinery.lib.crypto.Operation` to a given
source `src` and write the result to the memory at `dst` according to the current cipher
mode. To this end, it requires the block encryption and decryption primitives of the
underlying cipher and the current block size.
"""
self.encrypt_block = encrypt_block
self.decrypt_block = decrypt_block
engine: Generator[memoryview, memoryview, None] = {
Operation.Encrypt: self.encrypt,
Operation.Decrypt: self.decrypt,
}[operation]()
next(engine)
top, rest = divmod(len(src), blocksize)
top *= blocksize
for k in range(0, top, blocksize):
dst[k:k + blocksize] = engine.send(src[k:k + blocksize])
if rest:
dst[-rest:] = engine.send(src[-rest:])[:rest]
engine.close()
return dst
@_register_cipher_mode
class ECB(CipherMode):
"""
The Electronic Codebook (ECB) is the most simple cipher mode of operation. The underlying
cipher is applied block-wise with no additional safeguards.
"""
def decrypt(self) -> Generator[memoryview, memoryview, None]:
M = None
D = self.decrypt_block
while True:
C = yield M
M = D(C)
def encrypt(self) -> Generator[memoryview, memoryview, None]:
C = None
E = self.encrypt_block
while True:
M = yield C
C = E(M)
class DataUnaligned(ValueError):
"""
Raised when input data is unexpectedly unaligned to the current block size.
"""
def __init__(self) -> None:
super().__init__('Data not aligned to block size.')
class StatefulCipherMode(CipherMode):
"""
A subclass of `refinery.lib.crypto.CipherMode` that holds a state while performing any of
its cryptographic `refinery.lib.crypto.Operation`s.
"""
iv: BufferType
"""
The initial vector for the internal state of the cipher mode.
"""
def __init__(self, iv: BufferType):
self.iv = iv
@_register_cipher_mode
class CBC(StatefulCipherMode):
"""
An implementation of the popular Cipher Block Chaining mode of operation.
"""
def encrypt(self) -> Generator[memoryview, memoryview, None]:
C = self.iv
E = self.encrypt_block
while True:
M = yield C
C = E(strxor(M, C))
def decrypt(self) -> Generator[memoryview, memoryview, None]:
S = self.iv
M = None
D = self.decrypt_block
while True:
C = yield M
M = strxor(D(C), S)
S = bytes(C)
@_register_cipher_mode
class PCBC(StatefulCipherMode):
"""
An implementation of Propagating Cipher Block Chaining.
"""
def encrypt(self) -> Generator[memoryview, memoryview, None]:
S = self.iv
C = None
E = self.encrypt_block
while True:
M = yield C
C = E(strxor(M, S))
S = strxor(C, M)
def decrypt(self) -> Generator[memoryview, memoryview, None]:
S = self.iv
M = None
D = self.decrypt_block
while True:
C = yield M
M = strxor(S, D(C))
S = strxor(M, C)
@_register_cipher_mode
class CFB(CipherMode):
"""
Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final
"""
iv: BufferType
segment_size: int
aligned = False
def __init__(self, iv: BufferType, segment_size: Optional[int] = None):
if segment_size is None:
segment_size = 8
if segment_size % 8 != 0:
raise NotImplementedError('segment sizes may only be multiples of 8')
segment_size = segment_size // 8
if len(iv) % segment_size != 0:
raise NotImplementedError(
F'the block size {len(iv) * 8} is not an even multiple of the segment '
F'size {segment_size * 8}; this is currently not supported.')
self.segment_size = segment_size
self.iv = iv
def encrypt(self) -> Generator[memoryview, memoryview, None]:
s = self.segment_size
S = bytearray(self.iv)
E = self.encrypt_block
C = bytearray(len(self.iv))
if s == 1:
while True:
M = yield C
for k, m in enumerate(M):
C[k] = c = m ^ E(S)[0]
S[:-1], S[-1] = memoryview(S)[1:], c
else:
segments = [slice(i, i + s) for i in range(0, len(S), s)]
while True:
M = yield C
for k in segments:
m = M[k]
C[k] = c = strxor(m, E(S)[:s])
S[:-s], S[-s:] = memoryview(S)[s:], c
def decrypt(self) -> Generator[memoryview, memoryview, None]:
s = self.segment_size
S = bytearray(self.iv)
E = self.encrypt_block
M = bytearray(len(self.iv))
if s == 1:
while True:
C = yield M
for k, c in enumerate(C):
M[k] = c ^ E(S)[0]
S[:-1], S[-1] = memoryview(S)[1:], c
else:
segments = [slice(i, i + s) for i in range(0, len(S), s)]
while True:
C = yield M
for k in segments:
c = C[k]
M[k] = strxor(c, E(S)[:s])
S[:-s], S[-s:] = memoryview(S)[s:], c
@_register_cipher_mode
class OFB(StatefulCipherMode):
"""
An implementation of Output Feedback Mode.
"""
aligned = False
def encrypt(self) -> Generator[memoryview, memoryview, None]:
S = self.iv
C = None
E = self.encrypt_block
while True:
M = yield C
S = E(S)
C = strxor(M, S)
decrypt = encrypt
@_register_cipher_mode
class CTR(CipherMode):
"""
An implementation of Counter mode.
"""
counter_len: int
prefix: BufferType
suffix: BufferType
initial_value: int
little_endian: bool
block_size: int
aligned = False
@property
def byte_order(self):
return 'little' if self.little_endian else 'big'
def __init__(
self,
block_size: Optional[int] = None,
counter: Optional[Dict] = None,
nonce: Optional[BufferType] = None,
initial_value: Optional[int] = 0,
little_endian: bool = False
):
if counter is not None:
self.initial_value = counter.get('initial_value', initial_value)
self.little_endian = counter.get('little_endian', little_endian)
self.prefix = counter['prefix']
self.suffix = counter['suffix']
self.counter_len = counter['counter_len']
self.block_size = self.counter_len + len(self.prefix) + len(self.suffix)
if block_size not in {None, self.block_size}:
raise ValueError('Information in counter object does not align with block size.')
return
if block_size is None:
raise ValueError('Unable to construct CTR mode object without block_size or counter argument.')
self.initial_value = initial_value
self.little_endian = little_endian
self.suffix = B''
self.block_size = block_size
if nonce is not None:
if len(nonce) > block_size:
raise ValueError('Nonce length exceeds block length.')
self.counter_len = block_size - len(nonce)
self.prefix = nonce
else:
self.counter_len = block_size // 2
self.prefix = B'\0' * (block_size - self.counter_len)
def encrypt(self) -> Generator[memoryview, memoryview, None]:
S = bytearray(self.block_size)
J = slice(len(self.prefix), self.block_size - len(self.suffix))
K = self.initial_value
if self.prefix:
S[:+len(self.prefix)] = self.prefix
if self.suffix:
S[-len(self.suffix):] = self.suffix
C = None
E = self.encrypt_block
order = self.byte_order
csize = self.counter_len
mask = (1 << (csize * 8)) - 1
while True:
M = yield C
S[J] = K.to_bytes(csize, order)
K = K + 1 & mask
C = strxor(E(S), M)
decrypt = encrypt
class CipherInterface(ABC):
"""
Abstract base class for refinery's block cipher interface.
"""
key_size: Container[int]
"""
A container containing all valid key sizes for this cipher.
"""
block_size: int
"""
The block size of this cipher.
"""
@abstractmethod
def encrypt(self, M: BufferType) -> BufferType: ...
"""
Data encryption according to this cipher interface.
"""
@abstractmethod
def decrypt(self, C: BufferType) -> BufferType: ...
"""
Data decryption according to this cipher interface.
"""
class CipherObjectFactory(ABC):
"""
An abstract class to build `refinery.lib.crypto.CipherInterface`s from an asortment of
cryptographic secrets and parameters.
"""
name: str
key_size: Optional[Container[int]] = None
block_size: Optional[int] = None
@abstractmethod
def new(
self,
key: BufferType,
iv: Optional[BufferType] = None,
counter: Optional[int] = None,
initial_value: Optional[int] = 0,
nonce: Optional[BufferType] = None,
mode: Optional[str] = None,
segment_size: Optional[int] = None,
block_size: Optional[int] = None,
**cipher_args
) -> CipherInterface:
"""
Build the actual `refinery.lib.crypto.CipherInterface` from the given input parameters.
This mimics the PyCrypto interface for new ciphers in order to make the refinery factory
cross-compatible with that library.
"""
...
class PyCryptoFactoryWrapper(CipherObjectFactory):
"""
Wraps a PyCrypto module as a `refinery.lib.crypto.CipherObjectFactory`.
"""
def __init__(self, module):
self.module = module
def new(self, *a, **k) -> CipherInterface:
return self.module.new(*a, **k)
@property
def key_size(self):
try:
value = self.module.key_size
except AttributeError:
return None
if isinstance(value, int):
return {value}
return value
@property
def block_size(self):
try:
value = self.module.block_size
except AttributeError:
return None
return value
def __repr__(self):
return repr(self.module)
def __dir__(self):
return dir(self.module)
def __getattr__(self, key):
return getattr(self.module, key)
class BlockCipherFactory(CipherObjectFactory):
"""
A `refinery.lib.crypto.CipherObjectFactory` for custom block ciphers.
"""
cipher: Type[BlockCipher]
def __init__(self, cipher: Type[BlockCipher]):
self.cipher = cipher
self._modes = []
for name, mode in CIPHER_MODES.items():
setattr(self, F'MODE_{name}', mode._identifier)
self._modes.append(mode)
def new(self, key, mode=None, **args) -> CipherInterface:
if mode is not None:
mode = self._modes[mode]
mode_arguments = {}
cipher = self.cipher
for arg in ('iv', 'counter', 'initial_value', 'nonce', 'mode', 'segment_size'):
try:
mode_arguments[arg] = args.pop(arg)
except KeyError:
pass
if mode is CTR:
block_size = self.block_size
if block_size is None:
# This happens for ciphers that do not have a fixed block size, i.e. the
# block size is truly an instance attribute and not a class property.
# In this case, we create a temporary cipher object and use it to obtain
# the true block size.
block_size = cipher(key, ECB, **args).block_size
mode_arguments.update(block_size=block_size)
mode = mode(**mode_arguments)
return cipher(key, mode, **args)
@property
def name(self):
return self.cipher.__name__
@property
def key_size(self):
try:
value = self.cipher.key_size
except AttributeError:
return None
if isinstance(value, property):
return None
return value
@property
def block_size(self):
try:
value = self.cipher.block_size
except AttributeError:
return None
if not isinstance(value, int):
return None
return value
class BlockCipher(CipherInterface, ABC):
block_size: int
key: BufferType
mode: CipherMode
key_size: Container[int]
def __init__(self, key: BufferType, mode: Optional[CipherMode]):
if len(key) not in self.key_size:
raise ValueError(F'The key size {len(key)} is not supported by {self.__class__.__name__.lower()}.')
self.key = key
self.mode = mode or ECB()
@abstractmethod
def block_encrypt(self, data: BufferType) -> BufferType:
"""
Encryption of a single block of data.
"""
raise NotImplementedError
@abstractmethod
def block_decrypt(self, data: BufferType) -> BufferType:
"""
Decryption of a single block of data.
"""
raise NotImplementedError
def _apply_blockwise(self, operation: Operation, data: BufferType) -> BufferType:
block_size = self.block_size
mode = self.mode
if len(data) % block_size != 0 and mode.aligned:
raise DataUnaligned
dst = src = memoryview(data)
if dst.readonly:
dst = bytearray(src)
return mode.apply(
operation,
dst,
src,
self.block_encrypt,
self.block_decrypt,
block_size
)
def encrypt(self, data: BufferType) -> BufferType:
"""
Encrypt the input data.
"""
return self._apply_blockwise(Operation.Encrypt, data)
def decrypt(self, data: BufferType) -> BufferType:
"""
Decrypt the input data.
"""
return self._apply_blockwise(Operation.Decrypt, data)
Functions
def strxor(a, b)
-
Return the XOR of the two byte strings
a
andb
. The shorter of the two strings defines the length of the output.Expand source code Browse git
def strxor(a: bytes, b: bytes): """ Return the XOR of the two byte strings `a` and `b`. The shorter of the two strings defines the length of the output. """ return bytes(a ^ b for a, b in zip(a, b))
def rotl128(x, c)
-
Rotate the 128-bit integer
x
byc
positions to the left.Expand source code Browse git
def rotl128(x: int, c: int): """ Rotate the 128-bit integer `x` by `c` positions to the left. """ return ((x << c) | (x >> (0x80 - c))) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
def rotl64(x, c)
-
Rotate the 64-bit integer
x
byc
positions to the left.Expand source code Browse git
def rotl64(x: int, c: int): """ Rotate the 64-bit integer `x` by `c` positions to the left. """ return ((x << c) | (x >> (0x40 - c))) & 0xFFFFFFFFFFFFFFFF
def rotl32(x, c)
-
Rotate the 32-bit integer
x
byc
positions to the left.Expand source code Browse git
def rotl32(x: int, c: int): """ Rotate the 32-bit integer `x` by `c` positions to the left. """ return ((x << c) | (x >> (0x20 - c))) & 0xFFFFFFFF
def rotl16(x, c)
-
Rotate the 16-bit integer
x
byc
positions to the left.Expand source code Browse git
def rotl16(x: int, c: int): """ Rotate the 16-bit integer `x` by `c` positions to the left. """ return ((x << c) | (x >> (0x10 - c))) & 0xFFFF
def rotl8(x, c)
-
Rotate the byte
x
byc
positions to the left.Expand source code Browse git
def rotl8(x: int, c: int): """ Rotate the byte `x` by `c` positions to the left. """ return ((x << c) | (x >> (0x08 - c))) & 0xFF
def rotr128(x, c)
-
Rotate the 128-bit integer
x
byc
positions to the right.Expand source code Browse git
def rotr128(x: int, c: int): """ Rotate the 128-bit integer `x` by `c` positions to the right. """ return (x << (0x80 - c) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) | (x >> c)
def rotr64(x, c)
-
Rotate the 64-bit integer
x
byc
positions to the right.Expand source code Browse git
def rotr64(x: int, c: int): """ Rotate the 64-bit integer `x` by `c` positions to the right. """ return (x << (0x40 - c) & 0xFFFFFFFFFFFFFFFF) | (x >> c)
def rotr32(x, c)
-
Rotate the 32-bit integer
x
byc
positions to the right.Expand source code Browse git
def rotr32(x: int, c: int): """ Rotate the 32-bit integer `x` by `c` positions to the right. """ return (x << (0x20 - c) & 0xFFFFFFFF) | (x >> c)
def rotr16(x, c)
-
Rotate the 16-bit integer
x
byc
positions to the right.Expand source code Browse git
def rotr16(x: int, c: int): """ Rotate the 16-bit integer `x` by `c` positions to the right. """ return (x << (0x10 - c) & 0xFFFF) | (x >> c)
def rotr8(x, c)
-
Rotate the byte
x
byc
positions to the right.Expand source code Browse git
def rotr8(x: int, c: int): """ Rotate the byte `x` by `c` positions to the right. """ return (x << (0x08 - c) & 0xFF) | (x >> c)
def rotr(n, x, c)
-
Rotate the
n
-bit integerx
byc
positions to the right. Ifn
is among the common bit sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be used instead.Expand source code Browse git
def rotr(n: int, x: int, c: int) -> int: """ Rotate the `n`-bit integer `x` by `c` positions to the right. If `n` is among the common bit sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be used instead. """ mask = (1 << n) - 1 c %= n return (x >> c) | (x << (n - c) & mask)
def rotl(n, x, c)
-
Rotate the
n
-bit integerx
byc
positions to the left. Ifn
is among the common bit sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be used instead.Expand source code Browse git
def rotl(n: int, x: int, c: int) -> int: """ Rotate the `n`-bit integer `x` by `c` positions to the left. If `n` is among the common bit sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be used instead. """ mask = (1 << n) - 1 c %= n return (x >> (n - c)) | (x << c & mask)
Classes
class Operation (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Specifies whether data is currently being encrypted or decrypted.
Expand source code Browse git
class Operation(str, Enum): """ Specifies whether data is currently being encrypted or decrypted. """ Encrypt = 'encrypt' Decrypt = 'decrypt'
Ancestors
- builtins.str
- enum.Enum
Class variables
var Encrypt
var Decrypt
class CipherMode
-
Abstract base class for a cipher mode of operation.
Expand source code Browse git
class CipherMode(ABC): """ Abstract base class for a cipher mode of operation. """ encrypt_block: Callable[[memoryview], memoryview] decrypt_block: Callable[[memoryview], memoryview] aligned: bool = True _identifier: ClassVar[int] @abstractmethod def encrypt(self) -> Generator[memoryview, memoryview, None]: """ Implements data encryption according to the current cipher mode and underlying cipher. """ raise NotImplementedError @abstractmethod def decrypt(self) -> Generator[memoryview, memoryview, None]: """ Implements data decryption according to the current cipher mode and underlying cipher. """ raise NotImplementedError def apply( self, operation: Operation, dst: memoryview, src: memoryview, encrypt_block: Callable[[memoryview], memoryview], decrypt_block: Callable[[memoryview], memoryview], blocksize: int, ) -> memoryview: """ This method is used to perform a cryptographic `refinery.lib.crypto.Operation` to a given source `src` and write the result to the memory at `dst` according to the current cipher mode. To this end, it requires the block encryption and decryption primitives of the underlying cipher and the current block size. """ self.encrypt_block = encrypt_block self.decrypt_block = decrypt_block engine: Generator[memoryview, memoryview, None] = { Operation.Encrypt: self.encrypt, Operation.Decrypt: self.decrypt, }[operation]() next(engine) top, rest = divmod(len(src), blocksize) top *= blocksize for k in range(0, top, blocksize): dst[k:k + blocksize] = engine.send(src[k:k + blocksize]) if rest: dst[-rest:] = engine.send(src[-rest:])[:rest] engine.close() return dst
Ancestors
- abc.ABC
Subclasses
Class variables
var encrypt_block
var decrypt_block
var aligned
Methods
def encrypt(self)
-
Implements data encryption according to the current cipher mode and underlying cipher.
Expand source code Browse git
@abstractmethod def encrypt(self) -> Generator[memoryview, memoryview, None]: """ Implements data encryption according to the current cipher mode and underlying cipher. """ raise NotImplementedError
def decrypt(self)
-
Implements data decryption according to the current cipher mode and underlying cipher.
Expand source code Browse git
@abstractmethod def decrypt(self) -> Generator[memoryview, memoryview, None]: """ Implements data decryption according to the current cipher mode and underlying cipher. """ raise NotImplementedError
def apply(self, operation, dst, src, encrypt_block, decrypt_block, blocksize)
-
This method is used to perform a cryptographic
Operation
to a given sourcesrc
and write the result to the memory atdst
according to the current cipher mode. To this end, it requires the block encryption and decryption primitives of the underlying cipher and the current block size.Expand source code Browse git
def apply( self, operation: Operation, dst: memoryview, src: memoryview, encrypt_block: Callable[[memoryview], memoryview], decrypt_block: Callable[[memoryview], memoryview], blocksize: int, ) -> memoryview: """ This method is used to perform a cryptographic `refinery.lib.crypto.Operation` to a given source `src` and write the result to the memory at `dst` according to the current cipher mode. To this end, it requires the block encryption and decryption primitives of the underlying cipher and the current block size. """ self.encrypt_block = encrypt_block self.decrypt_block = decrypt_block engine: Generator[memoryview, memoryview, None] = { Operation.Encrypt: self.encrypt, Operation.Decrypt: self.decrypt, }[operation]() next(engine) top, rest = divmod(len(src), blocksize) top *= blocksize for k in range(0, top, blocksize): dst[k:k + blocksize] = engine.send(src[k:k + blocksize]) if rest: dst[-rest:] = engine.send(src[-rest:])[:rest] engine.close() return dst
class ECB
-
The Electronic Codebook (ECB) is the most simple cipher mode of operation. The underlying cipher is applied block-wise with no additional safeguards.
Expand source code Browse git
class ECB(CipherMode): """ The Electronic Codebook (ECB) is the most simple cipher mode of operation. The underlying cipher is applied block-wise with no additional safeguards. """ def decrypt(self) -> Generator[memoryview, memoryview, None]: M = None D = self.decrypt_block while True: C = yield M M = D(C) def encrypt(self) -> Generator[memoryview, memoryview, None]: C = None E = self.encrypt_block while True: M = yield C C = E(M)
Ancestors
- CipherMode
- abc.ABC
Class variables
var encrypt_block
var decrypt_block
var aligned
Inherited members
class DataUnaligned
-
Raised when input data is unexpectedly unaligned to the current block size.
Expand source code Browse git
class DataUnaligned(ValueError): """ Raised when input data is unexpectedly unaligned to the current block size. """ def __init__(self) -> None: super().__init__('Data not aligned to block size.')
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class StatefulCipherMode (iv)
-
A subclass of
CipherMode
that holds a state while performing any of its cryptographicOperation
s.Expand source code Browse git
class StatefulCipherMode(CipherMode): """ A subclass of `refinery.lib.crypto.CipherMode` that holds a state while performing any of its cryptographic `refinery.lib.crypto.Operation`s. """ iv: BufferType """ The initial vector for the internal state of the cipher mode. """ def __init__(self, iv: BufferType): self.iv = iv
Ancestors
- CipherMode
- abc.ABC
Subclasses
Class variables
var iv
-
The initial vector for the internal state of the cipher mode.
Inherited members
class CBC (iv)
-
An implementation of the popular Cipher Block Chaining mode of operation.
Expand source code Browse git
class CBC(StatefulCipherMode): """ An implementation of the popular Cipher Block Chaining mode of operation. """ def encrypt(self) -> Generator[memoryview, memoryview, None]: C = self.iv E = self.encrypt_block while True: M = yield C C = E(strxor(M, C)) def decrypt(self) -> Generator[memoryview, memoryview, None]: S = self.iv M = None D = self.decrypt_block while True: C = yield M M = strxor(D(C), S) S = bytes(C)
Ancestors
- StatefulCipherMode
- CipherMode
- abc.ABC
Inherited members
class PCBC (iv)
-
An implementation of Propagating Cipher Block Chaining.
Expand source code Browse git
class PCBC(StatefulCipherMode): """ An implementation of Propagating Cipher Block Chaining. """ def encrypt(self) -> Generator[memoryview, memoryview, None]: S = self.iv C = None E = self.encrypt_block while True: M = yield C C = E(strxor(M, S)) S = strxor(C, M) def decrypt(self) -> Generator[memoryview, memoryview, None]: S = self.iv M = None D = self.decrypt_block while True: C = yield M M = strxor(S, D(C)) S = strxor(M, C)
Ancestors
- StatefulCipherMode
- CipherMode
- abc.ABC
Inherited members
class CFB (iv, segment_size=None)
-
Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final
Expand source code Browse git
class CFB(CipherMode): """ Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final """ iv: BufferType segment_size: int aligned = False def __init__(self, iv: BufferType, segment_size: Optional[int] = None): if segment_size is None: segment_size = 8 if segment_size % 8 != 0: raise NotImplementedError('segment sizes may only be multiples of 8') segment_size = segment_size // 8 if len(iv) % segment_size != 0: raise NotImplementedError( F'the block size {len(iv) * 8} is not an even multiple of the segment ' F'size {segment_size * 8}; this is currently not supported.') self.segment_size = segment_size self.iv = iv def encrypt(self) -> Generator[memoryview, memoryview, None]: s = self.segment_size S = bytearray(self.iv) E = self.encrypt_block C = bytearray(len(self.iv)) if s == 1: while True: M = yield C for k, m in enumerate(M): C[k] = c = m ^ E(S)[0] S[:-1], S[-1] = memoryview(S)[1:], c else: segments = [slice(i, i + s) for i in range(0, len(S), s)] while True: M = yield C for k in segments: m = M[k] C[k] = c = strxor(m, E(S)[:s]) S[:-s], S[-s:] = memoryview(S)[s:], c def decrypt(self) -> Generator[memoryview, memoryview, None]: s = self.segment_size S = bytearray(self.iv) E = self.encrypt_block M = bytearray(len(self.iv)) if s == 1: while True: C = yield M for k, c in enumerate(C): M[k] = c ^ E(S)[0] S[:-1], S[-1] = memoryview(S)[1:], c else: segments = [slice(i, i + s) for i in range(0, len(S), s)] while True: C = yield M for k in segments: c = C[k] M[k] = strxor(c, E(S)[:s]) S[:-s], S[-s:] = memoryview(S)[s:], c
Ancestors
- CipherMode
- abc.ABC
Class variables
var iv
var segment_size
var aligned
Inherited members
class OFB (iv)
-
An implementation of Output Feedback Mode.
Expand source code Browse git
class OFB(StatefulCipherMode): """ An implementation of Output Feedback Mode. """ aligned = False def encrypt(self) -> Generator[memoryview, memoryview, None]: S = self.iv C = None E = self.encrypt_block while True: M = yield C S = E(S) C = strxor(M, S) decrypt = encrypt
Ancestors
- StatefulCipherMode
- CipherMode
- abc.ABC
Class variables
var aligned
Methods
def decrypt(self)
-
Implements data encryption according to the current cipher mode and underlying cipher.
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]: S = self.iv C = None E = self.encrypt_block while True: M = yield C S = E(S) C = strxor(M, S)
Inherited members
class CTR (block_size=None, counter=None, nonce=None, initial_value=0, little_endian=False)
-
An implementation of Counter mode.
Expand source code Browse git
class CTR(CipherMode): """ An implementation of Counter mode. """ counter_len: int prefix: BufferType suffix: BufferType initial_value: int little_endian: bool block_size: int aligned = False @property def byte_order(self): return 'little' if self.little_endian else 'big' def __init__( self, block_size: Optional[int] = None, counter: Optional[Dict] = None, nonce: Optional[BufferType] = None, initial_value: Optional[int] = 0, little_endian: bool = False ): if counter is not None: self.initial_value = counter.get('initial_value', initial_value) self.little_endian = counter.get('little_endian', little_endian) self.prefix = counter['prefix'] self.suffix = counter['suffix'] self.counter_len = counter['counter_len'] self.block_size = self.counter_len + len(self.prefix) + len(self.suffix) if block_size not in {None, self.block_size}: raise ValueError('Information in counter object does not align with block size.') return if block_size is None: raise ValueError('Unable to construct CTR mode object without block_size or counter argument.') self.initial_value = initial_value self.little_endian = little_endian self.suffix = B'' self.block_size = block_size if nonce is not None: if len(nonce) > block_size: raise ValueError('Nonce length exceeds block length.') self.counter_len = block_size - len(nonce) self.prefix = nonce else: self.counter_len = block_size // 2 self.prefix = B'\0' * (block_size - self.counter_len) def encrypt(self) -> Generator[memoryview, memoryview, None]: S = bytearray(self.block_size) J = slice(len(self.prefix), self.block_size - len(self.suffix)) K = self.initial_value if self.prefix: S[:+len(self.prefix)] = self.prefix if self.suffix: S[-len(self.suffix):] = self.suffix C = None E = self.encrypt_block order = self.byte_order csize = self.counter_len mask = (1 << (csize * 8)) - 1 while True: M = yield C S[J] = K.to_bytes(csize, order) K = K + 1 & mask C = strxor(E(S), M) decrypt = encrypt
Ancestors
- CipherMode
- abc.ABC
Class variables
var counter_len
var prefix
var suffix
var initial_value
var little_endian
var block_size
var aligned
Instance variables
var byte_order
-
Expand source code Browse git
@property def byte_order(self): return 'little' if self.little_endian else 'big'
Methods
def decrypt(self)
-
Implements data encryption according to the current cipher mode and underlying cipher.
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]: S = bytearray(self.block_size) J = slice(len(self.prefix), self.block_size - len(self.suffix)) K = self.initial_value if self.prefix: S[:+len(self.prefix)] = self.prefix if self.suffix: S[-len(self.suffix):] = self.suffix C = None E = self.encrypt_block order = self.byte_order csize = self.counter_len mask = (1 << (csize * 8)) - 1 while True: M = yield C S[J] = K.to_bytes(csize, order) K = K + 1 & mask C = strxor(E(S), M)
Inherited members
class CipherInterface
-
Abstract base class for refinery's block cipher interface.
Expand source code Browse git
class CipherInterface(ABC): """ Abstract base class for refinery's block cipher interface. """ key_size: Container[int] """ A container containing all valid key sizes for this cipher. """ block_size: int """ The block size of this cipher. """ @abstractmethod def encrypt(self, M: BufferType) -> BufferType: ... """ Data encryption according to this cipher interface. """ @abstractmethod def decrypt(self, C: BufferType) -> BufferType: ... """ Data decryption according to this cipher interface. """
Ancestors
- abc.ABC
Subclasses
Class variables
var key_size
-
A container containing all valid key sizes for this cipher.
var block_size
-
The block size of this cipher.
Methods
def encrypt(self, M)
-
Expand source code Browse git
@abstractmethod def encrypt(self, M: BufferType) -> BufferType: ...
def decrypt(self, C)
-
Expand source code Browse git
@abstractmethod def decrypt(self, C: BufferType) -> BufferType: ...
class CipherObjectFactory
-
An abstract class to build
CipherInterface
s from an asortment of cryptographic secrets and parameters.Expand source code Browse git
class CipherObjectFactory(ABC): """ An abstract class to build `refinery.lib.crypto.CipherInterface`s from an asortment of cryptographic secrets and parameters. """ name: str key_size: Optional[Container[int]] = None block_size: Optional[int] = None @abstractmethod def new( self, key: BufferType, iv: Optional[BufferType] = None, counter: Optional[int] = None, initial_value: Optional[int] = 0, nonce: Optional[BufferType] = None, mode: Optional[str] = None, segment_size: Optional[int] = None, block_size: Optional[int] = None, **cipher_args ) -> CipherInterface: """ Build the actual `refinery.lib.crypto.CipherInterface` from the given input parameters. This mimics the PyCrypto interface for new ciphers in order to make the refinery factory cross-compatible with that library. """ ...
Ancestors
- abc.ABC
Subclasses
Class variables
var name
var key_size
var block_size
Methods
def new(self, key, iv=None, counter=None, initial_value=0, nonce=None, mode=None, segment_size=None, block_size=None, **cipher_args)
-
Build the actual
CipherInterface
from the given input parameters. This mimics the PyCrypto interface for new ciphers in order to make the refinery factory cross-compatible with that library.Expand source code Browse git
@abstractmethod def new( self, key: BufferType, iv: Optional[BufferType] = None, counter: Optional[int] = None, initial_value: Optional[int] = 0, nonce: Optional[BufferType] = None, mode: Optional[str] = None, segment_size: Optional[int] = None, block_size: Optional[int] = None, **cipher_args ) -> CipherInterface: """ Build the actual `refinery.lib.crypto.CipherInterface` from the given input parameters. This mimics the PyCrypto interface for new ciphers in order to make the refinery factory cross-compatible with that library. """ ...
class PyCryptoFactoryWrapper (module)
-
Wraps a PyCrypto module as a
CipherObjectFactory
.Expand source code Browse git
class PyCryptoFactoryWrapper(CipherObjectFactory): """ Wraps a PyCrypto module as a `refinery.lib.crypto.CipherObjectFactory`. """ def __init__(self, module): self.module = module def new(self, *a, **k) -> CipherInterface: return self.module.new(*a, **k) @property def key_size(self): try: value = self.module.key_size except AttributeError: return None if isinstance(value, int): return {value} return value @property def block_size(self): try: value = self.module.block_size except AttributeError: return None return value def __repr__(self): return repr(self.module) def __dir__(self): return dir(self.module) def __getattr__(self, key): return getattr(self.module, key)
Ancestors
- CipherObjectFactory
- abc.ABC
Class variables
var name
Instance variables
var key_size
-
Expand source code Browse git
@property def key_size(self): try: value = self.module.key_size except AttributeError: return None if isinstance(value, int): return {value} return value
var block_size
-
Expand source code Browse git
@property def block_size(self): try: value = self.module.block_size except AttributeError: return None return value
Inherited members
class BlockCipherFactory (cipher)
-
A
CipherObjectFactory
for custom block ciphers.Expand source code Browse git
class BlockCipherFactory(CipherObjectFactory): """ A `refinery.lib.crypto.CipherObjectFactory` for custom block ciphers. """ cipher: Type[BlockCipher] def __init__(self, cipher: Type[BlockCipher]): self.cipher = cipher self._modes = [] for name, mode in CIPHER_MODES.items(): setattr(self, F'MODE_{name}', mode._identifier) self._modes.append(mode) def new(self, key, mode=None, **args) -> CipherInterface: if mode is not None: mode = self._modes[mode] mode_arguments = {} cipher = self.cipher for arg in ('iv', 'counter', 'initial_value', 'nonce', 'mode', 'segment_size'): try: mode_arguments[arg] = args.pop(arg) except KeyError: pass if mode is CTR: block_size = self.block_size if block_size is None: # This happens for ciphers that do not have a fixed block size, i.e. the # block size is truly an instance attribute and not a class property. # In this case, we create a temporary cipher object and use it to obtain # the true block size. block_size = cipher(key, ECB, **args).block_size mode_arguments.update(block_size=block_size) mode = mode(**mode_arguments) return cipher(key, mode, **args) @property def name(self): return self.cipher.__name__ @property def key_size(self): try: value = self.cipher.key_size except AttributeError: return None if isinstance(value, property): return None return value @property def block_size(self): try: value = self.cipher.block_size except AttributeError: return None if not isinstance(value, int): return None return value
Ancestors
- CipherObjectFactory
- abc.ABC
Class variables
var cipher
Instance variables
var name
-
Expand source code Browse git
@property def name(self): return self.cipher.__name__
var key_size
-
Expand source code Browse git
@property def key_size(self): try: value = self.cipher.key_size except AttributeError: return None if isinstance(value, property): return None return value
var block_size
-
Expand source code Browse git
@property def block_size(self): try: value = self.cipher.block_size except AttributeError: return None if not isinstance(value, int): return None return value
Inherited members
class BlockCipher (key, mode)
-
Abstract base class for refinery's block cipher interface.
Expand source code Browse git
class BlockCipher(CipherInterface, ABC): block_size: int key: BufferType mode: CipherMode key_size: Container[int] def __init__(self, key: BufferType, mode: Optional[CipherMode]): if len(key) not in self.key_size: raise ValueError(F'The key size {len(key)} is not supported by {self.__class__.__name__.lower()}.') self.key = key self.mode = mode or ECB() @abstractmethod def block_encrypt(self, data: BufferType) -> BufferType: """ Encryption of a single block of data. """ raise NotImplementedError @abstractmethod def block_decrypt(self, data: BufferType) -> BufferType: """ Decryption of a single block of data. """ raise NotImplementedError def _apply_blockwise(self, operation: Operation, data: BufferType) -> BufferType: block_size = self.block_size mode = self.mode if len(data) % block_size != 0 and mode.aligned: raise DataUnaligned dst = src = memoryview(data) if dst.readonly: dst = bytearray(src) return mode.apply( operation, dst, src, self.block_encrypt, self.block_decrypt, block_size ) def encrypt(self, data: BufferType) -> BufferType: """ Encrypt the input data. """ return self._apply_blockwise(Operation.Encrypt, data) def decrypt(self, data: BufferType) -> BufferType: """ Decrypt the input data. """ return self._apply_blockwise(Operation.Decrypt, data)
Ancestors
- CipherInterface
- abc.ABC
Subclasses
Class variables
var key
var mode
Methods
def block_encrypt(self, data)
-
Encryption of a single block of data.
Expand source code Browse git
@abstractmethod def block_encrypt(self, data: BufferType) -> BufferType: """ Encryption of a single block of data. """ raise NotImplementedError
def block_decrypt(self, data)
-
Decryption of a single block of data.
Expand source code Browse git
@abstractmethod def block_decrypt(self, data: BufferType) -> BufferType: """ Decryption of a single block of data. """ raise NotImplementedError
def encrypt(self, data)
-
Encrypt the input data.
Expand source code Browse git
def encrypt(self, data: BufferType) -> BufferType: """ Encrypt the input data. """ return self._apply_blockwise(Operation.Encrypt, data)
def decrypt(self, data)
-
Decrypt the input data.
Expand source code Browse git
def decrypt(self, data: BufferType) -> BufferType: """ Decrypt the input data. """ return self._apply_blockwise(Operation.Decrypt, data)
Inherited members