Module refinery.units.crypto.cipher.rsakey
Expand source code Browse git
from __future__ import annotations
import base64
import enum
import itertools
import json
import textwrap
from refinery.units import Arg, Unit
from refinery.lib.json import flattened
from refinery.units.crypto.cipher.rsa import normalize_rsa_key
from Cryptodome.Util.asn1 import DerSequence
from Cryptodome.Util import number
class RSAFormat(str, enum.Enum):
PEM = 'PEM'
DER = 'DER'
XKMS = 'XKMS'
TEXT = 'TEXT'
JSON = 'JSON'
BLOB = 'BLOB'
class rsakey(Unit):
"""
Parse RSA keys in various formats; PEM, DER, Microsoft BLOB, and W3C-XKMS (XML) format are supported.
The same formats are supported for the input format, but you can also specify a key in the following
format, where both modulus and exponent have to be hex-encoded: `[modulus]:[exponent]`
"""
def __init__(
self,
output: Arg.Option(help='Select an output format ({choices}), default is {default}.', choices=RSAFormat) = RSAFormat.PEM,
public: Arg.Switch('-p', help='Force public key output even if the input is private.') = False,
):
super().__init__(output=Arg.AsOption(output, RSAFormat), public=public)
def _xkms_wrap(self, number: int):
size, r = divmod(number.bit_length(), 8)
size += int(bool(r))
return base64.b64encode(number.to_bytes(size, 'big'))
def process(self, data):
from refinery.lib.mscrypto import TYPES, ALGORITHMS
fmt, key = normalize_rsa_key(data, force_public=self.args.public)
self.log_info(F'parsing input as {fmt.value} format')
out = self.args.output
if out is RSAFormat.PEM:
yield key.export_key('PEM')
return
if out is RSAFormat.DER:
yield key.export_key('DER')
return
if out is RSAFormat.BLOB:
def le(v: int, s: int):
return v.to_bytes(s, 'little')
buffer = bytearray()
buffer.append(TYPES.PRIVATEKEYBLOB if key.has_private() else TYPES.PUBLICKEYBLOB)
buffer.extend(le(2, 3))
buffer.extend(le(ALGORITHMS.CALG_RSA_KEYX, 4))
buffer.extend(B'RSA2' if key.has_private() else B'RSA1')
size = 2
while size < key.n.bit_length():
size <<= 1
self.log_info(F'using bit size {size}')
buffer.extend(le(size, 4))
size //= 8
buffer.extend(le(key.e, 4))
buffer.extend(le(key.n, size))
if key.has_private():
exp_1 = key.d % (key.p - 1)
exp_2 = key.d % (key.q - 1)
coeff = pow(key.q, -1, key.p)
half = size // 2
buffer.extend(le(key.p, half))
buffer.extend(le(key.q, half))
buffer.extend(le(exp_1, half))
buffer.extend(le(exp_2, half))
buffer.extend(le(coeff, half))
buffer.extend(le(key.d, size))
yield buffer
return
components = {
'Modulus' : key.n,
'Exponent': key.e,
}
if key.has_private():
decoded = DerSequence()
decoded.decode(key.export_key('DER'))
it = itertools.islice(decoded, 3, None)
for v in ('D', 'P', 'Q', 'DP', 'DQ', 'InverseQ'):
try:
components[v] = next(it)
except StopIteration:
break
if out is RSAFormat.XKMS:
for tag in components:
components[tag] = base64.b64encode(number.long_to_bytes(components[tag])).decode('ascii')
tags = '\n'.join(F'\t<{tag}>{value}</{tag}>' for tag, value in components.items())
yield F'<RSAKeyPair>\n{tags}\n</RSAKeyPair>'.encode(self.codec)
return
components['BitSize'] = key.n.bit_length()
for tag, value in components.items():
if value.bit_length() > 32:
components[tag] = F'{value:X}'
if out is RSAFormat.JSON:
yield json.dumps(components, indent=4).encode(self.codec)
return
if out is RSAFormat.TEXT:
table = list(flattened(components))
for key, value in table:
value = F'0x{value}' if isinstance(value, str) else str(value)
value = '\n'.join(F'{L}' for L in textwrap.wrap(value, 80))
yield F'-- {key + " ":-<77}\n{value!s}'.encode(self.codec)
Classes
class RSAFormat (*args, **kwds)
-
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
Expand source code Browse git
class RSAFormat(str, enum.Enum): PEM = 'PEM' DER = 'DER' XKMS = 'XKMS' TEXT = 'TEXT' JSON = 'JSON' BLOB = 'BLOB'
Ancestors
- builtins.str
- enum.Enum
Class variables
var PEM
var DER
var XKMS
var TEXT
var JSON
var BLOB
class rsakey (output=RSAFormat.PEM, public=False)
-
Parse RSA keys in various formats; PEM, DER, Microsoft BLOB, and W3C-XKMS (XML) format are supported. The same formats are supported for the input format, but you can also specify a key in the following format, where both modulus and exponent have to be hex-encoded:
[modulus]:[exponent]
Expand source code Browse git
class rsakey(Unit): """ Parse RSA keys in various formats; PEM, DER, Microsoft BLOB, and W3C-XKMS (XML) format are supported. The same formats are supported for the input format, but you can also specify a key in the following format, where both modulus and exponent have to be hex-encoded: `[modulus]:[exponent]` """ def __init__( self, output: Arg.Option(help='Select an output format ({choices}), default is {default}.', choices=RSAFormat) = RSAFormat.PEM, public: Arg.Switch('-p', help='Force public key output even if the input is private.') = False, ): super().__init__(output=Arg.AsOption(output, RSAFormat), public=public) def _xkms_wrap(self, number: int): size, r = divmod(number.bit_length(), 8) size += int(bool(r)) return base64.b64encode(number.to_bytes(size, 'big')) def process(self, data): from refinery.lib.mscrypto import TYPES, ALGORITHMS fmt, key = normalize_rsa_key(data, force_public=self.args.public) self.log_info(F'parsing input as {fmt.value} format') out = self.args.output if out is RSAFormat.PEM: yield key.export_key('PEM') return if out is RSAFormat.DER: yield key.export_key('DER') return if out is RSAFormat.BLOB: def le(v: int, s: int): return v.to_bytes(s, 'little') buffer = bytearray() buffer.append(TYPES.PRIVATEKEYBLOB if key.has_private() else TYPES.PUBLICKEYBLOB) buffer.extend(le(2, 3)) buffer.extend(le(ALGORITHMS.CALG_RSA_KEYX, 4)) buffer.extend(B'RSA2' if key.has_private() else B'RSA1') size = 2 while size < key.n.bit_length(): size <<= 1 self.log_info(F'using bit size {size}') buffer.extend(le(size, 4)) size //= 8 buffer.extend(le(key.e, 4)) buffer.extend(le(key.n, size)) if key.has_private(): exp_1 = key.d % (key.p - 1) exp_2 = key.d % (key.q - 1) coeff = pow(key.q, -1, key.p) half = size // 2 buffer.extend(le(key.p, half)) buffer.extend(le(key.q, half)) buffer.extend(le(exp_1, half)) buffer.extend(le(exp_2, half)) buffer.extend(le(coeff, half)) buffer.extend(le(key.d, size)) yield buffer return components = { 'Modulus' : key.n, 'Exponent': key.e, } if key.has_private(): decoded = DerSequence() decoded.decode(key.export_key('DER')) it = itertools.islice(decoded, 3, None) for v in ('D', 'P', 'Q', 'DP', 'DQ', 'InverseQ'): try: components[v] = next(it) except StopIteration: break if out is RSAFormat.XKMS: for tag in components: components[tag] = base64.b64encode(number.long_to_bytes(components[tag])).decode('ascii') tags = '\n'.join(F'\t<{tag}>{value}</{tag}>' for tag, value in components.items()) yield F'<RSAKeyPair>\n{tags}\n</RSAKeyPair>'.encode(self.codec) return components['BitSize'] = key.n.bit_length() for tag, value in components.items(): if value.bit_length() > 32: components[tag] = F'{value:X}' if out is RSAFormat.JSON: yield json.dumps(components, indent=4).encode(self.codec) return if out is RSAFormat.TEXT: table = list(flattened(components)) for key, value in table: value = F'0x{value}' if isinstance(value, str) else str(value) value = '\n'.join(F'{L}' for L in textwrap.wrap(value, 80)) yield F'-- {key + " ":-<77}\n{value!s}'.encode(self.codec)
Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
var console
var reverse
Inherited members