Module refinery.units.crypto.cipher.rsa

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.tools import splitchunks
from refinery.lib.mscrypto import BCRYPT_RSAKEY_BLOB, CRYPTOKEY, TYPES
from refinery.lib.xml import ForgivingParse

from base64 import b64decode, b16decode
from contextlib import suppress
from enum import IntEnum, Enum

from Cryptodome.Random import get_random_bytes
from Cryptodome.Cipher import PKCS1_OAEP
from Cryptodome.PublicKey import RSA
from Cryptodome.Util import number


class KF(str, Enum):
    TXT = 'custom'
    XML = 'XML'
    PEM = 'PEM'
    DER = 'DER'
    MSB = 'Microsoft key blob'


def normalize_rsa_key(key: bytes, force_public=False):
    try:
        mod, colon, exp = key.partition(B':')
        if colon == B':':
            mod = number.bytes_to_long(b16decode(mod, casefold=True))
            exp = number.bytes_to_long(b16decode(exp, casefold=True))
            return KF.TXT, RSA.construct((mod, exp))
    except Exception:
        pass
    try:
        key = b64decode(key, validate=True)
    except Exception:
        pass
    try:
        dom = ForgivingParse(key)
    except ValueError:
        pass
    else:
        data = {child.tag.upper(): number.bytes_to_long(b64decode(child.text)) for child in dom.getroot()}
        components = (data['MODULUS'], data['EXPONENT'])
        if not force_public:
            if 'D' in data:
                components += data['D'],
            if 'P' in data and 'Q' in data:
                components += data['P'], data['Q']
        return KF.XML, RSA.construct(components)
    try:
        blob = CRYPTOKEY(key)
    except ValueError:
        pass
    else:
        if blob.header.type not in {TYPES.PUBLICKEYBLOB, TYPES.PRIVATEKEYBLOB}:
            raise ValueError(F'The provided key is of invalid type {blob.header.type!s}, the algorithm is {blob.header.algorithm!s}.')
        if force_public and blob.header.type is TYPES.PRIVATEKEYBLOB:
            blob = blob.pub
        return KF.MSB, blob.key.convert()
    try:
        blob = BCRYPT_RSAKEY_BLOB(key)
    except ValueError:
        fmt = KF.PEM if B'----' in key else KF.DER
        key = RSA.import_key(key)
        if force_public:
            key = key.public_key()
        return fmt, key
    else:
        return KF.MSB, blob.convert(force_public=force_public)


class PAD(IntEnum):
    AUTO = 0
    NONE = 1
    OAEP = 2
    PKCS15 = 3
    PKCS10 = 4


