Module refinery.lib.crypto

Primitives used in custom cryptographic implementations.

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Primitives used in custom cryptographic implementations.
"""
from __future__ import annotations

from typing import Callable, ClassVar, Container, Generator, Optional, Type, Union, Dict
from abc import ABC, abstractmethod
from enum import Enum

# Union of the binary buffer types that are used in annotations throughout this module.
BufferType = Union[bytearray, bytes, memoryview]
# Registry of cipher mode classes, keyed by class name. It is filled by the class decorator
# `_register_cipher_mode`; the values are the classes themselves, not instances.
CIPHER_MODES: Dict[str, Type[CipherMode]] = {}


def strxor(a: bytes, b: bytes):
    """
    Compute the byte-wise XOR of the byte strings `a` and `b`. The inputs are paired up position
    by position; the result is as long as the shorter input and surplus bytes are ignored.
    """
    pairs = zip(a, b)
    return bytes(left ^ right for left, right in pairs)


def _register_cipher_mode(cls: Type[CipherMode]):
    """
    Class decorator that adds `cls` to the `CIPHER_MODES` registry under its class name. The
    number of modes registered before it becomes the numeric `_identifier` of the class. The
    class itself is returned unchanged otherwise.
    """
    name = cls.__name__
    cls._identifier = len(CIPHER_MODES)
    CIPHER_MODES[name] = cls
    return cls


def rotl128(x: int, c: int):
    """
    Cyclically shift the 128-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x80 - c)
    return (shifted | wrapped) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF


def rotl64(x: int, c: int):
    """
    Cyclically shift the 64-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x40 - c)
    return (shifted | wrapped) & 0xFFFFFFFFFFFFFFFF


def rotl32(x: int, c: int):
    """
    Cyclically shift the 32-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x20 - c)
    return (shifted | wrapped) & 0xFFFFFFFF


def rotl16(x: int, c: int):
    """
    Cyclically shift the 16-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x10 - c)
    return (shifted | wrapped) & 0xFFFF


def rotl8(x: int, c: int):
    """
    Cyclically shift the byte `x` to the left by `c` bit positions. The shift count is not
    reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x08 - c)
    return (shifted | wrapped) & 0xFF


def rotr128(x: int, c: int):
    """
    Cyclically shift the 128-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x80 - c)) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    return wrapped | (x >> c)


def rotr64(x: int, c: int):
    """
    Cyclically shift the 64-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x40 - c)) & 0xFFFFFFFFFFFFFFFF
    return wrapped | (x >> c)


def rotr32(x: int, c: int):
    """
    Cyclically shift the 32-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x20 - c)) & 0xFFFFFFFF
    return wrapped | (x >> c)


def rotr16(x: int, c: int):
    """
    Cyclically shift the 16-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x10 - c)) & 0xFFFF
    return wrapped | (x >> c)


def rotr8(x: int, c: int):
    """
    Cyclically shift the byte `x` to the right by `c` bit positions. The shift count is not
    reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x08 - c)) & 0xFF
    return wrapped | (x >> c)


def rotr(n: int, x: int, c: int) -> int:
    """
    Cyclically shift the `n`-bit integer `x` to the right by `c` bit positions; the shift count
    is reduced modulo `n` first. For the common widths 8, 16, 32, 64, and 128, the dedicated
    functions in this module should be preferred.
    """
    mask = (1 << n) - 1
    c %= n
    lower = x >> c
    upper = (x << (n - c)) & mask
    return lower | upper


def rotl(n: int, x: int, c: int) -> int:
    """
    Cyclically shift the `n`-bit integer `x` to the left by `c` bit positions; the shift count
    is reduced modulo `n` first. For the common widths 8, 16, 32, 64, and 128, the dedicated
    functions in this module should be preferred.
    """
    mask = (1 << n) - 1
    c %= n
    lower = x >> (n - c)
    upper = (x << c) & mask
    return lower | upper


class Operation(str, Enum):
    """
    Selects the direction of a cryptographic operation: either encryption or decryption.
    Members are strings and compare equal to their lowercase value.
    """
    Encrypt = "encrypt"
    Decrypt = "decrypt"


class CipherMode(ABC):
    """
    Abstract base class for a cipher mode of operation. A mode implements `encrypt` and
    `decrypt` as generators: every value sent into the generator is one input block, and the
    value yielded in response is the corresponding output block.
    """

    encrypt_block: Callable[[memoryview], memoryview]
    decrypt_block: Callable[[memoryview], memoryview]
    aligned: bool = True
    _identifier: ClassVar[int]

    @abstractmethod
    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that encrypts the blocks sent to it according to this cipher mode, using the
        block primitives of the underlying cipher.
        """
        raise NotImplementedError

    @abstractmethod
    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that decrypts the blocks sent to it according to this cipher mode, using the
        block primitives of the underlying cipher.
        """
        raise NotImplementedError

    def apply(
        self,
        operation: Operation,
        dst: memoryview,
        src: memoryview,
        encrypt_block: Callable[[memoryview], memoryview],
        decrypt_block: Callable[[memoryview], memoryview],
        blocksize: int,
    ) -> memoryview:
        """
        Run the given `refinery.lib.crypto.Operation` over the source buffer `src` and store the
        result in `dst`, which is also the return value. The block primitives `encrypt_block` and
        `decrypt_block` of the underlying cipher are stored on the mode object before the engine
        is started. The input is processed in chunks of `blocksize` bytes; a shorter trailing
        chunk is sent to the engine as well, and only as many output bytes as that chunk is long
        are written.
        """
        self.encrypt_block = encrypt_block
        self.decrypt_block = decrypt_block
        dispatch = {
            Operation.Encrypt: self.encrypt,
            Operation.Decrypt: self.decrypt,
        }
        engine: Generator[memoryview, memoryview, None] = dispatch[operation]()
        # Advance the generator to its first yield so that it can receive blocks.
        next(engine)
        full_blocks, remainder = divmod(len(src), blocksize)
        aligned_end = full_blocks * blocksize
        for offset in range(0, aligned_end, blocksize):
            window = slice(offset, offset + blocksize)
            dst[window] = engine.send(src[window])
        if remainder:
            tail = slice(-remainder, None)
            dst[tail] = engine.send(src[tail])[:remainder]
        engine.close()
        return dst


@_register_cipher_mode
class ECB(CipherMode):
    """
    The Electronic Codebook (ECB) is the simplest cipher mode of operation: the block primitive
    of the underlying cipher is applied to each block independently, without any chaining.
    """

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        decrypt_block = self.decrypt_block
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = decrypt_block(ciphertext)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        ciphertext = None
        while True:
            plaintext = yield ciphertext
            ciphertext = encrypt_block(plaintext)


class DataUnaligned(ValueError):
    """
    Signals that the length of the input data is unexpectedly not a multiple of the current
    block size.
    """
    def __init__(self) -> None:
        message = 'Data not aligned to block size.'
        super().__init__(message)


class StatefulCipherMode(CipherMode):
    """
    Base class for every `refinery.lib.crypto.CipherMode` that maintains an internal state
    while performing a cryptographic `refinery.lib.crypto.Operation`. The state is seeded
    from an initialization vector.
    """

    iv: BufferType
    """
    The initialization vector that seeds the internal state of the cipher mode.
    """

    def __init__(self, iv: BufferType):
        self.iv = iv


@_register_cipher_mode
class CBC(StatefulCipherMode):
    """
    Cipher Block Chaining: each plaintext block is XORed with the previous ciphertext block
    (the IV for the first block) before it is passed to the block cipher.
    """

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        # The chaining value doubles as the yielded output; the IV primes the generator.
        chain = self.iv
        while True:
            plaintext = yield chain
            chain = encrypt_block(strxor(plaintext, chain))

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        decrypt_block = self.decrypt_block
        previous = self.iv
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = strxor(decrypt_block(ciphertext), previous)
            # Copy the ciphertext block so the chaining value does not alias the input buffer.
            previous = bytes(ciphertext)


@_register_cipher_mode
class PCBC(StatefulCipherMode):
    """
    Propagating Cipher Block Chaining: the chaining value that is mixed into each block is the
    XOR of the previous plaintext block and the previous ciphertext block, starting with the IV.
    """

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        state = self.iv
        ciphertext = None
        while True:
            plaintext = yield ciphertext
            ciphertext = encrypt_block(strxor(plaintext, state))
            state = strxor(ciphertext, plaintext)

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        decrypt_block = self.decrypt_block
        state = self.iv
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = strxor(state, decrypt_block(ciphertext))
            state = strxor(plaintext, ciphertext)


@_register_cipher_mode
class CFB(CipherMode):
    """
    Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final

    A shift register is initialized with the IV. For every segment, the register is encrypted
    and the leading bytes of the result are XORed with the input segment. The ciphertext segment
    is then shifted into the register from the right. Both directions use only the block
    encryption primitive of the underlying cipher.
    """

    iv: BufferType
    segment_size: int
    aligned = False

    def __init__(self, iv: BufferType, segment_size: Optional[int] = None):
        """
        The `segment_size` is specified in bits and defaults to 8; it is stored in bytes. The
        length of `iv` is used as the block size. A `ValueError` is raised for a segment size
        that is not positive, and a `NotImplementedError` is raised when the segment size is not
        a multiple of 8 or does not evenly divide the block size.
        """
        if segment_size is None:
            segment_size = 8
        if segment_size <= 0:
            # Zero would divide by zero below; negative values would yield an empty segment
            # list and therefore silently produce zero-filled output.
            raise ValueError('the segment size must be a positive number of bits')
        if segment_size % 8 != 0:
            raise NotImplementedError('segment sizes may only be multiples of 8')
        segment_size = segment_size // 8
        if len(iv) % segment_size != 0:
            raise NotImplementedError(
                F'the block size {len(iv) * 8} is not an even multiple of the segment '
                F'size {segment_size * 8}; this is currently not supported.')
        self.segment_size = segment_size
        self.iv = iv

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        s = self.segment_size
        S = bytearray(self.iv)
        E = self.encrypt_block
        # Output buffer of one block; it is reused for every yielded block.
        C = bytearray(len(self.iv))
        if s == 1:
            while True:
                M = yield C
                for k, m in enumerate(M):
                    C[k] = c = m ^ E(S)[0]
                    # Shift the register left by one byte and append the ciphertext byte.
                    S[:-1], S[-1] = memoryview(S)[1:], c
        else:
            segments = [slice(i, i + s) for i in range(0, len(S), s)]
            while True:
                M = yield C
                for k in segments:
                    m = M[k]
                    if len(m) < s:
                        # This only happens in a trailing partial block. Process the bytes that
                        # remain, then stop: continuing would shrink the output buffer and the
                        # shift register and feed a truncated state to the block cipher.
                        if m:
                            c = strxor(m, E(S)[:s])
                            C[k.start:k.start + len(c)] = c
                        break
                    C[k] = c = strxor(m, E(S)[:s])
                    # Shift the register left by one segment and append the ciphertext segment.
                    S[:-s], S[-s:] = memoryview(S)[s:], c

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        s = self.segment_size
        S = bytearray(self.iv)
        E = self.encrypt_block
        # Output buffer of one block; it is reused for every yielded block.
        M = bytearray(len(self.iv))
        if s == 1:
            while True:
                C = yield M
                for k, c in enumerate(C):
                    M[k] = c ^ E(S)[0]
                    # Shift the register left by one byte and append the ciphertext byte.
                    S[:-1], S[-1] = memoryview(S)[1:], c
        else:
            segments = [slice(i, i + s) for i in range(0, len(S), s)]
            while True:
                C = yield M
                for k in segments:
                    c = C[k]
                    if len(c) < s:
                        # Trailing partial block: see the corresponding note in `encrypt`.
                        if c:
                            m = strxor(c, E(S)[:s])
                            M[k.start:k.start + len(m)] = m
                        break
                    M[k] = strxor(c, E(S)[:s])
                    # Shift the register left by one segment and append the ciphertext segment.
                    S[:-s], S[-s:] = memoryview(S)[s:], c


@_register_cipher_mode
class OFB(StatefulCipherMode):
    """
    Output Feedback Mode: the IV is encrypted repeatedly to produce a key stream, and each input
    block is XORed with the next key stream block. Encryption and decryption are identical.
    """

    aligned = False

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        keystream = self.iv
        output = None
        while True:
            chunk = yield output
            keystream = encrypt_block(keystream)
            output = strxor(chunk, keystream)

    decrypt = encrypt


@_register_cipher_mode
class CTR(CipherMode):
    """
    An implementation of Counter mode. Each counter block consists of a fixed prefix, a counter
    of `counter_len` bytes, and a fixed suffix. The counter block is encrypted and XORed with
    the input block; the counter is then incremented and wraps around at its byte width.
    Encryption and decryption are identical.
    """

    counter_len: int
    prefix: BufferType
    suffix: BufferType
    initial_value: int
    little_endian: bool
    block_size: int

    aligned = False

    @property
    def byte_order(self):
        """
        The byte order string passed to `int.to_bytes` when the counter is serialized.
        """
        return 'little' if self.little_endian else 'big'

    def __init__(
        self,
        block_size: Optional[int] = None,
        counter: Optional[Dict] = None,
        nonce: Optional[BufferType] = None,
        initial_value: Optional[int] = 0,
        little_endian: bool = False
    ):
        """
        The mode can be configured in two ways:

        - With a `counter` dictionary that provides the keys `prefix`, `suffix`, `counter_len`
          and optionally `initial_value` and `little_endian`. The block size is derived from
          these; if `block_size` is also given and does not match, a `ValueError` is raised.
        - With a `block_size` and an optional `nonce`. The nonce becomes the prefix and the
          counter fills the rest of the block. Without a nonce, the counter occupies half of
          the block (rounded down) and the prefix consists of zero bytes.

        An `initial_value` of `None` is treated as zero. A `ValueError` is raised when neither
        `block_size` nor `counter` is given, or when the nonce is longer than the block size.
        """
        if counter is not None:
            initial_value = counter.get('initial_value', initial_value)
            # The signature advertises None as a valid value; without normalizing it here, the
            # counter serialization in `encrypt` would fail on it.
            self.initial_value = 0 if initial_value is None else initial_value
            self.little_endian = counter.get('little_endian', little_endian)
            self.prefix = counter['prefix']
            self.suffix = counter['suffix']
            self.counter_len = counter['counter_len']
            self.block_size = self.counter_len + len(self.prefix) + len(self.suffix)
            if block_size not in {None, self.block_size}:
                raise ValueError('Information in counter object does not align with block size.')
            return
        if block_size is None:
            raise ValueError('Unable to construct CTR mode object without block_size or counter argument.')

        self.initial_value = 0 if initial_value is None else initial_value
        self.little_endian = little_endian
        self.suffix = B''
        self.block_size = block_size

        if nonce is not None:
            if len(nonce) > block_size:
                raise ValueError('Nonce length exceeds block length.')
            self.counter_len = block_size - len(nonce)
            self.prefix = nonce
        else:
            self.counter_len = block_size // 2
            self.prefix = B'\0' * (block_size - self.counter_len)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        # S is the counter block; J marks the position of the counter between prefix and suffix.
        S = bytearray(self.block_size)
        J = slice(len(self.prefix), self.block_size - len(self.suffix))
        K = self.initial_value
        if self.prefix:
            S[:+len(self.prefix)] = self.prefix
        if self.suffix:
            S[-len(self.suffix):] = self.suffix
        C = None
        E = self.encrypt_block
        order = self.byte_order
        csize = self.counter_len
        # The counter wraps around once it no longer fits into csize bytes.
        mask = (1 << (csize * 8)) - 1
        while True:
            M = yield C
            S[J] = K.to_bytes(csize, order)
            K = (K + 1) & mask
            C = strxor(E(S), M)

    decrypt = encrypt


class CipherInterface(ABC):
    """
    Abstract base class for refinery's block cipher interface.
    """

    key_size: Container[int]
    """
    A container containing all valid key sizes for this cipher.
    """

    block_size: int
    """
    The block size of this cipher.
    """

    @abstractmethod
    def encrypt(self, M: BufferType) -> BufferType:
        """
        Data encryption according to this cipher interface.
        """
        ...

    @abstractmethod
    def decrypt(self, C: BufferType) -> BufferType:
        """
        Data decryption according to this cipher interface.
        """
        ...


class CipherObjectFactory(ABC):
    """
    Abstract factory that builds a `refinery.lib.crypto.CipherInterface` from an assortment of
    cryptographic secrets and parameters.
    """

    name: str
    key_size: Optional[Container[int]] = None
    block_size: Optional[int] = None

    @abstractmethod
    def new(
        self,
        key: BufferType,
        iv: Optional[BufferType] = None,
        counter: Optional[int] = None,
        initial_value: Optional[int] = 0,
        nonce: Optional[BufferType] = None,
        mode: Optional[str] = None,
        segment_size: Optional[int] = None,
        block_size: Optional[int] = None,
        **cipher_args
    ) -> CipherInterface:
        """
        Create the actual `refinery.lib.crypto.CipherInterface` from the given parameters. The
        signature mimics the PyCrypto interface for creating new ciphers, which keeps refinery
        factories cross-compatible with that library.
        """
        ...


class PyCryptoFactoryWrapper(CipherObjectFactory):
    """
    Wraps a PyCrypto module as a `refinery.lib.crypto.CipherObjectFactory`. Attributes that the
    wrapper does not define itself are looked up on the wrapped module.
    """

    def __init__(self, module):
        self.module = module

    def new(self, *a, **k) -> CipherInterface:
        """
        Forward all arguments to the `new` function of the wrapped module.
        """
        return self.module.new(*a, **k)

    @property
    def key_size(self):
        """
        The `key_size` of the wrapped module, or `None` if the module does not define it. A
        single integer is returned as a set with one element.
        """
        try:
            value = self.module.key_size
        except AttributeError:
            return None
        if isinstance(value, int):
            return {value}
        return value

    @property
    def block_size(self):
        """
        The `block_size` of the wrapped module, or `None` if the module does not define it.
        """
        try:
            value = self.module.block_size
        except AttributeError:
            return None
        return value

    def __repr__(self):
        return repr(self.module)

    def __dir__(self):
        return dir(self.module)

    def __getattr__(self, key):
        # This hook only runs when regular lookup fails. If the missing attribute is `module`
        # itself (e.g. on an instance created without running `__init__`, as copy and pickle
        # do), delegating would recurse forever; report the attribute as missing instead.
        if key == 'module':
            raise AttributeError(key)
        return getattr(self.module, key)


class BlockCipherFactory(CipherObjectFactory):
    """
    A `refinery.lib.crypto.CipherObjectFactory` for custom block ciphers. For every registered
    cipher mode, the factory exposes an attribute `MODE_<name>` that holds the numeric
    identifier of that mode; this identifier is what `new` expects as its `mode` argument.
    """

    cipher: Type[BlockCipher]

    def __init__(self, cipher: Type[BlockCipher]):
        self.cipher = cipher
        # Mode classes in registration order, so that a mode identifier can be used as index.
        self._modes = []
        for name, mode in CIPHER_MODES.items():
            setattr(self, F'MODE_{name}', mode._identifier)
            self._modes.append(mode)

    def new(self, key, mode=None, **args) -> CipherInterface:
        """
        Create a cipher object for the given `key`. The `mode` is one of the `MODE_*` identifiers
        of this factory; when it is omitted, ECB is used. The keyword arguments `iv`, `counter`,
        `initial_value`, `nonce`, and `segment_size` are passed to the cipher mode, all remaining
        keyword arguments are passed to the cipher.
        """
        # Without this fallback, the default value None would be called like a mode class below.
        mode = ECB if mode is None else self._modes[mode]
        mode_arguments = {}
        cipher = self.cipher
        for arg in ('iv', 'counter', 'initial_value', 'nonce', 'segment_size'):
            try:
                mode_arguments[arg] = args.pop(arg)
            except KeyError:
                pass
        if mode is CTR:
            block_size = self.block_size
            if block_size is None:
                # This happens for ciphers that do not have a fixed block size, i.e. the
                # block size is truly an instance attribute and not a class property.
                # In this case, we create a temporary cipher object and use it to obtain
                # the true block size.
                block_size = cipher(key, ECB(), **args).block_size
            mode_arguments.update(block_size=block_size)
        mode = mode(**mode_arguments)
        return cipher(key, mode, **args)

    @property
    def name(self):
        """
        The class name of the wrapped cipher.
        """
        return self.cipher.__name__

    @property
    def key_size(self):
        """
        The `key_size` class attribute of the cipher, or `None` if it is missing or defined as
        a property.
        """
        try:
            value = self.cipher.key_size
        except AttributeError:
            return None
        if isinstance(value, property):
            return None
        return value

    @property
    def block_size(self):
        """
        The `block_size` class attribute of the cipher, or `None` if it is missing or not an
        integer.
        """
        try:
            value = self.cipher.block_size
        except AttributeError:
            return None
        if not isinstance(value, int):
            return None
        return value


class BlockCipher(CipherInterface, ABC):
    """
    Base class for block ciphers that implement only the encryption and decryption of a single
    block; processing of longer inputs is delegated to a `refinery.lib.crypto.CipherMode`.
    """
    block_size: int
    key: BufferType
    mode: CipherMode
    key_size: Container[int]

    def __init__(self, key: BufferType, mode: Optional[CipherMode]):
        size = len(key)
        if size not in self.key_size:
            raise ValueError(F'The key size {size} is not supported by {self.__class__.__name__.lower()}.')
        self.key = key
        # Fall back to ECB when no mode is given.
        self.mode = mode if mode else ECB()

    @abstractmethod
    def block_encrypt(self, data: BufferType) -> BufferType:
        """
        Encrypt exactly one block of data.
        """
        raise NotImplementedError

    @abstractmethod
    def block_decrypt(self, data: BufferType) -> BufferType:
        """
        Decrypt exactly one block of data.
        """
        raise NotImplementedError

    def _apply_blockwise(self, operation: Operation, data: BufferType) -> BufferType:
        """
        Run `operation` over `data` using the cipher mode of this object. A writable input
        buffer is processed in place; for a read-only input, the result is written to a new
        bytearray. `refinery.lib.crypto.DataUnaligned` is raised when the mode requires aligned
        input and the data length is not a multiple of the block size.
        """
        block_size = self.block_size
        mode = self.mode
        if len(data) % block_size and mode.aligned:
            raise DataUnaligned
        src = memoryview(data)
        dst = bytearray(src) if src.readonly else src
        return mode.apply(
            operation, dst, src, self.block_encrypt, self.block_decrypt, block_size)

    def encrypt(self, data: BufferType) -> BufferType:
        """
        Encrypt the input data with the configured cipher mode.
        """
        return self._apply_blockwise(Operation.Encrypt, data)

    def decrypt(self, data: BufferType) -> BufferType:
        """
        Decrypt the input data with the configured cipher mode.
        """
        return self._apply_blockwise(Operation.Decrypt, data)

Functions

def strxor(a, b)

Return the XOR of the two byte strings a and b. The shorter of the two strings defines the length of the output.

Expand source code Browse git
def strxor(a: bytes, b: bytes):
    """
    Compute the byte-wise XOR of the byte strings `a` and `b`. The inputs are paired up position
    by position; the result is as long as the shorter input and surplus bytes are ignored.
    """
    pairs = zip(a, b)
    return bytes(left ^ right for left, right in pairs)
def rotl128(x, c)

Rotate the 128-bit integer x by c positions to the left.

Expand source code Browse git
def rotl128(x: int, c: int):
    """
    Cyclically shift the 128-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x80 - c)
    return (shifted | wrapped) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
def rotl64(x, c)

Rotate the 64-bit integer x by c positions to the left.

Expand source code Browse git
def rotl64(x: int, c: int):
    """
    Cyclically shift the 64-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x40 - c)
    return (shifted | wrapped) & 0xFFFFFFFFFFFFFFFF
def rotl32(x, c)

Rotate the 32-bit integer x by c positions to the left.

Expand source code Browse git
def rotl32(x: int, c: int):
    """
    Cyclically shift the 32-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x20 - c)
    return (shifted | wrapped) & 0xFFFFFFFF
def rotl16(x, c)

Rotate the 16-bit integer x by c positions to the left.

Expand source code Browse git
def rotl16(x: int, c: int):
    """
    Cyclically shift the 16-bit integer `x` to the left by `c` bit positions. The shift count
    is not reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x10 - c)
    return (shifted | wrapped) & 0xFFFF
def rotl8(x, c)

Rotate the byte x by c positions to the left.

Expand source code Browse git
def rotl8(x: int, c: int):
    """
    Cyclically shift the byte `x` to the left by `c` bit positions. The shift count is not
    reduced modulo the bit width.
    """
    shifted = x << c
    wrapped = x >> (0x08 - c)
    return (shifted | wrapped) & 0xFF
def rotr128(x, c)

Rotate the 128-bit integer x by c positions to the right.

Expand source code Browse git
def rotr128(x: int, c: int):
    """
    Cyclically shift the 128-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x80 - c)) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    return wrapped | (x >> c)
def rotr64(x, c)

Rotate the 64-bit integer x by c positions to the right.

Expand source code Browse git
def rotr64(x: int, c: int):
    """
    Cyclically shift the 64-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x40 - c)) & 0xFFFFFFFFFFFFFFFF
    return wrapped | (x >> c)
def rotr32(x, c)

Rotate the 32-bit integer x by c positions to the right.

Expand source code Browse git
def rotr32(x: int, c: int):
    """
    Cyclically shift the 32-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x20 - c)) & 0xFFFFFFFF
    return wrapped | (x >> c)
def rotr16(x, c)

Rotate the 16-bit integer x by c positions to the right.

Expand source code Browse git
def rotr16(x: int, c: int):
    """
    Cyclically shift the 16-bit integer `x` to the right by `c` bit positions. The shift count
    is not reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x10 - c)) & 0xFFFF
    return wrapped | (x >> c)
def rotr8(x, c)

Rotate the byte x by c positions to the right.

Expand source code Browse git
def rotr8(x: int, c: int):
    """
    Cyclically shift the byte `x` to the right by `c` bit positions. The shift count is not
    reduced modulo the bit width, and only the bits that wrap around are masked.
    """
    wrapped = (x << (0x08 - c)) & 0xFF
    return wrapped | (x >> c)
def rotr(n, x, c)

Rotate the n-bit integer x by c positions to the right. If n is among the common bit sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be used instead.

Expand source code Browse git
def rotr(n: int, x: int, c: int) -> int:
    """
    Cyclically shift the `n`-bit integer `x` to the right by `c` bit positions; the shift count
    is reduced modulo `n` first. For the common widths 8, 16, 32, 64, and 128, the dedicated
    functions in this module should be preferred.
    """
    mask = (1 << n) - 1
    c %= n
    lower = x >> c
    upper = (x << (n - c)) & mask
    return lower | upper
def rotl(n, x, c)

Rotate the n-bit integer x by c positions to the left. If n is among the common bit sizes 8, 16, 32, 64, or 128, then one of the more specific functions in this module should be used instead.

Expand source code Browse git
def rotl(n: int, x: int, c: int) -> int:
    """
    Cyclically shift the `n`-bit integer `x` to the left by `c` bit positions; the shift count
    is reduced modulo `n` first. For the common widths 8, 16, 32, 64, and 128, the dedicated
    functions in this module should be preferred.
    """
    mask = (1 << n) - 1
    c %= n
    lower = x >> (n - c)
    upper = (x << c) & mask
    return lower | upper

Classes

class Operation (value, names=None, *, module=None, qualname=None, type=None, start=1)

Specifies whether data is currently being encrypted or decrypted.

Expand source code Browse git
class Operation(str, Enum):
    """
    Selects the direction of a cryptographic operation: either encryption or decryption.
    Members are strings and compare equal to their lowercase value.
    """
    Encrypt = "encrypt"
    Decrypt = "decrypt"

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var Encrypt
var Decrypt
class CipherMode

Abstract base class for a cipher mode of operation.

Expand source code Browse git
class CipherMode(ABC):
    """
    Abstract base class for a cipher mode of operation. A mode implements `encrypt` and
    `decrypt` as generators: every value sent into the generator is one input block, and the
    value yielded in response is the corresponding output block.
    """

    encrypt_block: Callable[[memoryview], memoryview]
    decrypt_block: Callable[[memoryview], memoryview]
    aligned: bool = True
    _identifier: ClassVar[int]

    @abstractmethod
    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that encrypts the blocks sent to it according to this cipher mode, using the
        block primitives of the underlying cipher.
        """
        raise NotImplementedError

    @abstractmethod
    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that decrypts the blocks sent to it according to this cipher mode, using the
        block primitives of the underlying cipher.
        """
        raise NotImplementedError

    def apply(
        self,
        operation: Operation,
        dst: memoryview,
        src: memoryview,
        encrypt_block: Callable[[memoryview], memoryview],
        decrypt_block: Callable[[memoryview], memoryview],
        blocksize: int,
    ) -> memoryview:
        """
        Run the given `refinery.lib.crypto.Operation` over the source buffer `src` and store the
        result in `dst`, which is also the return value. The block primitives `encrypt_block` and
        `decrypt_block` of the underlying cipher are stored on the mode object before the engine
        is started. The input is processed in chunks of `blocksize` bytes; a shorter trailing
        chunk is sent to the engine as well, and only as many output bytes as that chunk is long
        are written.
        """
        self.encrypt_block = encrypt_block
        self.decrypt_block = decrypt_block
        dispatch = {
            Operation.Encrypt: self.encrypt,
            Operation.Decrypt: self.decrypt,
        }
        engine: Generator[memoryview, memoryview, None] = dispatch[operation]()
        # Advance the generator to its first yield so that it can receive blocks.
        next(engine)
        full_blocks, remainder = divmod(len(src), blocksize)
        aligned_end = full_blocks * blocksize
        for offset in range(0, aligned_end, blocksize):
            window = slice(offset, offset + blocksize)
            dst[window] = engine.send(src[window])
        if remainder:
            tail = slice(-remainder, None)
            dst[tail] = engine.send(src[tail])[:remainder]
        engine.close()
        return dst

Ancestors

  • abc.ABC

Subclasses

Class variables

var encrypt_block
var decrypt_block
var aligned

Methods

def encrypt(self)

Implements data encryption according to the current cipher mode and underlying cipher.

Expand source code Browse git
@abstractmethod
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    """
    Generator that encrypts the blocks sent to it according to this cipher mode, using the
    block primitives of the underlying cipher.
    """
    raise NotImplementedError
def decrypt(self)

Implements data decryption according to the current cipher mode and underlying cipher.

Expand source code Browse git
@abstractmethod
def decrypt(self) -> Generator[memoryview, memoryview, None]:
    """
    Generator that decrypts the blocks sent to it according to this cipher mode, using the
    block primitives of the underlying cipher.
    """
    raise NotImplementedError
def apply(self, operation, dst, src, encrypt_block, decrypt_block, blocksize)

This method is used to perform a cryptographic Operation to a given source src and write the result to the memory at dst according to the current cipher mode. To this end, it requires the block encryption and decryption primitives of the underlying cipher and the current block size.

Expand source code Browse git
def apply(
    self,
    operation: Operation,
    dst: memoryview,
    src: memoryview,
    encrypt_block: Callable[[memoryview], memoryview],
    decrypt_block: Callable[[memoryview], memoryview],
    blocksize: int,
) -> memoryview:
    """
    Run the given `refinery.lib.crypto.Operation` over the source buffer `src` and store the
    result in `dst`, which is also the return value. The block primitives `encrypt_block` and
    `decrypt_block` of the underlying cipher are stored on the mode object before the engine
    is started. The input is processed in chunks of `blocksize` bytes; a shorter trailing
    chunk is sent to the engine as well, and only as many output bytes as that chunk is long
    are written.
    """
    self.encrypt_block = encrypt_block
    self.decrypt_block = decrypt_block
    dispatch = {
        Operation.Encrypt: self.encrypt,
        Operation.Decrypt: self.decrypt,
    }
    engine: Generator[memoryview, memoryview, None] = dispatch[operation]()
    # Advance the generator to its first yield so that it can receive blocks.
    next(engine)
    full_blocks, remainder = divmod(len(src), blocksize)
    aligned_end = full_blocks * blocksize
    for offset in range(0, aligned_end, blocksize):
        window = slice(offset, offset + blocksize)
        dst[window] = engine.send(src[window])
    if remainder:
        tail = slice(-remainder, None)
        dst[tail] = engine.send(src[tail])[:remainder]
    engine.close()
    return dst
class ECB

The Electronic Codebook (ECB) is the simplest cipher mode of operation. The underlying cipher is applied block-wise with no additional safeguards.

Expand source code Browse git
class ECB(CipherMode):
    """
    The Electronic Codebook (ECB) is the simplest cipher mode of operation: the block primitive
    of the underlying cipher is applied to each block independently, without any chaining.
    """

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        decrypt_block = self.decrypt_block
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = decrypt_block(ciphertext)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        ciphertext = None
        while True:
            plaintext = yield ciphertext
            ciphertext = encrypt_block(plaintext)

Ancestors

Class variables

var encrypt_block
var decrypt_block
var aligned

Inherited members

class DataUnaligned

Raised when input data is unexpectedly unaligned to the current block size.

Expand source code Browse git
class DataUnaligned(ValueError):
    """
    Signals that the length of the input data is unexpectedly not a multiple of the current
    block size.
    """
    def __init__(self) -> None:
        message = 'Data not aligned to block size.'
        super().__init__(message)

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException
class StatefulCipherMode (iv)

A subclass of CipherMode that holds a state while performing any of its cryptographic Operations.

Expand source code Browse git
class StatefulCipherMode(CipherMode):
    """
    Base class for every `refinery.lib.crypto.CipherMode` that maintains an internal state
    while performing a cryptographic `refinery.lib.crypto.Operation`. The state is seeded
    from an initialization vector.
    """

    iv: BufferType
    """
    The initialization vector that seeds the internal state of the cipher mode.
    """

    def __init__(self, iv: BufferType):
        self.iv = iv

Ancestors

Subclasses

Class variables

var iv

The initial vector for the internal state of the cipher mode.

Inherited members

class CBC (iv)

An implementation of the popular Cipher Block Chaining mode of operation.

Expand source code Browse git
class CBC(StatefulCipherMode):
    """
    Cipher Block Chaining: each plaintext block is XORed with the previous ciphertext block
    (the IV for the first block) before it is passed to the block cipher.
    """

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        # The chaining value doubles as the yielded output; the IV primes the generator.
        chain = self.iv
        while True:
            plaintext = yield chain
            chain = encrypt_block(strxor(plaintext, chain))

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        decrypt_block = self.decrypt_block
        previous = self.iv
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = strxor(decrypt_block(ciphertext), previous)
            # Copy the ciphertext block so the chaining value does not alias the input buffer.
            previous = bytes(ciphertext)

Ancestors

Inherited members

class PCBC (iv)

An implementation of Propagating Cipher Block Chaining.

Expand source code Browse git
class PCBC(StatefulCipherMode):
    """
    Propagating Cipher Block Chaining: the chaining value that is mixed into each block is the
    XOR of the previous plaintext block and the previous ciphertext block, starting with the IV.
    """

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        encrypt_block = self.encrypt_block
        state = self.iv
        ciphertext = None
        while True:
            plaintext = yield ciphertext
            ciphertext = encrypt_block(strxor(plaintext, state))
            state = strxor(ciphertext, plaintext)

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        decrypt_block = self.decrypt_block
        state = self.iv
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = strxor(state, decrypt_block(ciphertext))
            state = strxor(plaintext, ciphertext)

Ancestors

Inherited members

class CFB (iv, segment_size=None)

Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final

Expand source code Browse git
class CFB(CipherMode):
    """
    Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final
    """

    iv: BufferType
    segment_size: int
    aligned = False

    def __init__(self, iv: BufferType, segment_size: Optional[int] = None):
        # The segment size is given in bits (8 when omitted) and stored in bytes.
        bits = 8 if segment_size is None else segment_size
        if bits % 8 != 0:
            raise NotImplementedError('segment sizes may only be multiples of 8')
        segment_size = bits // 8
        if len(iv) % segment_size != 0:
            raise NotImplementedError(
                F'the block size {len(iv) * 8} is not an even multiple of the segment '
                F'size {segment_size * 8}; this is currently not supported.')
        self.segment_size = segment_size
        self.iv = iv

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        size = self.segment_size
        register = bytearray(self.iv)
        encrypt_block = self.encrypt_block
        output = bytearray(len(self.iv))
        if size == 1:
            # Byte-wise feedback: each ciphertext byte is shifted into the register from the right.
            while True:
                block = yield output
                for index, byte in enumerate(block):
                    output[index] = feedback = byte ^ encrypt_block(register)[0]
                    register[:-1], register[-1] = memoryview(register)[1:], feedback
        else:
            segments = [slice(start, start + size) for start in range(0, len(register), size)]
            while True:
                block = yield output
                for segment in segments:
                    chunk = block[segment]
                    output[segment] = feedback = strxor(chunk, encrypt_block(register)[:size])
                    register[:-size], register[-size:] = memoryview(register)[size:], feedback

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        size = self.segment_size
        register = bytearray(self.iv)
        # Decryption uses the block cipher in the encryption direction as well; only the value
        # that is fed back into the register differs: it is always the ciphertext.
        encrypt_block = self.encrypt_block
        output = bytearray(len(self.iv))
        if size == 1:
            while True:
                block = yield output
                for index, feedback in enumerate(block):
                    output[index] = feedback ^ encrypt_block(register)[0]
                    register[:-1], register[-1] = memoryview(register)[1:], feedback
        else:
            segments = [slice(start, start + size) for start in range(0, len(register), size)]
            while True:
                block = yield output
                for segment in segments:
                    feedback = block[segment]
                    output[segment] = strxor(feedback, encrypt_block(register)[:size])
                    register[:-size], register[-size:] = memoryview(register)[size:], feedback

Ancestors

Class variables

var iv
var segment_size
var aligned

Inherited members

class OFB (iv)

An implementation of Output Feedback Mode.

Expand source code Browse git
class OFB(StatefulCipherMode):
    """
    An implementation of Output Feedback Mode.
    """

    aligned = False

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        # The key stream is obtained by encrypting the initial vector over and over again; the
        # data itself never enters the block cipher.
        encrypt_block = self.encrypt_block
        keystream = self.iv
        output = None
        while True:
            block = yield output
            keystream = encrypt_block(keystream)
            output = strxor(block, keystream)

    # XOR with the same key stream reverses itself, so one generator serves both directions.
    decrypt = encrypt

Ancestors

Class variables

var aligned

Methods

def decrypt(self)

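Implements data decryption according to the current cipher mode and underlying cipher. In OFB mode, decryption is the same operation as encryption, which is why the source shown below is that of encrypt.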

Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    # The key stream is obtained by encrypting the initial vector over and over again; each
    # block that is sent in gets XORed with the next key stream block.
    encrypt_block = self.encrypt_block
    keystream = self.iv
    output = None
    while True:
        block = yield output
        keystream = encrypt_block(keystream)
        output = strxor(block, keystream)

Inherited members

class CTR (block_size=None, counter=None, nonce=None, initial_value=0, little_endian=False)

An implementation of Counter mode.

Expand source code Browse git
class CTR(CipherMode):
    """
    An implementation of Counter mode. Each counter block consists of a fixed prefix, the counter
    value, and a fixed suffix.
    """

    counter_len: int
    prefix: BufferType
    suffix: BufferType
    initial_value: int
    little_endian: bool
    block_size: int

    aligned = False

    @property
    def byte_order(self):
        if self.little_endian:
            return 'little'
        return 'big'

    def __init__(
        self,
        block_size: Optional[int] = None,
        counter: Optional[Dict] = None,
        nonce: Optional[BufferType] = None,
        initial_value: Optional[int] = 0,
        little_endian: bool = False
    ):
        if counter is not None:
            # A counter dictionary fully describes the layout of the counter block; the block
            # size is derived from it and only checked against an explicitly given one.
            self.initial_value = counter.get('initial_value', initial_value)
            self.little_endian = counter.get('little_endian', little_endian)
            self.prefix = counter['prefix']
            self.suffix = counter['suffix']
            self.counter_len = counter['counter_len']
            self.block_size = len(self.prefix) + self.counter_len + len(self.suffix)
            if block_size not in {None, self.block_size}:
                raise ValueError('Information in counter object does not align with block size.')
            return

        if block_size is None:
            raise ValueError('Unable to construct CTR mode object without block_size or counter argument.')

        self.initial_value = initial_value
        self.little_endian = little_endian
        self.suffix = B''
        self.block_size = block_size

        if nonce is None:
            # Without a nonce, the counter occupies half the block and the prefix is all zeros.
            counter_len = block_size // 2
            nonce = B'\0' * (block_size - counter_len)
        elif len(nonce) > block_size:
            raise ValueError('Nonce length exceeds block length.')
        else:
            counter_len = block_size - len(nonce)

        self.counter_len = counter_len
        self.prefix = nonce

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        block = bytearray(self.block_size)
        window = slice(len(self.prefix), self.block_size - len(self.suffix))
        counter = self.initial_value
        if self.prefix:
            block[:len(self.prefix)] = self.prefix
        if self.suffix:
            block[-len(self.suffix):] = self.suffix
        output = None
        encrypt_block = self.encrypt_block
        byte_order = self.byte_order
        width = self.counter_len
        # The counter wraps around once it no longer fits into its slot in the block.
        mask = (1 << (width * 8)) - 1
        while True:
            data = yield output
            block[window] = counter.to_bytes(width, byte_order)
            counter = (counter + 1) & mask
            output = strxor(encrypt_block(block), data)

    # XOR with the same key stream reverses itself, so one generator serves both directions.
    decrypt = encrypt

Ancestors

Class variables

var counter_len
var prefix
var suffix
var initial_value
var little_endian
var block_size
var aligned

Instance variables

var byte_order
Expand source code Browse git
@property
def byte_order(self):
    # Name of the byte order used to serialize the counter.
    if self.little_endian:
        return 'little'
    return 'big'

Methods

def decrypt(self)

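Implements data decryption according to the current cipher mode and underlying cipher. In CTR mode, decryption is the same operation as encryption, which is why the source shown below is that of encrypt.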

Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    # The counter block is laid out as prefix, counter value, suffix. Only the counter slot
    # changes between blocks.
    block = bytearray(self.block_size)
    window = slice(len(self.prefix), self.block_size - len(self.suffix))
    counter = self.initial_value
    if self.prefix:
        block[:len(self.prefix)] = self.prefix
    if self.suffix:
        block[-len(self.suffix):] = self.suffix
    output = None
    encrypt_block = self.encrypt_block
    byte_order = self.byte_order
    width = self.counter_len
    # The counter wraps around once it no longer fits into its slot in the block.
    mask = (1 << (width * 8)) - 1
    while True:
        data = yield output
        block[window] = counter.to_bytes(width, byte_order)
        counter = (counter + 1) & mask
        output = strxor(encrypt_block(block), data)

Inherited members

class CipherInterface

Abstract base class for refinery's block cipher interface.

Expand source code Browse git
class CipherInterface(ABC):
    """
    Abstract base class for refinery's block cipher interface.
    """

    key_size: Container[int]
    """
    A container containing all valid key sizes for this cipher.
    """

    block_size: int
    """
    The block size of this cipher.
    """

    # The method docstrings have to be the first statement of the method body. A string that
    # follows a one-line `def ...: ...` at class level is a stray expression and leaves the
    # method without documentation.

    @abstractmethod
    def encrypt(self, M: BufferType) -> BufferType:
        """
        Data encryption according to this cipher interface. The input `M` is the plaintext
        buffer, the return value is the corresponding ciphertext.
        """
        ...

    @abstractmethod
    def decrypt(self, C: BufferType) -> BufferType:
        """
        Data decryption according to this cipher interface. The input `C` is the ciphertext
        buffer, the return value is the corresponding plaintext.
        """
        ...

Ancestors

  • abc.ABC

Subclasses

Class variables

var key_size

A container containing all valid key sizes for this cipher.

var block_size

The block size of this cipher.

Methods

def encrypt(self, M)
Expand source code Browse git
# Data encryption according to this cipher interface: receives the plaintext buffer `M` and
# returns the ciphertext. The body is a stub; concrete ciphers provide the implementation.
@abstractmethod
def encrypt(self, M: BufferType) -> BufferType: ...
def decrypt(self, C)
Expand source code Browse git
# Data decryption according to this cipher interface: receives the ciphertext buffer `C` and
# returns the plaintext. The body is a stub; concrete ciphers provide the implementation.
@abstractmethod
def decrypt(self, C: BufferType) -> BufferType: ...
class CipherObjectFactory

An abstract class to build CipherInterfaces from an assortment of cryptographic secrets and parameters.

Expand source code Browse git
class CipherObjectFactory(ABC):
    """
    An abstract class to build `refinery.lib.crypto.CipherInterface`s from an assortment of
    cryptographic secrets and parameters.
    """

    # Name of the cipher that this factory builds.
    name: str
    # Container of all valid key sizes, or None when the factory cannot tell.
    key_size: Optional[Container[int]] = None
    # Block size of the cipher, or None when the factory cannot tell.
    block_size: Optional[int] = None

    @abstractmethod
    def new(
        self,
        key: BufferType,
        iv: Optional[BufferType] = None,
        counter: Optional[Dict] = None,
        initial_value: Optional[int] = 0,
        nonce: Optional[BufferType] = None,
        mode: Optional[int] = None,
        segment_size: Optional[int] = None,
        block_size: Optional[int] = None,
        **cipher_args
    ) -> CipherInterface:
        """
        Build the actual `refinery.lib.crypto.CipherInterface` from the given input parameters.
        This mimics the PyCrypto interface for new ciphers in order to make the refinery factory
        cross-compatible with that library.

        The `key` is the secret key and `mode` identifies the mode of operation. The arguments
        `iv`, `counter`, `initial_value`, `nonce`, and `segment_size` parameterize the mode of
        operation; `refinery.lib.crypto.BlockCipherFactory` forwards them to the constructor of
        the selected cipher mode. A `counter` is a dictionary describing the counter block, as
        it is consumed by `refinery.lib.crypto.CTR`. Any additional keyword arguments are meant
        for the cipher itself.
        """
        ...

Ancestors

  • abc.ABC

Subclasses

Class variables

var name
var key_size
var block_size

Methods

def new(self, key, iv=None, counter=None, initial_value=0, nonce=None, mode=None, segment_size=None, block_size=None, **cipher_args)

Build the actual CipherInterface from the given input parameters. This mimics the PyCrypto interface for new ciphers in order to make the refinery factory cross-compatible with that library.

Expand source code Browse git
@abstractmethod
def new(
    self,
    key: BufferType,
    iv: Optional[BufferType] = None,
    counter: Optional[Dict] = None,
    initial_value: Optional[int] = 0,
    nonce: Optional[BufferType] = None,
    mode: Optional[int] = None,
    segment_size: Optional[int] = None,
    block_size: Optional[int] = None,
    **cipher_args
) -> CipherInterface:
    """
    Build the actual `refinery.lib.crypto.CipherInterface` from the given input parameters.
    This mimics the PyCrypto interface for new ciphers in order to make the refinery factory
    cross-compatible with that library.

    The `key` is the secret key and `mode` identifies the mode of operation. The arguments
    `iv`, `counter`, `initial_value`, `nonce`, and `segment_size` parameterize the mode of
    operation; `refinery.lib.crypto.BlockCipherFactory` forwards them to the constructor of
    the selected cipher mode. A `counter` is a dictionary describing the counter block, as
    it is consumed by `refinery.lib.crypto.CTR`. Any additional keyword arguments are meant
    for the cipher itself.
    """
    ...
class PyCryptoFactoryWrapper (module)

Wraps a PyCrypto module as a CipherObjectFactory.

Expand source code Browse git
class PyCryptoFactoryWrapper(CipherObjectFactory):
    """
    Wraps a PyCrypto module as a `refinery.lib.crypto.CipherObjectFactory`.
    """

    def __init__(self, module):
        self.module = module

    def new(self, *a, **k) -> CipherInterface:
        # Cipher construction is delegated to the wrapped module without any changes.
        return self.module.new(*a, **k)

    @property
    def key_size(self):
        try:
            value = self.module.key_size
        except AttributeError:
            value = None
        else:
            # A single integer key size is normalized to a container.
            if isinstance(value, int):
                value = {value}
        return value

    @property
    def block_size(self):
        try:
            return self.module.block_size
        except AttributeError:
            return None

    def __repr__(self):
        return repr(self.module)

    def __dir__(self):
        return dir(self.module)

    def __getattr__(self, key):
        # Only invoked for attributes that are not found on the wrapper itself; these are
        # looked up on the wrapped module instead.
        return getattr(self.module, key)

Ancestors

Class variables

var name

Instance variables

var key_size
Expand source code Browse git
@property
def key_size(self):
    # Valid key sizes as reported by the wrapped module; None if the module has no such
    # attribute. A single integer key size is normalized to a container.
    try:
        value = self.module.key_size
    except AttributeError:
        value = None
    else:
        if isinstance(value, int):
            value = {value}
    return value
var block_size
Expand source code Browse git
@property
def block_size(self):
    # Block size as reported by the wrapped module; None if the module has no such attribute.
    try:
        return self.module.block_size
    except AttributeError:
        return None

Inherited members

class BlockCipherFactory (cipher)

A CipherObjectFactory for custom block ciphers.

Expand source code Browse git
class BlockCipherFactory(CipherObjectFactory):
    """
    A `refinery.lib.crypto.CipherObjectFactory` for custom block ciphers.
    """

    cipher: Type[BlockCipher]

    def __init__(self, cipher: Type[BlockCipher]):
        """
        Create a factory for the given block cipher class. Every registered cipher mode is
        exposed as an integer attribute named `MODE_` followed by the name of the mode; these
        integers are the values accepted for the `mode` argument of the `new` method.
        """
        self.cipher = cipher
        # The mode identifiers are used as indices into this list, which relies on the modes
        # being stored in CIPHER_MODES in the order of their identifiers.
        self._modes = []
        for name, mode in CIPHER_MODES.items():
            setattr(self, F'MODE_{name}', mode._identifier)
            self._modes.append(mode)

    def new(self, key, mode=None, **args) -> CipherInterface:
        """
        Build a cipher object for the given key. The `mode` is one of the integer `MODE_`
        attributes of this factory; when it is omitted, ECB is used, which is also the fallback
        of `refinery.lib.crypto.BlockCipher` when it receives no mode. The keyword arguments
        `iv`, `counter`, `initial_value`, `nonce`, and `segment_size` are passed to the cipher
        mode, all remaining keyword arguments are passed to the cipher.
        """
        # Without this fallback, the default value None would be called like a mode class and
        # fail with a TypeError.
        mode_type = ECB if mode is None else self._modes[mode]
        mode_arguments = {}
        cipher = self.cipher
        for arg in ('iv', 'counter', 'initial_value', 'nonce', 'mode', 'segment_size'):
            try:
                mode_arguments[arg] = args.pop(arg)
            except KeyError:
                pass
        if mode_type is CTR:
            block_size = self.block_size
            if block_size is None:
                # This happens for ciphers that do not have a fixed block size, i.e. the
                # block size is truly an instance attribute and not a class property.
                # In this case, we create a temporary cipher object and use it to obtain
                # the true block size.
                block_size = cipher(key, ECB, **args).block_size
            mode_arguments.update(block_size=block_size)
        return cipher(key, mode_type(**mode_arguments), **args)

    @property
    def name(self):
        return self.cipher.__name__

    @property
    def key_size(self):
        try:
            value = self.cipher.key_size
        except AttributeError:
            return None
        # A property object on the class means that the key size is only known per instance.
        if isinstance(value, property):
            return None
        return value

    @property
    def block_size(self):
        try:
            value = self.cipher.block_size
        except AttributeError:
            return None
        # Anything other than an integer on the class means that the block size is only known
        # per instance.
        if not isinstance(value, int):
            return None
        return value

Ancestors

Class variables

var cipher

Instance variables

var name
Expand source code Browse git
@property
def name(self):
    # The factory is named after the cipher class that it builds.
    cipher = self.cipher
    return cipher.__name__
var key_size
Expand source code Browse git
@property
def key_size(self):
    # Valid key sizes as declared on the cipher class. The result is None when the class does
    # not declare them or declares them as a property, i.e. they are only known per instance.
    try:
        declared = self.cipher.key_size
    except AttributeError:
        return None
    return None if isinstance(declared, property) else declared
var block_size
Expand source code Browse git
@property
def block_size(self):
    # Block size as declared on the cipher class. The result is None when the class does not
    # declare it as an integer, i.e. it is only known per instance.
    try:
        declared = self.cipher.block_size
    except AttributeError:
        return None
    return declared if isinstance(declared, int) else None

Inherited members

class BlockCipher (key, mode)

Abstract base class for refinery's block cipher interface.

Expand source code Browse git
class BlockCipher(CipherInterface, ABC):
    """
    Abstract base class for block ciphers. Subclasses implement the transformation of a single
    block; processing of entire buffers is delegated to the cipher mode of the object.
    """
    block_size: int
    key: BufferType
    mode: CipherMode
    key_size: Container[int]

    def __init__(self, key: BufferType, mode: Optional[CipherMode]):
        supported = self.key_size
        if len(key) not in supported:
            raise ValueError(F'The key size {len(key)} is not supported by {self.__class__.__name__.lower()}.')
        self.key = key
        # Without a mode, the cipher operates in ECB mode.
        self.mode = mode if mode else ECB()

    @abstractmethod
    def block_encrypt(self, data: BufferType) -> BufferType:
        """
        Encryption of a single block of data.
        """
        raise NotImplementedError

    @abstractmethod
    def block_decrypt(self, data: BufferType) -> BufferType:
        """
        Decryption of a single block of data.
        """
        raise NotImplementedError

    def _apply_blockwise(self, operation: Operation, data: BufferType) -> BufferType:
        block_size = self.block_size
        mode = self.mode
        # Only modes that require aligned input reject data that is not a multiple of the
        # block size.
        if len(data) % block_size != 0 and mode.aligned:
            raise DataUnaligned
        source = memoryview(data)
        # Writable input doubles as the output buffer; read-only input is copied first.
        target = bytearray(source) if source.readonly else source
        return mode.apply(
            operation, target, source, self.block_encrypt, self.block_decrypt, block_size)

    def encrypt(self, data: BufferType) -> BufferType:
        """
        Encrypt the input data.
        """
        return self._apply_blockwise(Operation.Encrypt, data)

    def decrypt(self, data: BufferType) -> BufferType:
        """
        Decrypt the input data.
        """
        return self._apply_blockwise(Operation.Decrypt, data)

Ancestors

Subclasses

Class variables

var key
var mode

Methods

def block_encrypt(self, data)

Encryption of a single block of data.

Expand source code Browse git
@abstractmethod
def block_encrypt(self, data: BufferType) -> BufferType:
    """
    Encryption of a single block of data. This base implementation only raises a
    `NotImplementedError`; the method is abstract and has to be provided by the cipher.
    """
    raise NotImplementedError
def block_decrypt(self, data)

Decryption of a single block of data.

Expand source code Browse git
@abstractmethod
def block_decrypt(self, data: BufferType) -> BufferType:
    """
    Decryption of a single block of data. This base implementation only raises a
    `NotImplementedError`; the method is abstract and has to be provided by the cipher.
    """
    raise NotImplementedError
def encrypt(self, data)

Encrypt the input data.

Expand source code Browse git
def encrypt(self, data: BufferType) -> BufferType:
    """
    Encrypt the input data by running it through the blockwise processing of this cipher in
    the encryption direction.
    """
    operation = Operation.Encrypt
    return self._apply_blockwise(operation, data)
def decrypt(self, data)

Decrypt the input data.

Expand source code Browse git
def decrypt(self, data: BufferType) -> BufferType:
    """
    Decrypt the input data by running it through the blockwise processing of this cipher in
    the decryption direction.
    """
    operation = Operation.Decrypt
    return self._apply_blockwise(operation, data)

Inherited members