Module refinery.units.crypto.cipher.fernet
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from refinery.units import Unit, Arg
from refinery.units.crypto.cipher.aes import aes
from refinery.units.encoding.b64 import b64
from refinery.lib.structures import StructReader
from Cryptodome.Hash import HMAC, SHA256
class fernet(Unit):
"""
Decrypt Fernet messages.
"""
def __init__(self, key: Arg(help='A fernet key, either in base64 or raw binary.')):
super().__init__(key=key)
def _b64(self, data):
try:
return data | b64(urlsafe=True) | bytearray
except Exception:
return data
def process(self, data):
fk = self._b64(self.args.key)
if len(fk) != 32:
raise ValueError(F'The given Fernet key has length {len(fk)}, expected 32 bytes.')
signing_key = fk[:16]
encryption_key = fk[16:]
decoded = self._b64(data)
reader = StructReader(memoryview(decoded), bigendian=True)
signed_data = reader.peek(reader.remaining_bytes - 32)
version = reader.u8()
timestamp = datetime.fromtimestamp(reader.u64())
iv = reader.read(16)
if version != 0x80:
self.log_warn(F'The Fernet version is 0x{version:02X}, the only documented one is 0x80.')
ciphertext = reader.read(reader.remaining_bytes - 32)
if len(ciphertext) % 16 != 0:
raise ValueError('The encoded ciphertext is not 16-byte block aligned.')
signature = reader.read(32)
hmac = HMAC.new(signing_key, digestmod=SHA256)
hmac.update(signed_data)
if hmac.digest() != signature:
self.log_warn('HMAC verification failed; the message has been tampered with.')
self.log_info(F'computed signature: {hmac.hexdigest().upper()}')
self.log_info(F'provided signature: {signature.hex().upper()}')
plaintext = ciphertext | aes(mode='cbc', iv=iv, key=encryption_key) | bytearray
return self.labelled(plaintext, timestamp=timestamp.isoformat(' ', 'seconds'))
Classes
class fernet (key)
-
Decrypt Fernet messages.
Expand source code Browse git
class fernet(Unit): """ Decrypt Fernet messages. """ def __init__(self, key: Arg(help='A fernet key, either in base64 or raw binary.')): super().__init__(key=key) def _b64(self, data): try: return data | b64(urlsafe=True) | bytearray except Exception: return data def process(self, data): fk = self._b64(self.args.key) if len(fk) != 32: raise ValueError(F'The given Fernet key has length {len(fk)}, expected 32 bytes.') signing_key = fk[:16] encryption_key = fk[16:] decoded = self._b64(data) reader = StructReader(memoryview(decoded), bigendian=True) signed_data = reader.peek(reader.remaining_bytes - 32) version = reader.u8() timestamp = datetime.fromtimestamp(reader.u64()) iv = reader.read(16) if version != 0x80: self.log_warn(F'The Fernet version is 0x{version:02X}, the only documented one is 0x80.') ciphertext = reader.read(reader.remaining_bytes - 32) if len(ciphertext) % 16 != 0: raise ValueError('The encoded ciphertext is not 16-byte block aligned.') signature = reader.read(32) hmac = HMAC.new(signing_key, digestmod=SHA256) hmac.update(signed_data) if hmac.digest() != signature: self.log_warn('HMAC verification failed; the message has been tampered with.') self.log_info(F'computed signature: {hmac.hexdigest().upper()}') self.log_info(F'provided signature: {signature.hex().upper()}') plaintext = ciphertext | aes(mode='cbc', iv=iv, key=encryption_key) | bytearray return self.labelled(plaintext, timestamp=timestamp.isoformat(' ', 'seconds'))
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members