class rsa(Unit):
    """
    Implements single block RSA encryption and decryption. This unit can be used to encrypt
    and decrypt blocks generated by openssl's `rsautl` tool when using the mode `-verify`.
    When it is executed with a public key for decryption or with a private key for encryption,
    it will perform a raw RSA operation. The result of these operations are (un)padded using
    EMSA-PKCS1-v1_5.
    """
    def __init__(
        self,
        key: Arg(help='RSA key in PEM, DER, or Microsoft BLOB format.'),
        swapkeys: Arg.Switch('-s', help='Swap public and private exponent.') = False,
        textbook: Arg.Switch('-t', group='PAD', help='Equivalent to --padding=NONE.') = False,
        padding : Arg.Option('-p', group='PAD', choices=PAD,
            help='Choose one of the following padding modes: {choices}. The default is AUTO.') = PAD.AUTO,
        rsautl  : Arg.Switch('-r', group='PAD',
            help='Act as rsautl from OpenSSH; This is equivalent to --swapkeys --padding=PKCS10') = False,
    ):
        padding = Arg.AsOption(padding, PAD)
        if textbook:
            if padding != PAD.AUTO:
                raise ValueError('Conflicting padding options!')
            padding = padding.NONE
        if rsautl:
            if padding and padding != PAD.PKCS10:
                raise ValueError('Conflicting padding options!')
            swapkeys = True
            padding = PAD.PKCS10

        super().__init__(key=key, textbook=textbook, padding=padding, swapkeys=swapkeys)

        self._key_hash = None
        self._key_data = None

    @property
    def blocksize(self) -> int:
        return self.key.size_in_bytes()

    @property
    def _blocksize_plain(self) -> int:
        # PKCS#1 v1.5 padding is at least 11 bytes.
        return self.blocksize - 11

    @property
    def pub(self):
        return self.key.d if self.args.swapkeys else self.key.e

    @property
    def prv(self):
        return self.key.e if self.args.swapkeys else self.key.d

    def _get_msg(self, data):
        msg = int.from_bytes(data, byteorder='big')
        if msg > self.key.n:
            raise ValueError(F'This key can only handle messages of size {self.blocksize}.')
        return msg

    def _encrypt_raw(self, data):
        return pow(
            self._get_msg(data),
            self.pub,
            self.key.n
        ).to_bytes(self.blocksize, byteorder='big')

    def _decrypt_raw(self, data):
        return pow(
            self._get_msg(data),
            self.prv,
            self.key.n
        ).to_bytes(self.blocksize, byteorder='big')

    def _unpad(self, data, head, padbyte=None):
        if len(data) > self.blocksize:
            raise ValueError(F'This key can only handle messages of size {self.blocksize}.')
        if data.startswith(head):
            pos = data.find(B'\0', 2)
            if pos > 0:
                pad = data[2:pos]
                if padbyte is None or all(b == padbyte for b in pad):
                    return data[pos + 1:]
        raise ValueError('Incorrect padding')

    def _pad(self, data, head, padbyte=None):
        if len(data) > self._blocksize_plain:
            raise ValueError(F'This key can only encrypt messages of size at most {self._blocksize_plain}.')
        pad = self.blocksize - len(data) - len(head) - 1
        if padbyte is not None:
            padding = pad * bytes((padbyte,))
        else:
            padding = bytearray(1)
            while not all(padding):
                padding = bytearray(filter(None, padding))
                padding.extend(get_random_bytes(pad - len(padding)))
        return head + padding + B'\0' + data

    def _unpad_pkcs10(self, data):
        return self._unpad(data, B'\x00\x01', 0xFF)

    def _unpad_pkcs15(self, data):
        return self._unpad(data, B'\x00\x02', None)

    def _pad_pkcs10(self, data):
        return self._pad(data, B'\x00\x01', 0xFF)

    def _pad_pkcs15(self, data):
        return self._pad(data, B'\x00\x02', None)

    def _decrypt_block_OAEP(self, data):
        self.log_debug('Attempting decryption with PyCrypto PKCS1 OAEP.')
        return PKCS1_OAEP.new(self.key).decrypt(data)

    def _encrypt_block_OAEP(self, data):
        self.log_debug('Attempting encryption with PyCrypto PKCS1 OAEP.')
        return PKCS1_OAEP.new(self.key).encrypt(data)

    def _decrypt_block(self, data):
        if self._oaep and self._pads in {PAD.AUTO, PAD.OAEP}:
            try:
                return self._decrypt_block_OAEP(data)
            except ValueError as E:
                if self._pads:
                    raise
                self.log_debug(F'{E!s} No longer attempting OAEP.')
                self._oaep = False

        data = self._decrypt_raw(data)
        return self._unpad_per_argument(data)

    def _unpad_per_argument(self, data):
        if self._pads == PAD.NONE:
            return data
        elif self._pads == PAD.PKCS10:
            return self._unpad_pkcs10(data)
        elif self._pads == PAD.PKCS15:
            return self._unpad_pkcs15(data)
        elif self._pads == PAD.AUTO:
            with suppress(ValueError):
                data = self._unpad_pkcs10(data)
                self.log_info('Detected PKCS1.0 padding.')
                self._pads = PAD.PKCS10
                return data
            with suppress(ValueError):
                data = self._unpad_pkcs15(data)
                self.log_info('Detected PKCS1.5 padding.')
                self._pads = PAD.PKCS15
                return data
            raise RefineryPartialResult('No padding worked, returning raw decrypted blocks.', data)
        else:
            raise ValueError(F'Invalid padding value: {self._pads!r}')

    def _encrypt_block(self, data):
        if self._pads in {PAD.AUTO, PAD.OAEP}:
            try:
                return self._encrypt_block_OAEP(data)
            except ValueError:
                if self._pads: raise
                self.log_debug('PyCrypto primitives for OAEP failed, falling back to PKCS1.5.')
                self._pads = PAD.PKCS15

        if self._pads == PAD.PKCS15:
            data = self._pad_pkcs15(data)
        elif self._pads == PAD.PKCS10:
            data = self._pad_pkcs10(data)

        return self._encrypt_raw(data)

    @property
    def key(self) -> RSA.RsaKey:
        key_blob = self.args.key
        key_hash = hash(key_blob)
        if key_hash != self._key_hash:
            fmt, key_data = normalize_rsa_key(key_blob)
            self.log_info(F'successfully parsed RSA key as {fmt.value}')
            self._key_hash = key_hash
            self._key_data = key_data
        return self._key_data

    def process(self, data):
        self._oaep = True
        self._pads = self.args.padding
        if not self.key.has_private():
            try:
                return self._unpad_per_argument(self._encrypt_raw(data))
            except Exception as E:
                raise ValueError(F'A public key was given for decryption and rsautl mode resulted in an error: {E}') from E
        return B''.join(self._decrypt_block(block) for block in splitchunks(data, self.blocksize))

    def reverse(self, data):
        self._pads = self.args.padding
        return B''.join(self._encrypt_block(block) for block in splitchunks(data, self._blocksize_plain))

