Module refinery.units.crypto.cipher

Implements several popular block and stream ciphers.

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Implements several popular block and stream ciphers.
"""
import abc

from typing import (
    Any,
    ByteString,
    ClassVar,
    Iterable,
    Optional,
    Sequence,
    Type,
)
from refinery.lib.crypto import (
    CipherObjectFactory,
    CipherInterface,
)
from refinery.lib.argformats import (
    Option,
    extract_options,
    OptionFactory,
)
from refinery.units import (
    Arg,
    Executable,
    RefineryCriticalException,
    RefineryPartialResult,
    Unit,
)


class CipherUnit(Unit, abstract=True):

    key_size: Optional[Sequence[int]] = None
    block_size: int

    def __init__(self, key: Arg(help='The encryption key.'), **keywords):
        super().__init__(key=key, **keywords)

    @abc.abstractmethod
    def decrypt(self, data: ByteString) -> ByteString:
        raise NotImplementedError

    @abc.abstractmethod
    def encrypt(self, data: ByteString) -> ByteString:
        raise NotImplementedError

    def process(self, data: ByteString) -> ByteString:
        ks = self.key_size
        if ks and len(self.args.key) not in ks:
            import itertools
            key_size_iter = iter(ks)
            key_size_options = [str(k) for k in itertools.islice(key_size_iter, 0, 5)]
            try:
                next(key_size_iter)
            except StopIteration:
                pt = '.'
            else:
                pt = ', ...'
                if isinstance(ks, range):
                    pt = F'{pt}, {ks.stop - 1}'
            if len(key_size_options) == 1:
                msg = F'{self.name} requires a key size of {key_size_options[0]}'
            else:
                msg = R', '.join(key_size_options)
                msg = F'possible key sizes for {self.name} are: {msg}'
            raise ValueError(F'the given key has an invalid length of {len(self.args.key)} bytes; {msg}{pt}')
        return self.decrypt(data)

    def reverse(self, data: ByteString) -> ByteString:
        return self.encrypt(data)


class StreamCipherUnit(CipherUnit, abstract=True):

    block_size = 1

    def __init__(
        self, key,
        discard: Arg.Number('-d', help='Discard the first {varname} bytes of the keystream, {default} by default.') = 0,
        stateful: Arg.Switch('-s', help='Do not reset the key stream while processing the chunks of one frame.') = False,
        **keywords
    ):
        super().__init__(key=key, stateful=stateful, discard=discard, **keywords)
        self._keystream = None

    @abc.abstractmethod
    def keystream(self) -> Iterable[int]:
        raise NotImplementedError

    @Unit.Requires('numpy', 'speed', 'default', 'extended')
    def _numpy():
        import numpy
        return numpy

    def encrypt(self, data: bytearray) -> bytearray:
        it = self._keystream or self.keystream()
        for _ in range(self.args.discard):
            next(it)
        try:
            np = self._numpy
        except ImportError:
            self.log_info('this unit could perform faster if numpy was installed.')
            out = bytearray(a ^ b for a, b in zip(it, data))
        else:
            key = np.fromiter(it, dtype=np.uint8, count=len(data))
            out = np.frombuffer(
                memoryview(data), dtype=np.uint8, count=len(data))
            out ^= key
        return out

    def filter(self, chunks: Iterable):
        if self.args.stateful:
            self._keystream = self.keystream()
        yield from chunks
        self._keystream = None

    decrypt = encrypt


PADDINGS_LIB = ['pkcs7', 'iso7816', 'x923']
PADDING_NONE = 'raw'
PADDINGS_ALL = PADDINGS_LIB + [PADDING_NONE]


class BlockCipherUnitBase(CipherUnit, abstract=True):
    def __init__(
        self, key, iv: Arg('-i', '--iv', help=(
            'Specifies the initialization vector. If none is specified, then a block of zero bytes is used.')) = None,
        padding: Arg.Choice('-p', type=str.lower, choices=PADDINGS_ALL, metavar='P', help=(
            'Choose a padding algorithm ({choices}). The raw algorithm does nothing. By default, all other algorithms '
            'are attempted. In most cases, the data was not correctly decrypted if none of these work.')
        ) = None,
        raw: Arg.Switch('-r', '--raw', help='Set the padding to raw; ignored when a padding is specified.') = False,
        **keywords
    ):
        if not padding and raw:
            padding = PADDING_NONE
        super().__init__(key=key, iv=iv, padding=padding, **keywords)

    @property
    @abc.abstractmethod
    def block_size(self) -> int:
        raise NotImplementedError

    @property
    def iv(self) -> ByteString:
        return self.args.iv or bytes(self.block_size)

    def _default_padding(self) -> Optional[str]:
        return self.args.padding

    def reverse(self, data: ByteString) -> ByteString:
        padding = self._default_padding()
        if padding is not None:
            self.log_info('padding method:', padding)
            if padding in PADDINGS_LIB:
                from Cryptodome.Util.Padding import pad
                data = pad(data, self.block_size, padding)
        return super().reverse(data)

    def process(self, data: ByteString) -> ByteString:
        padding = self._default_padding()
        result = super().process(data)
        if padding is None:
            return result

        from Cryptodome.Util.Padding import unpad
        padding = [padding, *(p for p in PADDINGS_LIB if p != padding)]

        for p in padding:
            if p == PADDING_NONE:
                return result
            try:
                unpadded = unpad(result, self.block_size, p.lower())
            except Exception:
                pass
            else:
                self.log_info(F'unpadding worked using {p}')
                return unpadded
        raise RefineryPartialResult(
            'None of these paddings worked: {}'.format(', '.join(padding)),
            partial=result)


class StandardCipherExecutable(Executable):

    _available_block_cipher_modes: ClassVar[Type[Option]]
    _cipher_factory: ClassVar[Optional[CipherObjectFactory]]

    def __new__(mcs, name, bases, nmspc, cipher: Optional[CipherObjectFactory] = None):
        keywords = dict(abstract=(cipher is None))
        return super(StandardCipherExecutable, mcs).__new__(mcs, name, bases, nmspc, **keywords)

    def __init__(_class, name, bases, nmspc, cipher: Optional[CipherObjectFactory] = None):
        abstract = cipher is None
        super(StandardCipherExecutable, _class).__init__(name, bases, nmspc, abstract=abstract)
        _class._cipher_factory = cipher
        if abstract:
            return
        b_size = cipher.block_size
        k_size = cipher.key_size
        if b_size is not None:
            _class.block_size = b_size
        else:
            b_size = getattr(_class, 'block_size', 2)
            if not isinstance(b_size, int):
                b_size = None
        if k_size is not None:
            _class.key_size = k_size
        if b_size and b_size <= 1:
            return
        if 'mode' not in _class._argument_specification:
            return
        modes = extract_options(cipher, 'MODE_', 'SIV', 'OPENPGP')
        check = set(modes)
        if not modes:
            raise RefineryCriticalException(F'No cipher block mode constants found in {cipher!r}')
        if not check & {'CFB'}:
            _class._argument_specification.pop('segment_size', None)
        if not check & {'EAX', 'GCM', 'OCB', 'CCM'}:
            _class._argument_specification.pop('mac_len', None)
        if not check & {'CCM'}:
            _class._argument_specification.pop('assoc_len', None)
        _class._available_block_cipher_modes = OptionFactory(modes, ignorecase=True)
        _class._argument_specification['mode'].merge_all(Arg(
            '-m', '--mode', type=str.upper, metavar='M', nargs=Arg.delete, choices=list(modes),
            help=(
                'Choose cipher mode to be used. Possible values are: {}. By default, the CBC mode'
                '  is used when an IV is is provided, and ECB otherwise.'.format(', '.join(modes))
            )
        ))


class StandardCipherUnit(CipherUnit, metaclass=StandardCipherExecutable):

    _available_block_cipher_modes: ClassVar[Type[Option]]
    _cipher_factory: ClassVar[CipherObjectFactory]
    _cipher_interface: Optional[CipherInterface] = None

    def _new_cipher(self, **optionals) -> CipherInterface:
        self.log_info(lambda: F'encryption key: {self.args.key.hex()}')
        return self._cipher_factory.new(key=self.args.key, **optionals)

    def _get_cipher(self, reset_cache=False) -> CipherInterface:
        co = self._cipher_interface
        if co is None or reset_cache:
            self._cipher_interface = co = self._new_cipher()
        return co

    @property
    def block_size(self) -> int:
        return self._get_cipher().block_size

    @property
    def key_size(self) -> Optional[Sequence[int]]:
        return self._get_cipher().key_size

    def encrypt(self, data: bytes) -> bytes:
        cipher = self._get_cipher(True)
        assert cipher.block_size == self.block_size
        return cipher.encrypt(data)

    def decrypt(self, data: bytes) -> bytes:
        cipher = self._get_cipher(True)
        assert cipher.block_size == self.block_size
        try:
            return cipher.decrypt(data)
        except ValueError:
            overlap = len(data) % self.block_size
            if not overlap:
                raise
            data[-overlap:] = []
            self.log_warn(F'removing {overlap} bytes from the input to make it a multiple of the {self.block_size}-byte block size')
            return cipher.decrypt(data)


class StandardBlockCipherUnit(BlockCipherUnitBase, StandardCipherUnit):

    def __init__(
        self, key, iv=B'', *,
        padding=None, mode=None, raw=False,
        little_endian: Arg.Switch('-e', '--little-endian',
            help='Only for CTR: Use a little endian counter instead of the default big endian.') = False,
        segment_size: Arg.Number('-S', '--segment-size', help=(
            'Only for CFB: Number of bits into which data is segmented. It must be a multiple of 8. The default of {default} means '
            'that the block size will be used as the segment size.')) = 0,
        mac_len: Arg.Number('-M', '--mac-len', bound=(4, 16),
            help='Only for EAX, GCM, OCB, and CCM: Length of the authentication tag, in bytes.') = 0,
        assoc_len: Arg.Number('-A', '--assoc-len',
            help='Only for CCM: Length of the associated data. If not specified, all associated data is buffered internally.') = 0,
        **keywords
    ):
        mode = self._available_block_cipher_modes(mode or iv and 'CBC' or 'ECB')
        if iv and mode.name == 'ECB':
            raise ValueError('No initialization vector can be specified for ECB mode.')
        super().__init__(
            key=key,
            iv=iv,
            padding=padding,
            mode=mode,
            raw=raw,
            segment_size=segment_size,
            mac_len=mac_len,
            assoc_len=assoc_len,
            little_endian=little_endian,
            **keywords
        )

    def _default_padding(self) -> Optional[str]:
        padding = super()._default_padding()
        if padding is not None:
            return padding
        elif self.args.mode.name in {'ECB', 'CBC', 'PCBC'}:
            return PADDINGS_LIB[0]

    @property
    def block_size(self) -> int:
        provider = StandardCipherUnit
        return provider.block_size.fget(self)

    @property
    def key_size(self) -> Sequence[int]:
        provider = StandardCipherUnit
        return provider.key_size.fget(self)

    def _new_cipher(self, **optionals) -> CipherInterface:
        mode = self.args.mode.name
        if mode != 'ECB':
            iv = bytes(self.iv)
            if mode == 'CTR':
                from Cryptodome.Util import Counter
                little_endian = self.args.little_endian
                order = 'little' if little_endian else 'big'
                counter = Counter.new(
                    self.block_size * 8,
                    initial_value=int.from_bytes(iv, order),
                    little_endian=little_endian)
                optionals['counter'] = counter
            elif mode in ('CCM', 'EAX', 'GCM', 'SIV', 'OCB', 'CTR'):
                if mode in ('CCM', 'EAX', 'GCM', 'OCB'):
                    ml = self.args.mac_len
                    if ml > 0:
                        if ml not in range(4, 17):
                            raise ValueError(F'The given mac length {ml} is not in range [4,16].')
                        optionals['mac_len'] = ml
                if mode == 'CCM':
                    al = self.args.assoc_len
                    if al > 0:
                        optionals['assoc_len'] = al
                bounds = {
                    'CCM': (7, self.block_size - 2),
                    'OCB': (1, self.block_size),
                    'CTR': (1, self.block_size),
                }.get(mode, None)
                if bounds and len(iv) not in range(*bounds):
                    raise ValueError(F'Invalid nonce length, must be in {bounds} for {mode}.')
                optionals['nonce'] = iv
            elif mode in ('PCBC', 'CBC', 'CFB', 'OFB', 'OPENPGP'):
                if mode == 'CFB':
                    sz = self.args.segment_size
                    if sz % 8 != 0:
                        raise ValueError(F'The given segment size {sz} is not a multiple of 8.')
                    if not sz:
                        sz = self.block_size * 8
                    optionals['segment_size'] = sz
                if len(iv) > self.block_size:
                    self.log_warn(F'The IV has length {len(self.args.iv)} and will be truncated to the block size {self.block_size}.')
                    iv = iv[:self.block_size]
                elif len(iv) < self.block_size:
                    raise ValueError(F'The IV has length {len(self.args.iv)} but the block size is {self.block_size}.')
                optionals['iv'] = iv
            self.log_info('initial vector:', iv.hex())
        if self.args.mode:
            optionals['mode'] = self.args.mode.value
        return super()._new_cipher(**optionals)


class LatinCipherUnit(StreamCipherUnit, abstract=True):
    key_size = {16, 32, 64}
    block_size = 1

    def __init__(
        self, key, stateful=False, discard=0,
        nonce: Arg(help='The nonce. Default is the string {default}.') = B'REFINERY',
        magic: Arg('-m', help='The magic constant; depends on the key size by default.') = B'',
        offset: Arg.Number('-x', help='Optionally specify the stream index, default is {default}.') = 0,
        rounds: Arg.Number('-r', help='The number of rounds. Has to be an even number.') = 20,
    ):
        super().__init__(
            key=key,
            nonce=nonce,
            magic=magic,
            offset=offset,
            rounds=rounds,
            stateful=stateful,
            discard=discard
        )


class LatinCipherStandardUnit(StandardCipherUnit):
    def __init__(self, key, nonce: Arg(help='The nonce. Default is the string {default}.') = B'REFINERY'):
        super().__init__(key, nonce=nonce)

    def _new_cipher(self, **optionals) -> Any:
        self.log_info('one-time nonce:', self.args.nonce.hex())
        return super()._new_cipher(nonce=self.args.nonce)

Sub-modules

refinery.units.crypto.cipher.aes
refinery.units.crypto.cipher.blabla
refinery.units.crypto.cipher.blowfish
refinery.units.crypto.cipher.camellia
refinery.units.crypto.cipher.cast
refinery.units.crypto.cipher.chacha
refinery.units.crypto.cipher.chaskey
refinery.units.crypto.cipher.des
refinery.units.crypto.cipher.des3
refinery.units.crypto.cipher.fernet
refinery.units.crypto.cipher.gost
refinery.units.crypto.cipher.hc128

Pure Python implementation of HC-128

refinery.units.crypto.cipher.hc256

Pure Python implementation of HC-128

refinery.units.crypto.cipher.isaac
refinery.units.crypto.cipher.rabbit

Pure Python implementation of the RABBIT stream cipher.

refinery.units.crypto.cipher.rc2
refinery.units.crypto.cipher.rc4
refinery.units.crypto.cipher.rc4mod
refinery.units.crypto.cipher.rc5
refinery.units.crypto.cipher.rc6
refinery.units.crypto.cipher.rijndael
refinery.units.crypto.cipher.rncrypt

The source code for this refinery unit is based on RNCryptor for Python: …

refinery.units.crypto.cipher.rot
refinery.units.crypto.cipher.rsa
refinery.units.crypto.cipher.rsakey
refinery.units.crypto.cipher.salsa
refinery.units.crypto.cipher.seal

Pure Python implementation of SEAL3 …

refinery.units.crypto.cipher.secstr
refinery.units.crypto.cipher.serpent
refinery.units.crypto.cipher.sm4
refinery.units.crypto.cipher.sosemanuk
refinery.units.crypto.cipher.tea
refinery.units.crypto.cipher.vigenere
refinery.units.crypto.cipher.xtea
refinery.units.crypto.cipher.xxtea

Classes

class CipherUnit (key, **keywords)
Expand source code Browse git
class CipherUnit(Unit, abstract=True):

    key_size: Optional[Sequence[int]] = None
    block_size: int

    def __init__(self, key: Arg(help='The encryption key.'), **keywords):
        super().__init__(key=key, **keywords)

    @abc.abstractmethod
    def decrypt(self, data: ByteString) -> ByteString:
        raise NotImplementedError

    @abc.abstractmethod
    def encrypt(self, data: ByteString) -> ByteString:
        raise NotImplementedError

    def process(self, data: ByteString) -> ByteString:
        ks = self.key_size
        if ks and len(self.args.key) not in ks:
            import itertools
            key_size_iter = iter(ks)
            key_size_options = [str(k) for k in itertools.islice(key_size_iter, 0, 5)]
            try:
                next(key_size_iter)
            except StopIteration:
                pt = '.'
            else:
                pt = ', ...'
                if isinstance(ks, range):
                    pt = F'{pt}, {ks.stop - 1}'
            if len(key_size_options) == 1:
                msg = F'{self.name} requires a key size of {key_size_options[0]}'
            else:
                msg = R', '.join(key_size_options)
                msg = F'possible key sizes for {self.name} are: {msg}'
            raise ValueError(F'the given key has an invalid length of {len(self.args.key)} bytes; {msg}{pt}')
        return self.decrypt(data)

    def reverse(self, data: ByteString) -> ByteString:
        return self.encrypt(data)

Ancestors

Subclasses

Class variables

var block_size
var key_size

Methods

def decrypt(self, data)
Expand source code Browse git
@abc.abstractmethod
def decrypt(self, data: ByteString) -> ByteString:
    raise NotImplementedError
def encrypt(self, data)
Expand source code Browse git
@abc.abstractmethod
def encrypt(self, data: ByteString) -> ByteString:
    raise NotImplementedError

Inherited members

class StreamCipherUnit (key, discard=0, stateful=False, **keywords)
Expand source code Browse git
class StreamCipherUnit(CipherUnit, abstract=True):

    block_size = 1

    def __init__(
        self, key,
        discard: Arg.Number('-d', help='Discard the first {varname} bytes of the keystream, {default} by default.') = 0,
        stateful: Arg.Switch('-s', help='Do not reset the key stream while processing the chunks of one frame.') = False,
        **keywords
    ):
        super().__init__(key=key, stateful=stateful, discard=discard, **keywords)
        self._keystream = None

    @abc.abstractmethod
    def keystream(self) -> Iterable[int]:
        raise NotImplementedError

    @Unit.Requires('numpy', 'speed', 'default', 'extended')
    def _numpy():
        import numpy
        return numpy

    def encrypt(self, data: bytearray) -> bytearray:
        it = self._keystream or self.keystream()
        for _ in range(self.args.discard):
            next(it)
        try:
            np = self._numpy
        except ImportError:
            self.log_info('this unit could perform faster if numpy was installed.')
            out = bytearray(a ^ b for a, b in zip(it, data))
        else:
            key = np.fromiter(it, dtype=np.uint8, count=len(data))
            out = np.frombuffer(
                memoryview(data), dtype=np.uint8, count=len(data))
            out ^= key
        return out

    def filter(self, chunks: Iterable):
        if self.args.stateful:
            self._keystream = self.keystream()
        yield from chunks
        self._keystream = None

    decrypt = encrypt

Ancestors

Subclasses

Class variables

var key_size
var block_size
var optional_dependencies

Methods

def keystream(self)
Expand source code Browse git
@abc.abstractmethod
def keystream(self) -> Iterable[int]:
    raise NotImplementedError
def encrypt(self, data)
Expand source code Browse git
def encrypt(self, data: bytearray) -> bytearray:
    it = self._keystream or self.keystream()
    for _ in range(self.args.discard):
        next(it)
    try:
        np = self._numpy
    except ImportError:
        self.log_info('this unit could perform faster if numpy was installed.')
        out = bytearray(a ^ b for a, b in zip(it, data))
    else:
        key = np.fromiter(it, dtype=np.uint8, count=len(data))
        out = np.frombuffer(
            memoryview(data), dtype=np.uint8, count=len(data))
        out ^= key
    return out
def decrypt(self, data)
Expand source code Browse git
def encrypt(self, data: bytearray) -> bytearray:
    it = self._keystream or self.keystream()
    for _ in range(self.args.discard):
        next(it)
    try:
        np = self._numpy
    except ImportError:
        self.log_info('this unit could perform faster if numpy was installed.')
        out = bytearray(a ^ b for a, b in zip(it, data))
    else:
        key = np.fromiter(it, dtype=np.uint8, count=len(data))
        out = np.frombuffer(
            memoryview(data), dtype=np.uint8, count=len(data))
        out ^= key
    return out

Inherited members

class BlockCipherUnitBase (key, iv=None, padding=None, raw=False, **keywords)
Expand source code Browse git
class BlockCipherUnitBase(CipherUnit, abstract=True):
    def __init__(
        self, key, iv: Arg('-i', '--iv', help=(
            'Specifies the initialization vector. If none is specified, then a block of zero bytes is used.')) = None,
        padding: Arg.Choice('-p', type=str.lower, choices=PADDINGS_ALL, metavar='P', help=(
            'Choose a padding algorithm ({choices}). The raw algorithm does nothing. By default, all other algorithms '
            'are attempted. In most cases, the data was not correctly decrypted if none of these work.')
        ) = None,
        raw: Arg.Switch('-r', '--raw', help='Set the padding to raw; ignored when a padding is specified.') = False,
        **keywords
    ):
        if not padding and raw:
            padding = PADDING_NONE
        super().__init__(key=key, iv=iv, padding=padding, **keywords)

    @property
    @abc.abstractmethod
    def block_size(self) -> int:
        raise NotImplementedError

    @property
    def iv(self) -> ByteString:
        return self.args.iv or bytes(self.block_size)

    def _default_padding(self) -> Optional[str]:
        return self.args.padding

    def reverse(self, data: ByteString) -> ByteString:
        padding = self._default_padding()
        if padding is not None:
            self.log_info('padding method:', padding)
            if padding in PADDINGS_LIB:
                from Cryptodome.Util.Padding import pad
                data = pad(data, self.block_size, padding)
        return super().reverse(data)

    def process(self, data: ByteString) -> ByteString:
        padding = self._default_padding()
        result = super().process(data)
        if padding is None:
            return result

        from Cryptodome.Util.Padding import unpad
        padding = [padding, *(p for p in PADDINGS_LIB if p != padding)]

        for p in padding:
            if p == PADDING_NONE:
                return result
            try:
                unpadded = unpad(result, self.block_size, p.lower())
            except Exception:
                pass
            else:
                self.log_info(F'unpadding worked using {p}')
                return unpadded
        raise RefineryPartialResult(
            'None of these paddings worked: {}'.format(', '.join(padding)),
            partial=result)

Ancestors

Subclasses

Class variables

var key_size

Instance variables

var block_size
Expand source code Browse git
@property
@abc.abstractmethod
def block_size(self) -> int:
    raise NotImplementedError
var iv
Expand source code Browse git
@property
def iv(self) -> ByteString:
    return self.args.iv or bytes(self.block_size)

Inherited members

class StandardCipherExecutable (name, bases, nmspc, cipher=None)

This is the metaclass for refinery units. A class which is of this type is required to implement a method run(). If the class is created in the currently executing module, then an instance of the class is automatically created after it is defined and its run() method is invoked.

Expand source code Browse git
class StandardCipherExecutable(Executable):

    _available_block_cipher_modes: ClassVar[Type[Option]]
    _cipher_factory: ClassVar[Optional[CipherObjectFactory]]

    def __new__(mcs, name, bases, nmspc, cipher: Optional[CipherObjectFactory] = None):
        keywords = dict(abstract=(cipher is None))
        return super(StandardCipherExecutable, mcs).__new__(mcs, name, bases, nmspc, **keywords)

    def __init__(_class, name, bases, nmspc, cipher: Optional[CipherObjectFactory] = None):
        abstract = cipher is None
        super(StandardCipherExecutable, _class).__init__(name, bases, nmspc, abstract=abstract)
        _class._cipher_factory = cipher
        if abstract:
            return
        b_size = cipher.block_size
        k_size = cipher.key_size
        if b_size is not None:
            _class.block_size = b_size
        else:
            b_size = getattr(_class, 'block_size', 2)
            if not isinstance(b_size, int):
                b_size = None
        if k_size is not None:
            _class.key_size = k_size
        if b_size and b_size <= 1:
            return
        if 'mode' not in _class._argument_specification:
            return
        modes = extract_options(cipher, 'MODE_', 'SIV', 'OPENPGP')
        check = set(modes)
        if not modes:
            raise RefineryCriticalException(F'No cipher block mode constants found in {cipher!r}')
        if not check & {'CFB'}:
            _class._argument_specification.pop('segment_size', None)
        if not check & {'EAX', 'GCM', 'OCB', 'CCM'}:
            _class._argument_specification.pop('mac_len', None)
        if not check & {'CCM'}:
            _class._argument_specification.pop('assoc_len', None)
        _class._available_block_cipher_modes = OptionFactory(modes, ignorecase=True)
        _class._argument_specification['mode'].merge_all(Arg(
            '-m', '--mode', type=str.upper, metavar='M', nargs=Arg.delete, choices=list(modes),
            help=(
                'Choose cipher mode to be used. Possible values are: {}. By default, the CBC mode'
                '  is used when an IV is is provided, and ECB otherwise.'.format(', '.join(modes))
            )
        ))

Ancestors

Inherited members

class StandardCipherUnit (key, **keywords)
Expand source code Browse git
class StandardCipherUnit(CipherUnit, metaclass=StandardCipherExecutable):

    _available_block_cipher_modes: ClassVar[Type[Option]]
    _cipher_factory: ClassVar[CipherObjectFactory]
    _cipher_interface: Optional[CipherInterface] = None

    def _new_cipher(self, **optionals) -> CipherInterface:
        self.log_info(lambda: F'encryption key: {self.args.key.hex()}')
        return self._cipher_factory.new(key=self.args.key, **optionals)

    def _get_cipher(self, reset_cache=False) -> CipherInterface:
        co = self._cipher_interface
        if co is None or reset_cache:
            self._cipher_interface = co = self._new_cipher()
        return co

    @property
    def block_size(self) -> int:
        return self._get_cipher().block_size

    @property
    def key_size(self) -> Optional[Sequence[int]]:
        return self._get_cipher().key_size

    def encrypt(self, data: bytes) -> bytes:
        cipher = self._get_cipher(True)
        assert cipher.block_size == self.block_size
        return cipher.encrypt(data)

    def decrypt(self, data: bytes) -> bytes:
        cipher = self._get_cipher(True)
        assert cipher.block_size == self.block_size
        try:
            return cipher.decrypt(data)
        except ValueError:
            overlap = len(data) % self.block_size
            if not overlap:
                raise
            data[-overlap:] = []
            self.log_warn(F'removing {overlap} bytes from the input to make it a multiple of the {self.block_size}-byte block size')
            return cipher.decrypt(data)

Ancestors

Subclasses

Instance variables

var block_size
Expand source code Browse git
@property
def block_size(self) -> int:
    return self._get_cipher().block_size
var key_size
Expand source code Browse git
@property
def key_size(self) -> Optional[Sequence[int]]:
    return self._get_cipher().key_size

Methods

def encrypt(self, data)
Expand source code Browse git
def encrypt(self, data: bytes) -> bytes:
    cipher = self._get_cipher(True)
    assert cipher.block_size == self.block_size
    return cipher.encrypt(data)
def decrypt(self, data)
Expand source code Browse git
def decrypt(self, data: bytes) -> bytes:
    cipher = self._get_cipher(True)
    assert cipher.block_size == self.block_size
    try:
        return cipher.decrypt(data)
    except ValueError:
        overlap = len(data) % self.block_size
        if not overlap:
            raise
        data[-overlap:] = []
        self.log_warn(F'removing {overlap} bytes from the input to make it a multiple of the {self.block_size}-byte block size')
        return cipher.decrypt(data)

Inherited members

class StandardBlockCipherUnit (key, iv=b'', *, padding=None, mode=None, raw=False, little_endian=False, segment_size=0, mac_len=0, assoc_len=0, **keywords)
Expand source code Browse git
class StandardBlockCipherUnit(BlockCipherUnitBase, StandardCipherUnit):

    def __init__(
        self, key, iv=B'', *,
        padding=None, mode=None, raw=False,
        little_endian: Arg.Switch('-e', '--little-endian',
            help='Only for CTR: Use a little endian counter instead of the default big endian.') = False,
        segment_size: Arg.Number('-S', '--segment-size', help=(
            'Only for CFB: Number of bits into which data is segmented. It must be a multiple of 8. The default of {default} means '
            'that the block size will be used as the segment size.')) = 0,
        mac_len: Arg.Number('-M', '--mac-len', bound=(4, 16),
            help='Only for EAX, GCM, OCB, and CCM: Length of the authentication tag, in bytes.') = 0,
        assoc_len: Arg.Number('-A', '--assoc-len',
            help='Only for CCM: Length of the associated data. If not specified, all associated data is buffered internally.') = 0,
        **keywords
    ):
        mode = self._available_block_cipher_modes(mode or iv and 'CBC' or 'ECB')
        if iv and mode.name == 'ECB':
            raise ValueError('No initialization vector can be specified for ECB mode.')
        super().__init__(
            key=key,
            iv=iv,
            padding=padding,
            mode=mode,
            raw=raw,
            segment_size=segment_size,
            mac_len=mac_len,
            assoc_len=assoc_len,
            little_endian=little_endian,
            **keywords
        )

    def _default_padding(self) -> Optional[str]:
        padding = super()._default_padding()
        if padding is not None:
            return padding
        elif self.args.mode.name in {'ECB', 'CBC', 'PCBC'}:
            return PADDINGS_LIB[0]

    @property
    def block_size(self) -> int:
        provider = StandardCipherUnit
        return provider.block_size.fget(self)

    @property
    def key_size(self) -> Sequence[int]:
        provider = StandardCipherUnit
        return provider.key_size.fget(self)

    def _new_cipher(self, **optionals) -> CipherInterface:
        mode = self.args.mode.name
        if mode != 'ECB':
            iv = bytes(self.iv)
            if mode == 'CTR':
                from Cryptodome.Util import Counter
                little_endian = self.args.little_endian
                order = 'little' if little_endian else 'big'
                counter = Counter.new(
                    self.block_size * 8,
                    initial_value=int.from_bytes(iv, order),
                    little_endian=little_endian)
                optionals['counter'] = counter
            elif mode in ('CCM', 'EAX', 'GCM', 'SIV', 'OCB', 'CTR'):
                if mode in ('CCM', 'EAX', 'GCM', 'OCB'):
                    ml = self.args.mac_len
                    if ml > 0:
                        if ml not in range(4, 17):
                            raise ValueError(F'The given mac length {ml} is not in range [4,16].')
                        optionals['mac_len'] = ml
                if mode == 'CCM':
                    al = self.args.assoc_len
                    if al > 0:
                        optionals['assoc_len'] = al
                bounds = {
                    'CCM': (7, self.block_size - 2),
                    'OCB': (1, self.block_size),
                    'CTR': (1, self.block_size),
                }.get(mode, None)
                if bounds and len(iv) not in range(*bounds):
                    raise ValueError(F'Invalid nonce length, must be in {bounds} for {mode}.')
                optionals['nonce'] = iv
            elif mode in ('PCBC', 'CBC', 'CFB', 'OFB', 'OPENPGP'):
                if mode == 'CFB':
                    sz = self.args.segment_size
                    if sz % 8 != 0:
                        raise ValueError(F'The given segment size {sz} is not a multiple of 8.')
                    if not sz:
                        sz = self.block_size * 8
                    optionals['segment_size'] = sz
                if len(iv) > self.block_size:
                    self.log_warn(F'The IV has length {len(self.args.iv)} and will be truncated to the block size {self.block_size}.')
                    iv = iv[:self.block_size]
                elif len(iv) < self.block_size:
                    raise ValueError(F'The IV has length {len(self.args.iv)} but the block size is {self.block_size}.')
                optionals['iv'] = iv
            self.log_info('initial vector:', iv.hex())
        if self.args.mode:
            optionals['mode'] = self.args.mode.value
        return super()._new_cipher(**optionals)

Ancestors

Subclasses

Instance variables

var block_size
Expand source code Browse git
@property
def block_size(self) -> int:
    provider = StandardCipherUnit
    return provider.block_size.fget(self)
var key_size
Expand source code Browse git
@property
def key_size(self) -> Sequence[int]:
    provider = StandardCipherUnit
    return provider.key_size.fget(self)

Inherited members

class LatinCipherUnit (key, stateful=False, discard=0, nonce=b'REFINERY', magic=b'', offset=0, rounds=20)
Expand source code Browse git
class LatinCipherUnit(StreamCipherUnit, abstract=True):
    key_size = {16, 32, 64}
    block_size = 1

    def __init__(
        self, key, stateful=False, discard=0,
        nonce: Arg(help='The nonce. Default is the string {default}.') = B'REFINERY',
        magic: Arg('-m', help='The magic constant; depends on the key size by default.') = B'',
        offset: Arg.Number('-x', help='Optionally specify the stream index, default is {default}.') = 0,
        rounds: Arg.Number('-r', help='The number of rounds. Has to be an even number.') = 20,
    ):
        super().__init__(
            key=key,
            nonce=nonce,
            magic=magic,
            offset=offset,
            rounds=rounds,
            stateful=stateful,
            discard=discard
        )

Ancestors

Subclasses

Class variables

var key_size
var block_size

Inherited members

class LatinCipherStandardUnit (key, nonce=b'REFINERY')
Expand source code Browse git
class LatinCipherStandardUnit(StandardCipherUnit):
    def __init__(self, key, nonce: Arg(help='The nonce. Default is the string {default}.') = B'REFINERY'):
        super().__init__(key, nonce=nonce)

    def _new_cipher(self, **optionals) -> Any:
        self.log_info('one-time nonce:', self.args.nonce.hex())
        return super()._new_cipher(nonce=self.args.nonce)

Ancestors

Subclasses

Inherited members