Module refinery.lib.crypto

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import Callable, ClassVar, Container, Generator, Optional, Type, Union, Dict
from abc import ABC, abstractmethod
from enum import Enum

# Any of the byte buffer types accepted by the routines in this module.
BufferType = Union[bytearray, bytes, memoryview]
# Registry of cipher mode classes, keyed by class name. It is populated by the
# _register_cipher_mode decorator; the values are the classes themselves.
CIPHER_MODES: Dict[str, Type[CipherMode]] = {}


def strxor(a: bytes, b: bytes):
    """
    Return the bytewise XOR of the two given buffers as a bytes object. When the inputs differ
    in length, the result is as long as the shorter one.
    """
    return bytes([x ^ y for x, y in zip(a, b)])


def _register_cipher_mode(cls: Type[CipherMode]):
    """
    Class decorator that adds a cipher mode class to the CIPHER_MODES registry under its class
    name. The class receives the number of previously registered modes as its _identifier.
    """
    identifier = len(CIPHER_MODES)
    cls._identifier = identifier
    CIPHER_MODES[cls.__name__] = cls
    return cls


# Lookup table with 256 entries, indexed by byte value. Each entry keeps the upper
# seven bits of the index and sets the lowest bit so that the resulting byte has an
# odd number of set bits.
_DES_PARITYTABLE = bytearray((
    0x01, 0x01, 0x02, 0x02, 0x04, 0x04, 0x07, 0x07,
    0x08, 0x08, 0x0B, 0x0B, 0x0D, 0x0D, 0x0E, 0x0E,
    0x10, 0x10, 0x13, 0x13, 0x15, 0x15, 0x16, 0x16,
    0x19, 0x19, 0x1A, 0x1A, 0x1C, 0x1C, 0x1F, 0x1F,
    0x20, 0x20, 0x23, 0x23, 0x25, 0x25, 0x26, 0x26,
    0x29, 0x29, 0x2A, 0x2A, 0x2C, 0x2C, 0x2F, 0x2F,
    0x31, 0x31, 0x32, 0x32, 0x34, 0x34, 0x37, 0x37,
    0x38, 0x38, 0x3B, 0x3B, 0x3D, 0x3D, 0x3E, 0x3E,
    0x40, 0x40, 0x43, 0x43, 0x45, 0x45, 0x46, 0x46,
    0x49, 0x49, 0x4A, 0x4A, 0x4C, 0x4C, 0x4F, 0x4F,
    0x51, 0x51, 0x52, 0x52, 0x54, 0x54, 0x57, 0x57,
    0x58, 0x58, 0x5B, 0x5B, 0x5D, 0x5D, 0x5E, 0x5E,
    0x61, 0x61, 0x62, 0x62, 0x64, 0x64, 0x67, 0x67,
    0x68, 0x68, 0x6B, 0x6B, 0x6D, 0x6D, 0x6E, 0x6E,
    0x70, 0x70, 0x73, 0x73, 0x75, 0x75, 0x76, 0x76,
    0x79, 0x79, 0x7A, 0x7A, 0x7C, 0x7C, 0x7F, 0x7F,
    0x80, 0x80, 0x83, 0x83, 0x85, 0x85, 0x86, 0x86,
    0x89, 0x89, 0x8A, 0x8A, 0x8C, 0x8C, 0x8F, 0x8F,
    0x91, 0x91, 0x92, 0x92, 0x94, 0x94, 0x97, 0x97,
    0x98, 0x98, 0x9B, 0x9B, 0x9D, 0x9D, 0x9E, 0x9E,
    0xA1, 0xA1, 0xA2, 0xA2, 0xA4, 0xA4, 0xA7, 0xA7,
    0xA8, 0xA8, 0xAB, 0xAB, 0xAD, 0xAD, 0xAE, 0xAE,
    0xB0, 0xB0, 0xB3, 0xB3, 0xB5, 0xB5, 0xB6, 0xB6,
    0xB9, 0xB9, 0xBA, 0xBA, 0xBC, 0xBC, 0xBF, 0xBF,
    0xC1, 0xC1, 0xC2, 0xC2, 0xC4, 0xC4, 0xC7, 0xC7,
    0xC8, 0xC8, 0xCB, 0xCB, 0xCD, 0xCD, 0xCE, 0xCE,
    0xD0, 0xD0, 0xD3, 0xD3, 0xD5, 0xD5, 0xD6, 0xD6,
    0xD9, 0xD9, 0xDA, 0xDA, 0xDC, 0xDC, 0xDF, 0xDF,
    0xE0, 0xE0, 0xE3, 0xE3, 0xE5, 0xE5, 0xE6, 0xE6,
    0xE9, 0xE9, 0xEA, 0xEA, 0xEC, 0xEC, 0xEF, 0xEF,
    0xF1, 0xF1, 0xF2, 0xF2, 0xF4, 0xF4, 0xF7, 0xF7,
    0xF8, 0xF8, 0xFB, 0xFB, 0xFD, 0xFD, 0xFE, 0xFE
))


def des_set_odd_parity(key: bytearray):
    """
    Replace every byte of the given key, in place, by its entry in the DES parity table.
    """
    table = _DES_PARITYTABLE
    key[:] = [table[byte] for byte in key]


def rotl128(x: int, c: int):
    """
    Rotate the 128-bit integer x to the left by c bits.
    """
    shifted = x << c
    wrapped = x >> (0x80 - c)
    return (shifted | wrapped) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF


def rotl64(x: int, c: int):
    """
    Rotate the 64-bit integer x to the left by c bits.
    """
    shifted = x << c
    wrapped = x >> (0x40 - c)
    return (shifted | wrapped) & 0xFFFFFFFFFFFFFFFF


def rotl32(x: int, c: int):
    """
    Rotate the 32-bit integer x to the left by c bits.
    """
    shifted = x << c
    wrapped = x >> (0x20 - c)
    return (shifted | wrapped) & 0xFFFFFFFF


def rotl16(x: int, c: int):
    """
    Rotate the 16-bit integer x to the left by c bits.
    """
    shifted = x << c
    wrapped = x >> (0x10 - c)
    return (shifted | wrapped) & 0xFFFF


def rotl8(x: int, c: int):
    """
    Rotate the 8-bit integer x to the left by c bits.
    """
    shifted = x << c
    wrapped = x >> (0x08 - c)
    return (shifted | wrapped) & 0xFF


def rotr128(x: int, c: int):
    """
    Rotate the 128-bit integer x to the right by c bits.
    """
    wrapped = (x << (0x80 - c)) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
    return wrapped | (x >> c)


def rotr64(x: int, c: int):
    """
    Rotate the 64-bit integer x to the right by c bits.
    """
    wrapped = (x << (0x40 - c)) & 0xFFFFFFFFFFFFFFFF
    return wrapped | (x >> c)


def rotr32(x: int, c: int):
    """
    Rotate the 32-bit integer x to the right by c bits.
    """
    wrapped = (x << (0x20 - c)) & 0xFFFFFFFF
    return wrapped | (x >> c)


def rotr16(x: int, c: int):
    """
    Rotate the 16-bit integer x to the right by c bits.
    """
    wrapped = (x << (0x10 - c)) & 0xFFFF
    return wrapped | (x >> c)


def rotr8(x: int, c: int):
    """
    Rotate the 8-bit integer x to the right by c bits.
    """
    wrapped = (x << (0x08 - c)) & 0xFF
    return wrapped | (x >> c)


def rotr(size: int, x: int, c: int) -> int:
    """
    Rotate the integer x, interpreted as having the given size in bits, to the right by c bits.
    The rotation count is reduced modulo the size.
    """
    c %= size
    mask = (1 << size) - 1
    kept = x >> c
    wrapped = (x << (size - c)) & mask
    return kept | wrapped


def rotl(size: int, x: int, c: int) -> int:
    """
    Rotate the integer x, interpreted as having the given size in bits, to the left by c bits.
    The rotation count is reduced modulo the size.
    """
    c %= size
    mask = (1 << size) - 1
    wrapped = x >> (size - c)
    shifted = (x << c) & mask
    return wrapped | shifted


class Direction(str, Enum):
    """
    The direction in which a cipher mode is applied: encryption or decryption.
    """
    Encrypt = 'encrypt'
    Decrypt = 'decrypt'


class CipherMode(ABC):
    """
    Abstract base class for block cipher modes of operation. The methods `encrypt` and `decrypt`
    are generators: after being primed, they receive one input block per `send` call and yield
    the corresponding output block.
    """

    encrypt_block: Callable[[memoryview], memoryview]
    decrypt_block: Callable[[memoryview], memoryview]
    aligned: bool = True
    _identifier: ClassVar[int]

    @abstractmethod
    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that receives plaintext blocks and yields ciphertext blocks.
        """
        raise NotImplementedError

    @abstractmethod
    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that receives ciphertext blocks and yields plaintext blocks.
        """
        raise NotImplementedError

    def apply(
        self,
        direction: Direction,
        dst: memoryview,
        src: memoryview,
        encrypt_block: Callable[[memoryview], memoryview],
        decrypt_block: Callable[[memoryview], memoryview],
        blocksize: int,
    ) -> memoryview:
        """
        Run this mode over `src` in the given direction and write the output to `dst`, which is
        also returned. The two block primitives are stored on the mode object before the
        generator for the requested direction is started. Input is processed in chunks of
        `blocksize` bytes; a shorter trailing chunk is sent as is, and only as many output bytes
        as it contained are written to the end of `dst`.
        """
        self.encrypt_block = encrypt_block
        self.decrypt_block = decrypt_block
        generators = {
            Direction.Encrypt: self.encrypt,
            Direction.Decrypt: self.decrypt,
        }
        engine: Generator[memoryview, memoryview, None] = generators[direction]()
        # Advance the generator to its first yield so that it can receive blocks.
        next(engine)
        full_blocks, tail = divmod(len(src), blocksize)
        for start in range(0, full_blocks * blocksize, blocksize):
            stop = start + blocksize
            dst[start:stop] = engine.send(src[start:stop])
        if tail:
            output = engine.send(src[-tail:])
            dst[-tail:] = output[:tail]
        engine.close()
        return dst


@_register_cipher_mode
class ECB(CipherMode):
    """
    Electronic codebook mode: every block is passed through the block primitive on its own.
    """

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        block_decrypt = self.decrypt_block
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = block_decrypt(ciphertext)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        block_encrypt = self.encrypt_block
        ciphertext = None
        while True:
            plaintext = yield ciphertext
            ciphertext = block_encrypt(plaintext)


class DataUnaligned(ValueError):
    """
    Raised when the length of the input data is not a multiple of the block size.
    """

    def __init__(self) -> None:
        message = 'Data not aligned to block size.'
        super().__init__(message)


class StatefulCipherMode(CipherMode):
    """
    Base class for cipher modes that are constructed from an initialization vector.
    """

    # The initialization vector, stored exactly as it was passed to the constructor.
    iv: BufferType

    def __init__(self, iv: BufferType):
        self.iv = iv


@_register_cipher_mode
class CBC(StatefulCipherMode):
    """
    Cipher block chaining mode: every plaintext block is XORed with the previous ciphertext
    block before it is encrypted. The initialization vector takes the place of the previous
    ciphertext block for the first block.
    """

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        block_encrypt = self.encrypt_block
        previous = self.iv
        while True:
            plaintext = yield previous
            previous = block_encrypt(strxor(plaintext, previous))

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        block_decrypt = self.decrypt_block
        previous = self.iv
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = strxor(block_decrypt(ciphertext), previous)
            # Copy the ciphertext block before yielding: the output may be written to the
            # same buffer that this block is a view of.
            previous = bytes(ciphertext)


@_register_cipher_mode
class PCBC(StatefulCipherMode):
    """
    Propagating cipher block chaining mode: every plaintext block is XORed with a chaining
    value before it is encrypted. The chaining value starts as the initialization vector and
    is replaced after each block by the XOR of that block's plaintext and ciphertext.
    """

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        block_encrypt = self.encrypt_block
        chain = self.iv
        ciphertext = None
        while True:
            plaintext = yield ciphertext
            ciphertext = block_encrypt(strxor(plaintext, chain))
            chain = strxor(ciphertext, plaintext)

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        block_decrypt = self.decrypt_block
        chain = self.iv
        plaintext = None
        while True:
            ciphertext = yield plaintext
            plaintext = strxor(chain, block_decrypt(ciphertext))
            chain = strxor(plaintext, ciphertext)


@_register_cipher_mode
class CFB(CipherMode):
    """
    Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final

    The constructor takes the segment size in bits; it defaults to 8 and must be a positive
    multiple of 8 that evenly divides the length of the initialization vector. Internally, the
    segment size is stored in bytes. Input that ends in a partial segment is supported: the
    remaining bytes are XORed with the leading bytes of the next keystream segment.
    """

    iv: BufferType
    segment_size: int
    aligned = False

    def __init__(self, iv: BufferType, segment_size: Optional[int] = None):
        """
        Raises a ValueError if the segment size is not positive, and a NotImplementedError if
        it is not a multiple of 8 bits or does not evenly divide the length of the IV.
        """
        if segment_size is None:
            segment_size = 8
        if segment_size <= 0:
            # A zero size would divide by zero below and a negative one would silently
            # produce an empty segment list, so both are rejected up front.
            raise ValueError('segment sizes must be positive')
        if segment_size % 8 != 0:
            raise NotImplementedError('segment sizes may only be multiples of 8')
        segment_size = segment_size // 8
        if len(iv) % segment_size != 0:
            raise NotImplementedError(
                F'the block size {len(iv) * 8} is not an even multiple of the segment '
                F'size {segment_size * 8}; this is currently not supported.')
        self.segment_size = segment_size
        self.iv = iv

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that receives plaintext blocks and yields ciphertext blocks. The yielded
        buffer is reused between blocks.
        """
        s = self.segment_size
        S = bytearray(self.iv)
        E = self.encrypt_block
        C = bytearray(len(self.iv))
        if s == 1:
            while True:
                M = yield C
                for k, m in enumerate(M):
                    C[k] = c = m ^ E(S)[0]
                    # Shift the register left by one byte and append the ciphertext byte.
                    S[:-1], S[-1] = memoryview(S)[1:], c
        else:
            segments = [slice(i, i + s) for i in range(0, len(S), s)]
            while True:
                M = yield C
                for k in segments:
                    m = M[k]
                    if len(m) < s:
                        # End of a short final block. Write only the bytes that exist so
                        # that neither the output buffer nor the shift register changes
                        # its length, and stop processing this block.
                        if len(m) > 0:
                            c = strxor(m, E(S)[:s])
                            C[k.start:k.start + len(c)] = c
                        break
                    C[k] = c = strxor(m, E(S)[:s])
                    # Shift the register left by one segment and append the ciphertext.
                    S[:-s], S[-s:] = memoryview(S)[s:], c

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that receives ciphertext blocks and yields plaintext blocks. The yielded
        buffer is reused between blocks. Only the encryption primitive of the block cipher
        is used.
        """
        s = self.segment_size
        S = bytearray(self.iv)
        E = self.encrypt_block
        M = bytearray(len(self.iv))
        if s == 1:
            while True:
                C = yield M
                for k, c in enumerate(C):
                    M[k] = c ^ E(S)[0]
                    # Shift the register left by one byte and append the ciphertext byte.
                    S[:-1], S[-1] = memoryview(S)[1:], c
        else:
            segments = [slice(i, i + s) for i in range(0, len(S), s)]
            while True:
                C = yield M
                for k in segments:
                    c = C[k]
                    if len(c) < s:
                        # End of a short final block; see the comment in encrypt.
                        if len(c) > 0:
                            m = strxor(c, E(S)[:s])
                            M[k.start:k.start + len(m)] = m
                        break
                    M[k] = strxor(c, E(S)[:s])
                    # Shift the register left by one segment and append the ciphertext.
                    S[:-s], S[-s:] = memoryview(S)[s:], c


@_register_cipher_mode
class OFB(StatefulCipherMode):
    """
    Output feedback mode: the initialization vector is encrypted repeatedly, and each result
    is XORed with one block of input. Encryption and decryption are the same operation.
    """

    aligned = False

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        block_encrypt = self.encrypt_block
        keystream = self.iv
        output = None
        while True:
            data = yield output
            keystream = block_encrypt(keystream)
            output = strxor(data, keystream)

    decrypt = encrypt


@_register_cipher_mode
class CTR(CipherMode):
    """
    Counter mode. Each counter block consists of a fixed prefix, the current counter value,
    and a fixed suffix. The counter block is encrypted and the result is XORed with one block
    of input. Encryption and decryption are the same operation.
    """

    counter_len: int
    prefix: BufferType
    suffix: BufferType
    initial_value: int
    little_endian: bool
    block_size: int

    aligned = False

    @property
    def byte_order(self):
        """
        The byte order name used to serialize the counter: 'little' or 'big'.
        """
        return 'little' if self.little_endian else 'big'

    def __init__(
        self,
        block_size: Optional[int] = None,
        counter: Optional[Dict] = None,
        nonce: Optional[BufferType] = None,
        initial_value: Optional[int] = 0,
        little_endian: bool = False
    ):
        """
        The mode is configured either from a `counter` dictionary or from `block_size` and an
        optional `nonce`.

        - A `counter` dictionary must have the keys 'prefix', 'suffix', and 'counter_len'; the
          keys 'initial_value' and 'little_endian' are optional and override the arguments of
          the same name. The block size is the sum of the three lengths; a ValueError is
          raised if `block_size` is given and disagrees.
        - Otherwise `block_size` is required. A given nonce becomes the prefix and the counter
          occupies the remaining bytes. Without a nonce, the counter occupies half of the
          block (rounded down) and the prefix consists of zero bytes.

        An `initial_value` of None is treated as zero.
        """
        # None is a permitted value according to the signature, but the counter arithmetic
        # in encrypt requires an integer.
        if initial_value is None:
            initial_value = 0
        if counter is not None:
            counter_value = counter.get('initial_value', initial_value)
            self.initial_value = 0 if counter_value is None else counter_value
            self.little_endian = counter.get('little_endian', little_endian)
            self.prefix = counter['prefix']
            self.suffix = counter['suffix']
            self.counter_len = counter['counter_len']
            self.block_size = self.counter_len + len(self.prefix) + len(self.suffix)
            if block_size not in {None, self.block_size}:
                raise ValueError('Information in counter object does not align with block size.')
            return
        if block_size is None:
            raise ValueError('Unable to construct CTR mode object without block_size or counter argument.')

        self.initial_value = initial_value
        self.little_endian = little_endian
        self.suffix = B''
        self.block_size = block_size

        if nonce is not None:
            if len(nonce) > block_size:
                raise ValueError('Nonce length exceeds block length.')
            self.counter_len = block_size - len(nonce)
            self.prefix = nonce
        else:
            self.counter_len = block_size // 2
            self.prefix = B'\0' * (block_size - self.counter_len)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        """
        Generator that receives input blocks and yields them XORed with the keystream.
        """
        S = bytearray(self.block_size)
        # The part of the counter block that holds the counter value.
        J = slice(len(self.prefix), self.block_size - len(self.suffix))
        K = self.initial_value
        if self.prefix:
            S[:+len(self.prefix)] = self.prefix
        if self.suffix:
            S[-len(self.suffix):] = self.suffix
        C = None
        E = self.encrypt_block
        order = self.byte_order
        csize = self.counter_len
        mask = (1 << (csize * 8)) - 1
        while True:
            M = yield C
            S[J] = K.to_bytes(csize, order)
            # Increment the counter, wrapping around at the counter width.
            K = K + 1 & mask
            C = strxor(E(S), M)

    decrypt = encrypt


class CipherInterface(ABC):
    """
    Abstract interface of a cipher object that can encrypt and decrypt byte buffers.
    """

    key_size: Container[int]
    block_size: int

    @abstractmethod
    def encrypt(self, M: BufferType) -> BufferType:
        """
        Encrypt the buffer M and return the result.
        """

    @abstractmethod
    def decrypt(self, C: BufferType) -> BufferType:
        """
        Decrypt the buffer C and return the result.
        """


class CipherObjectFactory(ABC):
    """
    Abstract factory that creates cipher objects through the method `new`.
    """
    # Name of the cipher that this factory produces.
    name: str
    # Container of the supported key sizes; None by default.
    key_size: Optional[Container[int]] = None
    # Block size of the cipher; None by default.
    block_size: Optional[int] = None

    @abstractmethod
    def new(
        self,
        key: BufferType,
        iv: Optional[BufferType] = None,
        counter: Optional[Dict] = None,
        initial_value: Optional[int] = 0,
        nonce: Optional[BufferType] = None,
        mode: Optional[str] = None,
        segment_size: Optional[int] = None,
        block_size: Optional[int] = None,
        **cipher_args
    ) -> CipherInterface:
        """
        Create a new cipher object for the given key. The `counter` argument is a dictionary
        describing the counter block layout, as consumed by the CTR mode in this module.
        """
        ...


class PyCryptoFactoryWrapper(CipherObjectFactory):
    """
    Wraps a module-like object that exposes a function `new` and optionally the attributes
    `key_size` and `block_size`, and presents it as a `CipherObjectFactory`. Attributes that
    the wrapper does not define are forwarded to the wrapped object.
    """

    def __init__(self, module):
        self.module = module

    def new(self, *a, **k) -> CipherInterface:
        """
        Forward all arguments to the `new` function of the wrapped object.
        """
        return self.module.new(*a, **k)

    @property
    def key_size(self):
        """
        The `key_size` of the wrapped object, or None if it has no such attribute. A single
        integer is returned as a one-element set.
        """
        try:
            value = self.module.key_size
        except AttributeError:
            return None
        if isinstance(value, int):
            return {value}
        return value

    @property
    def block_size(self):
        """
        The `block_size` of the wrapped object, or None if it has no such attribute.
        """
        try:
            value = self.module.block_size
        except AttributeError:
            return None
        return value

    def __repr__(self):
        return repr(self.module)

    def __dir__(self):
        return dir(self.module)

    def __getattr__(self, key):
        # This hook only runs when regular attribute lookup fails. The wrapped object is
        # read from the instance dictionary directly: going through `self.module` would
        # re-enter this hook and recurse without bound whenever `module` has not been set,
        # for example when an instance is created without running __init__.
        try:
            module = self.__dict__['module']
        except KeyError:
            raise AttributeError(key) from None
        return getattr(module, key)


class BlockCipherFactory(CipherObjectFactory):
    """
    Factory for a `BlockCipher` subclass. For every registered cipher mode, the factory has an
    attribute `MODE_<name>` that holds the identifier of that mode; these identifiers are the
    values accepted as the `mode` argument of `new`.
    """
    cipher: Type[BlockCipher]

    def __init__(self, cipher: Type[BlockCipher]):
        self.cipher = cipher
        self._modes = []
        # Mode identifiers are assigned in registration order, so the position of a mode
        # class in this list equals its identifier.
        for name, mode in CIPHER_MODES.items():
            setattr(self, F'MODE_{name}', mode._identifier)
            self._modes.append(mode)

    def new(self, key, mode=None, **args) -> CipherInterface:
        """
        Create a cipher object for the given key. The `mode` argument is a mode identifier;
        when it is None, ECB is used. The keyword arguments 'iv', 'counter', 'initial_value',
        'nonce', and 'segment_size' are passed to the constructor of the cipher mode; all
        other keyword arguments are passed to the constructor of the cipher.
        """
        # ECB is also what BlockCipher falls back to when it is given no mode.
        mode_class = ECB if mode is None else self._modes[mode]
        cipher = self.cipher
        mode_arguments = {}
        for arg in ('iv', 'counter', 'initial_value', 'nonce', 'segment_size'):
            try:
                mode_arguments[arg] = args.pop(arg)
            except KeyError:
                pass
        if mode_class is CTR:
            block_size = self.block_size
            if block_size is None:
                # This happens for ciphers that do not have a fixed block size, i.e. the
                # block size is truly an instance attribute and not a class property.
                # In this case, we create a temporary cipher object and use it to obtain
                # the true block size.
                block_size = cipher(key, ECB(), **args).block_size
            mode_arguments.update(block_size=block_size)
        return cipher(key, mode_class(**mode_arguments), **args)

    @property
    def name(self):
        """
        The class name of the wrapped cipher.
        """
        return self.cipher.__name__

    @property
    def key_size(self):
        """
        The `key_size` class attribute of the cipher. The result is None if the attribute
        does not exist or is a property, i.e. only available on instances.
        """
        try:
            value = self.cipher.key_size
        except AttributeError:
            return None
        if isinstance(value, property):
            return None
        return value

    @property
    def block_size(self):
        """
        The `block_size` class attribute of the cipher. The result is None if the attribute
        does not exist or is not an integer.
        """
        try:
            value = self.cipher.block_size
        except AttributeError:
            return None
        if not isinstance(value, int):
            return None
        return value


class BlockCipher(CipherInterface, ABC):
    """
    Base class for block ciphers. Subclasses implement the single-block primitives
    `block_encrypt` and `block_decrypt`; this class applies them to buffers of arbitrary
    length through the configured cipher mode.
    """
    block_size: int
    key: BufferType
    mode: CipherMode
    key_size: Container[int]

    def __init__(self, key: BufferType, mode: Optional[CipherMode]):
        """
        Store the key and the cipher mode. ECB is used when no mode is given. A ValueError is
        raised when the length of the key is not contained in `key_size`.
        """
        supported = self.key_size
        if len(key) not in supported:
            raise ValueError(F'The key size {len(key)} is not supported by {self.__class__.__name__.lower()}.')
        self.key = key
        self.mode = mode or ECB()

    @abstractmethod
    def block_encrypt(self, data: BufferType) -> BufferType:
        """
        Encrypt a single block.
        """
        raise NotImplementedError

    @abstractmethod
    def block_decrypt(self, data: BufferType) -> BufferType:
        """
        Decrypt a single block.
        """
        raise NotImplementedError

    def _apply_blockwise(self, direction: Direction, data: BufferType) -> BufferType:
        """
        Run the cipher mode over the data in the given direction. Writable input is processed
        in place and a view of it is returned; read-only input is copied to a new bytearray
        first. DataUnaligned is raised when the mode requires aligned data and the length of
        the input is not a multiple of the block size.
        """
        block_size = self.block_size
        mode = self.mode
        unaligned = len(data) % block_size != 0
        if unaligned and mode.aligned:
            raise DataUnaligned()
        source = memoryview(data)
        target = bytearray(source) if source.readonly else source
        return mode.apply(
            direction, target, source, self.block_encrypt, self.block_decrypt, block_size)

    def encrypt(self, data: BufferType) -> BufferType:
        """
        Encrypt the given data using the configured cipher mode.
        """
        return self._apply_blockwise(Direction.Encrypt, data)

    def decrypt(self, data: BufferType) -> BufferType:
        """
        Decrypt the given data using the configured cipher mode.
        """
        return self._apply_blockwise(Direction.Decrypt, data)

Functions

def strxor(a, b)
Expand source code Browse git
def strxor(a: bytes, b: bytes):
    return bytes(a ^ b for a, b in zip(a, b))
def des_set_odd_parity(key)
Expand source code Browse git
def des_set_odd_parity(key: bytearray):
    key[:] = (_DES_PARITYTABLE[b] for b in key)
def rotl128(x, c)
Expand source code Browse git
def rotl128(x: int, c: int):
    return ((x << c) | (x >> (0x80 - c))) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
def rotl64(x, c)
Expand source code Browse git
def rotl64(x: int, c: int):
    return ((x << c) | (x >> (0x40 - c))) & 0xFFFFFFFFFFFFFFFF
def rotl32(x, c)
Expand source code Browse git
def rotl32(x: int, c: int):
    return ((x << c) | (x >> (0x20 - c))) & 0xFFFFFFFF
def rotl16(x, c)
Expand source code Browse git
def rotl16(x: int, c: int):
    return ((x << c) | (x >> (0x10 - c))) & 0xFFFF
def rotl8(x, c)
Expand source code Browse git
def rotl8(x: int, c: int):
    return ((x << c) | (x >> (0x08 - c))) & 0xFF
def rotr128(x, c)
Expand source code Browse git
def rotr128(x: int, c: int):
    return (x << (0x80 - c) & 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF) | (x >> c)
def rotr64(x, c)
Expand source code Browse git
def rotr64(x: int, c: int):
    return (x << (0x40 - c) & 0xFFFFFFFFFFFFFFFF) | (x >> c)
def rotr32(x, c)
Expand source code Browse git
def rotr32(x: int, c: int):
    return (x << (0x20 - c) & 0xFFFFFFFF) | (x >> c)
def rotr16(x, c)
Expand source code Browse git
def rotr16(x: int, c: int):
    return (x << (0x10 - c) & 0xFFFF) | (x >> c)
def rotr8(x, c)
Expand source code Browse git
def rotr8(x: int, c: int):
    return (x << (0x08 - c) & 0xFF) | (x >> c)
def rotr(size, x, c)
Expand source code Browse git
def rotr(size: int, x: int, c: int) -> int:
    mask = (1 << size) - 1
    c %= size
    return (x >> c) | (x << (size - c) & mask)
def rotl(size, x, c)
Expand source code Browse git
def rotl(size: int, x: int, c: int) -> int:
    mask = (1 << size) - 1
    c %= size
    return (x >> (size - c)) | (x << c & mask)

Classes

class Direction (value, names=None, *, module=None, qualname=None, type=None, start=1)

The direction in which a cipher mode is applied: encryption or decryption.

Expand source code Browse git
class Direction(str, Enum):
    Encrypt = 'encrypt'
    Decrypt = 'decrypt'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var Encrypt
var Decrypt
class CipherMode

Abstract base class for block cipher modes of operation. Subclasses implement encrypt and decrypt as generators that receive input blocks and yield output blocks.

Expand source code Browse git
class CipherMode(ABC):

    encrypt_block: Callable[[memoryview], memoryview]
    decrypt_block: Callable[[memoryview], memoryview]
    aligned: bool = True
    _identifier: ClassVar[int]

    @abstractmethod
    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        raise NotImplementedError

    @abstractmethod
    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        raise NotImplementedError

    def apply(
        self,
        direction: Direction,
        dst: memoryview,
        src: memoryview,
        encrypt_block: Callable[[memoryview], memoryview],
        decrypt_block: Callable[[memoryview], memoryview],
        blocksize: int,
    ) -> memoryview:
        self.encrypt_block = encrypt_block
        self.decrypt_block = decrypt_block
        engine: Generator[memoryview, memoryview, None] = {
            Direction.Encrypt: self.encrypt,
            Direction.Decrypt: self.decrypt,
        }[direction]()
        next(engine)
        top, rest = divmod(len(src), blocksize)
        top *= blocksize
        for k in range(0, top, blocksize):
            dst[k:k + blocksize] = engine.send(src[k:k + blocksize])
        if rest:
            dst[-rest:] = engine.send(src[-rest:])[:rest]
        engine.close()
        return dst

Ancestors

  • abc.ABC

Subclasses

Class variables

var encrypt_block
var decrypt_block
var aligned

Methods

def encrypt(self)
Expand source code Browse git
@abstractmethod
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    raise NotImplementedError
def decrypt(self)
Expand source code Browse git
@abstractmethod
def decrypt(self) -> Generator[memoryview, memoryview, None]:
    raise NotImplementedError
def apply(self, direction, dst, src, encrypt_block, decrypt_block, blocksize)
Expand source code Browse git
def apply(
    self,
    direction: Direction,
    dst: memoryview,
    src: memoryview,
    encrypt_block: Callable[[memoryview], memoryview],
    decrypt_block: Callable[[memoryview], memoryview],
    blocksize: int,
) -> memoryview:
    self.encrypt_block = encrypt_block
    self.decrypt_block = decrypt_block
    engine: Generator[memoryview, memoryview, None] = {
        Direction.Encrypt: self.encrypt,
        Direction.Decrypt: self.decrypt,
    }[direction]()
    next(engine)
    top, rest = divmod(len(src), blocksize)
    top *= blocksize
    for k in range(0, top, blocksize):
        dst[k:k + blocksize] = engine.send(src[k:k + blocksize])
    if rest:
        dst[-rest:] = engine.send(src[-rest:])[:rest]
    engine.close()
    return dst
class ECB

Electronic codebook mode: every block is passed through the block cipher independently.

Expand source code Browse git
class ECB(CipherMode):

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        M = None
        D = self.decrypt_block
        while True:
            C = yield M
            M = D(C)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        C = None
        E = self.encrypt_block
        while True:
            M = yield C
            C = E(M)

Ancestors

Class variables

var encrypt_block
var decrypt_block
var aligned

Methods

def decrypt(self)
Expand source code Browse git
def decrypt(self) -> Generator[memoryview, memoryview, None]:
    M = None
    D = self.decrypt_block
    while True:
        C = yield M
        M = D(C)
def encrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    C = None
    E = self.encrypt_block
    while True:
        M = yield C
        C = E(M)
class DataUnaligned

Raised when the input data is not aligned to the block size.

Expand source code Browse git
class DataUnaligned(ValueError):
    def __init__(self) -> None:
        super().__init__('Data not aligned to block size.')

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException
class StatefulCipherMode (iv)

Base class for cipher modes that are constructed from an initialization vector.

Expand source code Browse git
class StatefulCipherMode(CipherMode):

    iv: BufferType

    def __init__(self, iv: BufferType):
        self.iv = iv

Ancestors

Subclasses

Class variables

var iv
class CBC (iv)

Cipher block chaining mode: every plaintext block is XORed with the previous ciphertext block (initially the IV) before it is encrypted.

Expand source code Browse git
class CBC(StatefulCipherMode):

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        C = self.iv
        E = self.encrypt_block
        while True:
            M = yield C
            C = E(strxor(M, C))

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        S = self.iv
        M = None
        D = self.decrypt_block
        while True:
            C = yield M
            M = strxor(D(C), S)
            S = bytes(C)

Ancestors

Class variables

var iv

Methods

def encrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    C = self.iv
    E = self.encrypt_block
    while True:
        M = yield C
        C = E(strxor(M, C))
def decrypt(self)
Expand source code Browse git
def decrypt(self) -> Generator[memoryview, memoryview, None]:
    S = self.iv
    M = None
    D = self.decrypt_block
    while True:
        C = yield M
        M = strxor(D(C), S)
        S = bytes(C)
class PCBC (iv)

Propagating cipher block chaining mode: every plaintext block is XORed with the XOR of the previous plaintext and ciphertext blocks (initially the IV) before it is encrypted.

Expand source code Browse git
class PCBC(StatefulCipherMode):

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        S = self.iv
        C = None
        E = self.encrypt_block
        while True:
            M = yield C
            C = E(strxor(M, S))
            S = strxor(C, M)

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        S = self.iv
        M = None
        D = self.decrypt_block
        while True:
            C = yield M
            M = strxor(S, D(C))
            S = strxor(M, C)

Ancestors

Class variables

var iv

Methods

def encrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    S = self.iv
    C = None
    E = self.encrypt_block
    while True:
        M = yield C
        C = E(strxor(M, S))
        S = strxor(C, M)
def decrypt(self)
Expand source code Browse git
def decrypt(self) -> Generator[memoryview, memoryview, None]:
    S = self.iv
    M = None
    D = self.decrypt_block
    while True:
        C = yield M
        M = strxor(S, D(C))
        S = strxor(M, C)
class CFB (iv, segment_size=None)

Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final

Expand source code Browse git
class CFB(CipherMode):
    """
    Cipher Feedback Mode: https://csrc.nist.gov/publications/detail/sp/800-38a/final
    """

    iv: BufferType
    segment_size: int
    aligned = False

    def __init__(self, iv: BufferType, segment_size: Optional[int] = None):
        if segment_size is None:
            segment_size = 8
        if segment_size % 8 != 0:
            raise NotImplementedError('segment sizes may only be multiples of 8')
        segment_size = segment_size // 8
        if len(iv) % segment_size != 0:
            raise NotImplementedError(
                F'the block size {len(iv) * 8} is not an even multiple of the segment '
                F'size {segment_size * 8}; this is currently not supported.')
        self.segment_size = segment_size
        self.iv = iv

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        s = self.segment_size
        S = bytearray(self.iv)
        E = self.encrypt_block
        C = bytearray(len(self.iv))
        if s == 1:
            while True:
                M = yield C
                for k, m in enumerate(M):
                    C[k] = c = m ^ E(S)[0]
                    S[:-1], S[-1] = memoryview(S)[1:], c
        else:
            segments = [slice(i, i + s) for i in range(0, len(S), s)]
            while True:
                M = yield C
                for k in segments:
                    m = M[k]
                    C[k] = c = strxor(m, E(S)[:s])
                    S[:-s], S[-s:] = memoryview(S)[s:], c

    def decrypt(self) -> Generator[memoryview, memoryview, None]:
        s = self.segment_size
        S = bytearray(self.iv)
        E = self.encrypt_block
        M = bytearray(len(self.iv))
        if s == 1:
            while True:
                C = yield M
                for k, c in enumerate(C):
                    M[k] = c ^ E(S)[0]
                    S[:-1], S[-1] = memoryview(S)[1:], c
        else:
            segments = [slice(i, i + s) for i in range(0, len(S), s)]
            while True:
                C = yield M
                for k in segments:
                    c = C[k]
                    M[k] = strxor(c, E(S)[:s])
                    S[:-s], S[-s:] = memoryview(S)[s:], c

Ancestors

Class variables

var iv
var segment_size
var aligned

Methods

def encrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    s = self.segment_size
    S = bytearray(self.iv)
    E = self.encrypt_block
    C = bytearray(len(self.iv))
    if s == 1:
        while True:
            M = yield C
            for k, m in enumerate(M):
                C[k] = c = m ^ E(S)[0]
                S[:-1], S[-1] = memoryview(S)[1:], c
    else:
        segments = [slice(i, i + s) for i in range(0, len(S), s)]
        while True:
            M = yield C
            for k in segments:
                m = M[k]
                C[k] = c = strxor(m, E(S)[:s])
                S[:-s], S[-s:] = memoryview(S)[s:], c
def decrypt(self)
Expand source code Browse git
def decrypt(self) -> Generator[memoryview, memoryview, None]:
    s = self.segment_size
    S = bytearray(self.iv)
    E = self.encrypt_block
    M = bytearray(len(self.iv))
    if s == 1:
        while True:
            C = yield M
            for k, c in enumerate(C):
                M[k] = c ^ E(S)[0]
                S[:-1], S[-1] = memoryview(S)[1:], c
    else:
        segments = [slice(i, i + s) for i in range(0, len(S), s)]
        while True:
            C = yield M
            for k in segments:
                c = C[k]
                M[k] = strxor(c, E(S)[:s])
                S[:-s], S[-s:] = memoryview(S)[s:], c
class OFB (iv)

Output feedback mode: the IV is encrypted repeatedly to produce a keystream that is XORed with the data. Encryption and decryption are the same operation.

Expand source code Browse git
class OFB(StatefulCipherMode):

    aligned = False

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        S = self.iv
        C = None
        E = self.encrypt_block
        while True:
            M = yield C
            S = E(S)
            C = strxor(M, S)

    decrypt = encrypt

Ancestors

Class variables

var iv
var aligned

Methods

def encrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    S = self.iv
    C = None
    E = self.encrypt_block
    while True:
        M = yield C
        S = E(S)
        C = strxor(M, S)
def decrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    S = self.iv
    C = None
    E = self.encrypt_block
    while True:
        M = yield C
        S = E(S)
        C = strxor(M, S)
class CTR (block_size=None, counter=None, nonce=None, initial_value=0, little_endian=False)

Counter mode: successive counter blocks are encrypted to produce a keystream that is XORed with the data. Encryption and decryption are the same operation.

Expand source code Browse git
class CTR(CipherMode):

    counter_len: int
    prefix: BufferType
    suffix: BufferType
    initial_value: int
    little_endian: bool
    block_size: int

    aligned = False

    @property
    def byte_order(self):
        return 'little' if self.little_endian else 'big'

    def __init__(
        self,
        block_size: Optional[int] = None,
        counter: Optional[Dict] = None,
        nonce: Optional[BufferType] = None,
        initial_value: Optional[int] = 0,
        little_endian: bool = False
    ):
        if counter is not None:
            self.initial_value = counter.get('initial_value', initial_value)
            self.little_endian = counter.get('little_endian', little_endian)
            self.prefix = counter['prefix']
            self.suffix = counter['suffix']
            self.counter_len = counter['counter_len']
            self.block_size = self.counter_len + len(self.prefix) + len(self.suffix)
            if block_size not in {None, self.block_size}:
                raise ValueError('Information in counter object does not align with block size.')
            return
        if block_size is None:
            raise ValueError('Unable to construct CTR mode object without block_size or counter argument.')

        self.initial_value = initial_value
        self.little_endian = little_endian
        self.suffix = B''
        self.block_size = block_size

        if nonce is not None:
            if len(nonce) > block_size:
                raise ValueError('Nonce length exceeds block length.')
            self.counter_len = block_size - len(nonce)
            self.prefix = nonce
        else:
            self.counter_len = block_size // 2
            self.prefix = B'\0' * (block_size - self.counter_len)

    def encrypt(self) -> Generator[memoryview, memoryview, None]:
        S = bytearray(self.block_size)
        J = slice(len(self.prefix), self.block_size - len(self.suffix))
        K = self.initial_value
        if self.prefix:
            S[:+len(self.prefix)] = self.prefix
        if self.suffix:
            S[-len(self.suffix):] = self.suffix
        C = None
        E = self.encrypt_block
        order = self.byte_order
        csize = self.counter_len
        mask = (1 << (csize * 8)) - 1
        while True:
            M = yield C
            S[J] = K.to_bytes(csize, order)
            K = K + 1 & mask
            C = strxor(E(S), M)

    decrypt = encrypt

Ancestors

Class variables

var counter_len
var prefix
var suffix
var initial_value
var little_endian
var block_size
var aligned

Instance variables

var byte_order
Expand source code Browse git
@property
def byte_order(self):
    """
    The byte order name for serializing the counter: 'little' when the `little_endian`
    flag is set, and 'big' otherwise.
    """
    if self.little_endian:
        return 'little'
    return 'big'

Methods

def encrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    """
    Generator that performs the counter mode transformation one block at a time.

    The first `next` call primes the generator and yields `None`. Afterwards, every
    buffer sent into the generator is XORed with the encryption of the current counter
    block, and the result is yielded back. The counter block is laid out as prefix,
    counter, suffix. The counter starts at `initial_value`, is serialized according to
    `byte_order`, and wraps around once it no longer fits into `counter_len` bytes.
    """
    prefix = self.prefix
    suffix = self.suffix
    block = bytearray(self.block_size)
    # The slice of the counter block that holds the running counter value.
    window = slice(len(prefix), self.block_size - len(suffix))
    if prefix:
        block[:len(prefix)] = prefix
    if suffix:
        block[-len(suffix):] = suffix
    counter = self.initial_value
    width = self.counter_len
    wrap = (1 << (width * 8)) - 1
    endian = self.byte_order
    # NOTE(review): encrypt_block is not assigned in the visible part of the class;
    # presumably it is installed on the mode object before the generator is started.
    transform = self.encrypt_block
    result = None
    while True:
        chunk = yield result
        block[window] = counter.to_bytes(width, endian)
        counter = (counter + 1) & wrap
        result = strxor(transform(block), chunk)
def decrypt(self)
Expand source code Browse git
def encrypt(self) -> Generator[memoryview, memoryview, None]:
    """
    Generator that performs the counter mode transformation one block at a time.

    The first `next` call primes the generator and yields `None`. Afterwards, every
    buffer sent into the generator is XORed with the encryption of the current counter
    block, and the result is yielded back. The counter block is laid out as prefix,
    counter, suffix. The counter starts at `initial_value`, is serialized according to
    `byte_order`, and wraps around once it no longer fits into `counter_len` bytes.
    """
    prefix = self.prefix
    suffix = self.suffix
    block = bytearray(self.block_size)
    # The slice of the counter block that holds the running counter value.
    window = slice(len(prefix), self.block_size - len(suffix))
    if prefix:
        block[:len(prefix)] = prefix
    if suffix:
        block[-len(suffix):] = suffix
    counter = self.initial_value
    width = self.counter_len
    wrap = (1 << (width * 8)) - 1
    endian = self.byte_order
    # NOTE(review): encrypt_block is not assigned in the visible part of the class;
    # presumably it is installed on the mode object before the generator is started.
    transform = self.encrypt_block
    result = None
    while True:
        chunk = yield result
        block[window] = counter.to_bytes(width, endian)
        counter = (counter + 1) & wrap
        result = strxor(transform(block), chunk)
class CipherInterface

Abstract interface for cipher objects: it declares the key_size and block_size attributes and the abstract encrypt and decrypt methods.

Expand source code Browse git
class CipherInterface(ABC):
    """
    Abstract interface for cipher objects. Implementations provide `encrypt` and `decrypt`
    and expose the `key_size` and `block_size` attributes.
    """
    # Container of the key lengths that the cipher accepts.
    key_size: Container[int]
    # The block size of the cipher.
    block_size: int

    @abstractmethod
    def encrypt(self, M: BufferType) -> BufferType:
        """
        Encrypt the buffer `M` and return the result.
        """

    @abstractmethod
    def decrypt(self, C: BufferType) -> BufferType:
        """
        Decrypt the buffer `C` and return the result.
        """

Ancestors

  • abc.ABC

Subclasses

Class variables

var key_size
var block_size

Methods

def encrypt(self, M)
Expand source code Browse git
@abstractmethod
def encrypt(self, M: BufferType) -> BufferType:
    """
    Encrypt the buffer `M` and return the result. This abstract declaration has no
    implementation of its own.
    """
def decrypt(self, C)
Expand source code Browse git
@abstractmethod
def decrypt(self, C: BufferType) -> BufferType:
    """
    Decrypt the buffer `C` and return the result. This abstract declaration has no
    implementation of its own.
    """
class CipherObjectFactory

Abstract base class for cipher factories: it declares name, key_size, and block_size, and an abstract new method that returns a CipherInterface.

Expand source code Browse git
class CipherObjectFactory(ABC):
    """
    Abstract base class for factories that create cipher objects through `new`.
    """
    # Name of the cipher produced by this factory.
    name: str
    # Container of accepted key lengths, or None when that is not known at this level.
    key_size: Optional[Container[int]] = None
    # Block size of the cipher, or None when that is not known at this level.
    block_size: Optional[int] = None

    @abstractmethod
    def new(
        self,
        key: BufferType,
        iv: Optional[BufferType] = None,
        counter: Optional[int] = None,
        initial_value: Optional[int] = 0,
        nonce: Optional[BufferType] = None,
        mode: Optional[str] = None,
        segment_size: Optional[int] = None,
        block_size: Optional[int] = None,
        **cipher_args
    ) -> CipherInterface:
        """
        Create a cipher object for the given `key`. The remaining named parameters select
        and configure the cipher mode; any other keyword arguments are collected in
        `cipher_args`.

        NOTE(review): `counter` is annotated as an integer here, but the CTR mode
        constructor in this module reads it like a mapping with the keys 'prefix',
        'suffix' and 'counter_len' - confirm which type callers actually pass.
        """

Ancestors

  • abc.ABC

Subclasses

Class variables

var name
var key_size
var block_size

Methods

def new(self, key, iv=None, counter=None, initial_value=0, nonce=None, mode=None, segment_size=None, block_size=None, **cipher_args)
Expand source code Browse git
@abstractmethod
def new(
    self,
    key: BufferType,
    iv: Optional[BufferType] = None,
    counter: Optional[int] = None,
    initial_value: Optional[int] = 0,
    nonce: Optional[BufferType] = None,
    mode: Optional[str] = None,
    segment_size: Optional[int] = None,
    block_size: Optional[int] = None,
    **cipher_args
) -> CipherInterface:
    """
    Create a cipher object for the given `key`. The remaining named parameters select and
    configure the cipher mode; any other keyword arguments are collected in `cipher_args`.
    This abstract declaration has no implementation of its own.
    """
class PyCryptoFactoryWrapper (module)

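Cipher factory that wraps a PyCrypto-style cipher module: new and unknown attributes are forwarded to the module, while key_size and block_size are read from it and default to None when missing.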

Expand source code Browse git
class PyCryptoFactoryWrapper(CipherObjectFactory):
    """
    Cipher factory that wraps a module-like object exposing a `new` function and,
    optionally, `key_size` and `block_size` attributes. Attributes that the wrapper does
    not define itself are looked up on the wrapped module.
    """

    def __init__(self, module):
        self.module = module

    def new(self, *a, **k) -> CipherInterface:
        """
        Forward all arguments to the `new` function of the wrapped module.
        """
        return self.module.new(*a, **k)

    @property
    def key_size(self):
        """
        The `key_size` of the wrapped module; a single integer is wrapped into a set so
        that membership tests work uniformly. Returns None if the module has no such
        attribute.
        """
        try:
            value = self.module.key_size
        except AttributeError:
            return None
        if isinstance(value, int):
            return {value}
        return value

    @property
    def block_size(self):
        """
        The `block_size` of the wrapped module, or None if the module has no such attribute.
        """
        try:
            value = self.module.block_size
        except AttributeError:
            return None
        return value

    def __repr__(self):
        return repr(self.module)

    def __dir__(self):
        return dir(self.module)

    def __getattr__(self, key):
        """
        Delegate lookups of unknown attributes to the wrapped module.
        """
        # __getattr__ only runs when regular lookup fails. If 'module' itself is missing,
        # e.g. on an instance created without __init__ while being copied, reading
        # self.module below would re-enter this method forever. Fail with a regular
        # AttributeError instead so that hasattr/getattr probes behave normally.
        if key == 'module':
            raise AttributeError(key)
        return getattr(self.module, key)

Ancestors

Class variables

var name

Instance variables

var key_size
Expand source code Browse git
@property
def key_size(self):
    """
    The `key_size` of the wrapped module; a single integer is wrapped into a set so that
    membership tests work uniformly. Returns None if the module has no such attribute.
    """
    try:
        size = self.module.key_size
    except AttributeError:
        size = None
    else:
        if isinstance(size, int):
            size = {size}
    return size
var block_size
Expand source code Browse git
@property
def block_size(self):
    """
    The `block_size` of the wrapped module, or None if the module has no such attribute.
    """
    try:
        return self.module.block_size
    except AttributeError:
        return None

Methods

def new(self, *a, **k)
Expand source code Browse git
def new(self, *a, **k) -> CipherInterface:
    """
    Forward all positional and keyword arguments to the `new` function of the wrapped
    module and return its result.
    """
    constructor = self.module.new
    return constructor(*a, **k)
class BlockCipherFactory (cipher)

Cipher factory for a BlockCipher subclass: new selects one of the registered cipher modes, constructs it from the mode-specific arguments, and instantiates the cipher with it.

Expand source code Browse git
class BlockCipherFactory(CipherObjectFactory):
    """
    Cipher factory for a `BlockCipher` subclass. For every registered cipher mode, the
    factory exposes a `MODE_<name>` attribute holding the numeric identifier of that mode;
    this identifier is what `new` expects as its `mode` argument.
    """
    cipher: Type[BlockCipher]

    def __init__(self, cipher: Type[BlockCipher]):
        self.cipher = cipher
        self._modes = []
        for name, mode in CIPHER_MODES.items():
            setattr(self, F'MODE_{name}', mode._identifier)
            # The identifier is later used as an index into this list.
            self._modes.append(mode)

    def new(self, key, mode=None, **args) -> CipherInterface:
        """
        Create a cipher object for `key`.

        Args:
            key: The key to pass on to the cipher.
            mode: One of the `MODE_<name>` identifiers of this factory, or None to use the
                default mode of the cipher.
            **args: The keyword arguments `iv`, `counter`, `initial_value`, `nonce` and
                `segment_size` are passed to the cipher mode; everything else is passed
                to the cipher constructor.

        Raises:
            TypeError: If mode-specific arguments are given without a mode.
        """
        mode_arguments = {}
        cipher = self.cipher
        for arg in ('iv', 'counter', 'initial_value', 'nonce', 'mode', 'segment_size'):
            try:
                mode_arguments[arg] = args.pop(arg)
            except KeyError:
                pass
        if mode is None:
            # Without a mode there is no mode class to call. The cipher constructor is
            # handed None and falls back to its default mode (BlockCipher uses ECB).
            if mode_arguments:
                names = ', '.join(mode_arguments)
                raise TypeError(
                    F'Mode-specific arguments ({names}) were given without a cipher mode.')
            return cipher(key, None, **args)
        mode = self._modes[mode]
        if mode is CTR:
            block_size = self.block_size
            if block_size is None:
                # This happens for ciphers that do not have a fixed block size, i.e. the
                # block size is truly an instance attribute and not a class property.
                # In this case, we create a temporary cipher object and use it to obtain
                # the true block size.
                block_size = cipher(key, ECB, **args).block_size
            mode_arguments.update(block_size=block_size)
        mode = mode(**mode_arguments)
        return cipher(key, mode, **args)

    @property
    def name(self):
        """
        The class name of the wrapped cipher.
        """
        return self.cipher.__name__

    @property
    def key_size(self):
        """
        The `key_size` class attribute of the wrapped cipher. Returns None when the
        attribute is missing or is a property, i.e. only known per instance.
        """
        try:
            value = self.cipher.key_size
        except AttributeError:
            return None
        if isinstance(value, property):
            return None
        return value

    @property
    def block_size(self):
        """
        The `block_size` class attribute of the wrapped cipher. Returns None when the
        attribute is missing or is not an integer, i.e. only known per instance.
        """
        try:
            value = self.cipher.block_size
        except AttributeError:
            return None
        if not isinstance(value, int):
            return None
        return value

Ancestors

Class variables

var cipher

Instance variables

var name
Expand source code Browse git
@property
def name(self):
    """
    The class name of the wrapped cipher.
    """
    cipher_class = self.cipher
    return cipher_class.__name__
var key_size
Expand source code Browse git
@property
def key_size(self):
    """
    The `key_size` class attribute of the wrapped cipher. Returns None when the attribute
    is missing or is a property, i.e. only known per instance.
    """
    try:
        size = self.cipher.key_size
    except AttributeError:
        size = None
    return None if isinstance(size, property) else size
var block_size
Expand source code Browse git
@property
def block_size(self):
    """
    The `block_size` class attribute of the wrapped cipher. Returns None when the
    attribute is missing or is not an integer, i.e. only known per instance.
    """
    try:
        size = self.cipher.block_size
    except AttributeError:
        return None
    return size if isinstance(size, int) else None

Methods

def new(self, key, mode=None, **args)
Expand source code Browse git
def new(self, key, mode=None, **args) -> CipherInterface:
    """
    Create a cipher object for `key`.

    Args:
        key: The key to pass on to the cipher.
        mode: One of the `MODE_<name>` identifiers of this factory, or None to use the
            default mode of the cipher.
        **args: The keyword arguments `iv`, `counter`, `initial_value`, `nonce` and
            `segment_size` are passed to the cipher mode; everything else is passed to
            the cipher constructor.

    Raises:
        TypeError: If mode-specific arguments are given without a mode.
    """
    mode_arguments = {}
    cipher = self.cipher
    for arg in ('iv', 'counter', 'initial_value', 'nonce', 'mode', 'segment_size'):
        try:
            mode_arguments[arg] = args.pop(arg)
        except KeyError:
            pass
    if mode is None:
        # Without a mode there is no mode class to call. The cipher constructor is handed
        # None and falls back to its default mode (BlockCipher uses ECB).
        if mode_arguments:
            names = ', '.join(mode_arguments)
            raise TypeError(
                F'Mode-specific arguments ({names}) were given without a cipher mode.')
        return cipher(key, None, **args)
    mode = self._modes[mode]
    if mode is CTR:
        block_size = self.block_size
        if block_size is None:
            # This happens for ciphers that do not have a fixed block size, i.e. the
            # block size is truly an instance attribute and not a class property.
            # In this case, we create a temporary cipher object and use it to obtain
            # the true block size.
            block_size = cipher(key, ECB, **args).block_size
        mode_arguments.update(block_size=block_size)
    mode = mode(**mode_arguments)
    return cipher(key, mode, **args)
class BlockCipher (key, mode)

Abstract base class for block ciphers: subclasses implement block_encrypt and block_decrypt, and encrypt / decrypt apply them to the data through the configured cipher mode.

Expand source code Browse git
class BlockCipher(CipherInterface, ABC):
    """
    Abstract base class for block ciphers. Subclasses implement `block_encrypt` and
    `block_decrypt`; the public `encrypt` and `decrypt` methods hand these primitives to
    the configured cipher mode, which applies them to the data.
    """
    block_size: int
    key: BufferType
    mode: CipherMode
    key_size: Container[int]

    def __init__(self, key: BufferType, mode: Optional[CipherMode]):
        """
        Store `key` and `mode`; a falsy `mode` is replaced by a fresh ECB mode object.
        Raises a ValueError if the length of `key` is not contained in `key_size`.
        """
        key_length = len(key)
        if key_length not in self.key_size:
            cipher_name = self.__class__.__name__.lower()
            raise ValueError(F'The key size {key_length} is not supported by {cipher_name}.')
        self.key = key
        if not mode:
            mode = ECB()
        self.mode = mode

    @abstractmethod
    def block_encrypt(self, data: BufferType) -> BufferType:
        """
        Raw encryption primitive of the cipher; must be provided by subclasses.
        """
        raise NotImplementedError

    @abstractmethod
    def block_decrypt(self, data: BufferType) -> BufferType:
        """
        Raw decryption primitive of the cipher; must be provided by subclasses.
        """
        raise NotImplementedError

    def _apply_blockwise(self, direction: Direction, data: BufferType) -> BufferType:
        """
        Run the cipher mode over `data` in the given `direction` and return its result.
        Raises DataUnaligned when the mode requires aligned input and the length of
        `data` is not a multiple of the block size.
        """
        size = self.block_size
        mode = self.mode
        unaligned = len(data) % size != 0
        if unaligned and mode.aligned:
            raise DataUnaligned
        source = memoryview(data)
        # Read-only input cannot serve as the destination, so a writable copy is used.
        target = bytearray(source) if source.readonly else source
        return mode.apply(
            direction, target, source, self.block_encrypt, self.block_decrypt, size)

    def encrypt(self, data: BufferType) -> BufferType:
        """
        Encrypt `data` using the configured cipher mode.
        """
        return self._apply_blockwise(Direction.Encrypt, data)

    def decrypt(self, data: BufferType) -> BufferType:
        """
        Decrypt `data` using the configured cipher mode.
        """
        return self._apply_blockwise(Direction.Decrypt, data)

Ancestors

Subclasses

Class variables

var block_size
var key
var mode
var key_size

Methods

def block_encrypt(self, data)
Expand source code Browse git
@abstractmethod
def block_encrypt(self, data: BufferType) -> BufferType:
    """
    Raw encryption primitive of the cipher; must be provided by subclasses. The base
    implementation always raises NotImplementedError.
    """
    # presumably called with a single block of block_size bytes - verify against the modes
    raise NotImplementedError
def block_decrypt(self, data)
Expand source code Browse git
@abstractmethod
def block_decrypt(self, data: BufferType) -> BufferType:
    """
    Raw decryption primitive of the cipher; must be provided by subclasses. The base
    implementation always raises NotImplementedError.
    """
    # presumably called with a single block of block_size bytes - verify against the modes
    raise NotImplementedError
def encrypt(self, data)
Expand source code Browse git
def encrypt(self, data: BufferType) -> BufferType:
    """
    Encrypt `data` by running the blockwise helper in the encryption direction.
    """
    run = self._apply_blockwise
    return run(Direction.Encrypt, data)
def decrypt(self, data)
Expand source code Browse git
def decrypt(self, data: BufferType) -> BufferType:
    """
    Decrypt `data` by running the blockwise helper in the decryption direction.
    """
    run = self._apply_blockwise
    return run(Direction.Decrypt, data)