Functions

def normalize_rsa_key(key, force_public=False)
Expand source code Browse git
def normalize_rsa_key(key: bytes, force_public=False):
    try:
        mod, colon, exp = key.partition(B':')
        if colon == B':':
            mod = number.bytes_to_long(b16decode(mod, casefold=True))
            exp = number.bytes_to_long(b16decode(exp, casefold=True))
            return KF.TXT, RSA.construct((mod, exp))
    except Exception:
        pass
    try:
        key = b64decode(key, validate=True)
    except Exception:
        pass
    try:
        dom = ForgivingParse(key)
    except ValueError:
        pass
    else:
        data = {child.tag.upper(): number.bytes_to_long(b64decode(child.text)) for child in dom.getroot()}
        components = (data['MODULUS'], data['EXPONENT'])
        if not force_public:
            if 'D' in data:
                components += data['D'],
            if 'P' in data and 'Q' in data:
                components += data['P'], data['Q']
        return KF.XML, RSA.construct(components)
    try:
        blob = CRYPTOKEY(key)
    except ValueError:
        pass
    else:
        if blob.header.type not in {TYPES.PUBLICKEYBLOB, TYPES.PRIVATEKEYBLOB}:
            raise ValueError(F'The provided key is of invalid type {blob.header.type!s}, the algorithm is {blob.header.algorithm!s}.')
        if force_public and blob.header.type is TYPES.PRIVATEKEYBLOB:
            blob = blob.pub
        return KF.MSB, blob.key.convert()
    try:
        blob = BCRYPT_RSAKEY_BLOB(key)
    except ValueError:
        fmt = KF.PEM if B'----' in key else KF.DER
        key = RSA.import_key(key)
        if force_public:
            key = key.public_key()
        return fmt, key
    else:
        return KF.MSB, blob.convert(force_public=force_public)

Classes

class KF (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class KF(str, Enum):
    TXT = 'custom'
    XML = 'XML'
    PEM = 'PEM'
    DER = 'DER'
    MSB = 'Microsoft key blob'

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var TXT
var XML
var PEM
var DER
var MSB
class PAD (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code Browse git
class PAD(IntEnum):
    AUTO = 0
    NONE = 1
    OAEP = 2
    PKCS15 = 3
    PKCS10 = 4

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var AUTO
var NONE
var OAEP
var PKCS15
var PKCS10
class rsa (key, swapkeys=False, textbook=False, padding=PAD.AUTO, rsautl=False)

Implements single block RSA encryption and decryption. This unit can be used to encrypt and decrypt blocks generated by openssl's rsautl tool when using the mode -verify. When it is executed with a public key for decryption or with a private key for encryption, it will perform a raw RSA operation. The result of these operations are (un)padded using EMSA-PKCS1-v1_5.

Expand source code Browse git
class rsa(Unit):
    """
    Implements single block RSA encryption and decryption. This unit can be used to encrypt
    and decrypt blocks generated by openssl's `rsautl` tool when using the mode `-verify`.
    When it is executed with a public key for decryption or with a private key for encryption,
    it will perform a raw RSA operation. The result of these operations are (un)padded using
    EMSA-PKCS1-v1_5.
    """
    def __init__(
        self,
        key: Arg(help='RSA key in PEM, DER, or Microsoft BLOB format.'),
        swapkeys: Arg.Switch('-s', help='Swap public and private exponent.') = False,
        textbook: Arg.Switch('-t', group='PAD', help='Equivalent to --padding=NONE.') = False,
        padding : Arg.Option('-p', group='PAD', choices=PAD,
            help='Choose one of the following padding modes: {choices}. The default is AUTO.') = PAD.AUTO,
        rsautl  : Arg.Switch('-r', group='PAD',
            help='Act as rsautl from OpenSSH; This is equivalent to --swapkeys --padding=PKCS10') = False,
    ):
        padding = Arg.AsOption(padding, PAD)
        if textbook:
            if padding != PAD.AUTO:
                raise ValueError('Conflicting padding options!')
            padding = padding.NONE
        if rsautl:
            if padding and padding != PAD.PKCS10:
                raise ValueError('Conflicting padding options!')
            swapkeys = True
            padding = PAD.PKCS10

        super().__init__(key=key, textbook=textbook, padding=padding, swapkeys=swapkeys)

        self._key_hash = None
        self._key_data = None

    @property
    def blocksize(self) -> int:
        return self.key.size_in_bytes()

    @property
    def _blocksize_plain(self) -> int:
        # PKCS#1 v1.5 padding is at least 11 bytes.
        return self.blocksize - 11

    @property
    def pub(self):
        return self.key.d if self.args.swapkeys else self.key.e

    @property
    def prv(self):
        return self.key.e if self.args.swapkeys else self.key.d

    def _get_msg(self, data):
        msg = int.from_bytes(data, byteorder='big')
        if msg > self.key.n:
            raise ValueError(F'This key can only handle messages of size {self.blocksize}.')
        return msg

    def _encrypt_raw(self, data):
        return pow(
            self._get_msg(data),
            self.pub,
            self.key.n
        ).to_bytes(self.blocksize, byteorder='big')

    def _decrypt_raw(self, data):
        return pow(
            self._get_msg(data),
            self.prv,
            self.key.n
        ).to_bytes(self.blocksize, byteorder='big')

    def _unpad(self, data, head, padbyte=None):
        if len(data) > self.blocksize:
            raise ValueError(F'This key can only handle messages of size {self.blocksize}.')
        if data.startswith(head):
            pos = data.find(B'\0', 2)
            if pos > 0:
                pad = data[2:pos]
                if padbyte is None or all(b == padbyte for b in pad):
                    return data[pos + 1:]
        raise ValueError('Incorrect padding')

    def _pad(self, data, head, padbyte=None):
        if len(data) > self._blocksize_plain:
            raise ValueError(F'This key can only encrypt messages of size at most {self._blocksize_plain}.')
        pad = self.blocksize - len(data) - len(head) - 1
        if padbyte is not None:
            padding = pad * bytes((padbyte,))
        else:
            padding = bytearray(1)
            while not all(padding):
                padding = bytearray(filter(None, padding))
                padding.extend(get_random_bytes(pad - len(padding)))
        return head + padding + B'\0' + data

    def _unpad_pkcs10(self, data):
        return self._unpad(data, B'\x00\x01', 0xFF)

    def _unpad_pkcs15(self, data):
        return self._unpad(data, B'\x00\x02', None)

    def _pad_pkcs10(self, data):
        return self._pad(data, B'\x00\x01', 0xFF)

    def _pad_pkcs15(self, data):
        return self._pad(data, B'\x00\x02', None)

    def _decrypt_block_OAEP(self, data):
        self.log_debug('Attempting decryption with PyCrypto PKCS1 OAEP.')
        return PKCS1_OAEP.new(self.key).decrypt(data)

    def _encrypt_block_OAEP(self, data):
        self.log_debug('Attempting encryption with PyCrypto PKCS1 OAEP.')
        return PKCS1_OAEP.new(self.key).encrypt(data)

    def _decrypt_block(self, data):
        if self._oaep and self._pads in {PAD.AUTO, PAD.OAEP}:
            try:
                return self._decrypt_block_OAEP(data)
            except ValueError as E:
                if self._pads:
                    raise
                self.log_debug(F'{E!s} No longer attempting OAEP.')
                self._oaep = False

        data = self._decrypt_raw(data)
        return self._unpad_per_argument(data)

    def _unpad_per_argument(self, data):
        if self._pads == PAD.NONE:
            return data
        elif self._pads == PAD.PKCS10:
            return self._unpad_pkcs10(data)
        elif self._pads == PAD.PKCS15:
            return self._unpad_pkcs15(data)
        elif self._pads == PAD.AUTO:
            with suppress(ValueError):
                data = self._unpad_pkcs10(data)
                self.log_info('Detected PKCS1.0 padding.')
                self._pads = PAD.PKCS10
                return data
            with suppress(ValueError):
                data = self._unpad_pkcs15(data)
                self.log_info('Detected PKCS1.5 padding.')
                self._pads = PAD.PKCS15
                return data
            raise RefineryPartialResult('No padding worked, returning raw decrypted blocks.', data)
        else:
            raise ValueError(F'Invalid padding value: {self._pads!r}')

    def _encrypt_block(self, data):
        if self._pads in {PAD.AUTO, PAD.OAEP}:
            try:
                return self._encrypt_block_OAEP(data)
            except ValueError:
                if self._pads: raise
                self.log_debug('PyCrypto primitives for OAEP failed, falling back to PKCS1.5.')
                self._pads = PAD.PKCS15

        if self._pads == PAD.PKCS15:
            data = self._pad_pkcs15(data)
        elif self._pads == PAD.PKCS10:
            data = self._pad_pkcs10(data)

        return self._encrypt_raw(data)

    @property
    def key(self) -> RSA.RsaKey:
        key_blob = self.args.key
        key_hash = hash(key_blob)
        if key_hash != self._key_hash:
            fmt, key_data = normalize_rsa_key(key_blob)
            self.log_info(F'successfully parsed RSA key as {fmt.value}')
            self._key_hash = key_hash
            self._key_data = key_data
        return self._key_data

    def process(self, data):
        self._oaep = True
        self._pads = self.args.padding
        if not self.key.has_private():
            try:
                return self._unpad_per_argument(self._encrypt_raw(data))
            except Exception as E:
                raise ValueError(F'A public key was given for decryption and rsautl mode resulted in an error: {E}') from E
        return B''.join(self._decrypt_block(block) for block in splitchunks(data, self.blocksize))

    def reverse(self, data):
        self._pads = self.args.padding
        return B''.join(self._encrypt_block(block) for block in splitchunks(data, self._blocksize_plain))

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Instance variables

var blocksize
Expand source code Browse git
@property
def blocksize(self) -> int:
    return self.key.size_in_bytes()
var pub
Expand source code Browse git
@property
def pub(self):
    return self.key.d if self.args.swapkeys else self.key.e
var prv
Expand source code Browse git
@property
def prv(self):
    return self.key.e if self.args.swapkeys else self.key.d
var key
Expand source code Browse git
@property
def key(self) -> RSA.RsaKey:
    key_blob = self.args.key
    key_hash = hash(key_blob)
    if key_hash != self._key_hash:
        fmt, key_data = normalize_rsa_key(key_blob)
        self.log_info(F'successfully parsed RSA key as {fmt.value}')
        self._key_hash = key_hash
        self._key_data = key_data
    return self._key_data

Inherited members