Module refinery.units.formats.pkcs7

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asn1crypto
import asn1crypto.cms
import asn1crypto.core
import asn1crypto.x509

from ...lib.json import BytesAsArrayEncoder

from contextlib import suppress
from datetime import datetime

from .. import Unit


class ParsedASN1ToJSON(BytesAsArrayEncoder):
    """
    JSON encoder for parsed ASN.1 objects, built on top of BytesAsArrayEncoder.
    Objects the base encoder cannot convert are handled on a best-effort basis;
    see `ParsedASN1ToJSON.default` for the order in which strategies are tried.
    """

    @classmethod
    def _is_keyval(cls, obj):
        """
        Return whether `obj` is a dictionary with exactly the two keys 'type'
        and 'values' whose 'values' entry has exactly one element.
        """
        return (
            isinstance(obj, dict)
            and set(obj.keys()) == {'type', 'values'}
            and len(obj['values']) == 1
        )

    @classmethod
    def handled(cls, obj) -> bool:
        """
        Return whether `obj` is either handled by BytesAsArrayEncoder or is a
        single-valued type/values dictionary as recognized by `_is_keyval`.
        """
        return (
            BytesAsArrayEncoder.handled(obj)
            or cls._is_keyval(obj)
        )

    def default(self, obj):
        """
        Convert `obj` to something JSON-serializable. The following strategies
        are attempted in order, and the first one that succeeds wins:

        1. A single-valued type/values dictionary is flattened to a dictionary
           with the keys 'type' and 'value'.
        2. The base class conversion, unless it raises a TypeError.
        3. Sets and tuples become lists; datetime objects become strings.
        4. If iterating the object yields only strings, these are used as keys
           to build a dictionary via `obj[key]`. Certificates additionally
           receive a 'fingerprint' entry.
        5. Any other iterable becomes a list of its items.
        6. CertificateChoices objects are re-parsed as x509 certificates.
        7. The `native` attribute of the object, if it can be obtained.
        8. The output of `dump()` for any remaining Asn1Value.

        Raises:
            ValueError: if none of the above strategies applies.
        """
        if self._is_keyval(obj):
            return dict(type=obj['type'], value=obj['values'][0])
        with suppress(TypeError):
            return super().default(obj)
        if isinstance(obj, (set, tuple)):
            return list(obj)
        if isinstance(obj, datetime):
            return str(obj)
        dictionary_result = {}
        if isinstance(obj, asn1crypto.x509.Certificate):
            dictionary_result.update(fingerprint=obj.sha1.hex())
        # Materialize the object only once: a second pass over a single-pass
        # iterable would come back empty, and for everything else it would
        # merely repeat the same work.
        try:
            items = list(obj)
        except Exception:
            items = None
        if items is not None and all(isinstance(k, str) for k in items):
            # Best effort: a failing lookup aborts the update, and the list
            # fallback below is used when no entry was collected at all.
            with suppress(Exception):
                dictionary_result.update((key, obj[key]) for key in items)
        if dictionary_result:
            return dictionary_result
        if items is not None:
            return items
        if isinstance(obj, asn1crypto.cms.CertificateChoices):
            return asn1crypto.x509.Certificate.load(obj.dump())
        with suppress(AttributeError, ValueError):
            return obj.native
        # asn1crypto.core.Any derives from Asn1Value, so this single check
        # also covers the Any case.
        if isinstance(obj, asn1crypto.core.Asn1Value):
            return obj.dump()
        raise ValueError(F'Unable to determine JSON encoding of {obj.__class__.__name__} object.')


class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    def process(self, data: bytes):
        # Parse the input as a CMS ContentInfo structure before the encoder
        # context is entered.
        content_info = asn1crypto.cms.ContentInfo.load(data)
        # The encoder class itself is used as the context manager; the object
        # it binds provides dumps(), whose result is then encoded using the
        # codec of this unit.
        with ParsedASN1ToJSON as json_encoder:
            serialized = json_encoder.dumps(content_info)
            return serialized.encode(self.codec)

Classes

class ParsedASN1ToJSON (*args, **kwargs)

This JSON Encoder encodes byte strings as arrays of integers.

Constructor for JSONEncoder, with sensible defaults.

If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.

If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.

If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an OverflowError). Otherwise, no such check takes place.

If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.

If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.

If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.

If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is None and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.

If specified, default is a function that gets called for objects that can't otherwise be serialized. It should return a JSON encodable version of the object or raise a TypeError.

Expand source code Browse git
class ParsedASN1ToJSON(BytesAsArrayEncoder):
    """
    JSON encoder for parsed ASN.1 objects, built on top of BytesAsArrayEncoder.
    Objects the base encoder cannot convert are handled on a best-effort basis;
    see `ParsedASN1ToJSON.default` for the order in which strategies are tried.
    """

    @classmethod
    def _is_keyval(cls, obj):
        """
        Return whether `obj` is a dictionary with exactly the two keys 'type'
        and 'values' whose 'values' entry has exactly one element.
        """
        return (
            isinstance(obj, dict)
            and set(obj.keys()) == {'type', 'values'}
            and len(obj['values']) == 1
        )

    @classmethod
    def handled(cls, obj) -> bool:
        """
        Return whether `obj` is either handled by BytesAsArrayEncoder or is a
        single-valued type/values dictionary as recognized by `_is_keyval`.
        """
        return (
            BytesAsArrayEncoder.handled(obj)
            or cls._is_keyval(obj)
        )

    def default(self, obj):
        """
        Convert `obj` to something JSON-serializable. The following strategies
        are attempted in order, and the first one that succeeds wins:

        1. A single-valued type/values dictionary is flattened to a dictionary
           with the keys 'type' and 'value'.
        2. The base class conversion, unless it raises a TypeError.
        3. Sets and tuples become lists; datetime objects become strings.
        4. If iterating the object yields only strings, these are used as keys
           to build a dictionary via `obj[key]`. Certificates additionally
           receive a 'fingerprint' entry.
        5. Any other iterable becomes a list of its items.
        6. CertificateChoices objects are re-parsed as x509 certificates.
        7. The `native` attribute of the object, if it can be obtained.
        8. The output of `dump()` for any remaining Asn1Value.

        Raises:
            ValueError: if none of the above strategies applies.
        """
        if self._is_keyval(obj):
            return dict(type=obj['type'], value=obj['values'][0])
        with suppress(TypeError):
            return super().default(obj)
        if isinstance(obj, (set, tuple)):
            return list(obj)
        if isinstance(obj, datetime):
            return str(obj)
        dictionary_result = {}
        if isinstance(obj, asn1crypto.x509.Certificate):
            dictionary_result.update(fingerprint=obj.sha1.hex())
        # Materialize the object only once: a second pass over a single-pass
        # iterable would come back empty, and for everything else it would
        # merely repeat the same work.
        try:
            items = list(obj)
        except Exception:
            items = None
        if items is not None and all(isinstance(k, str) for k in items):
            # Best effort: a failing lookup aborts the update, and the list
            # fallback below is used when no entry was collected at all.
            with suppress(Exception):
                dictionary_result.update((key, obj[key]) for key in items)
        if dictionary_result:
            return dictionary_result
        if items is not None:
            return items
        if isinstance(obj, asn1crypto.cms.CertificateChoices):
            return asn1crypto.x509.Certificate.load(obj.dump())
        with suppress(AttributeError, ValueError):
            return obj.native
        # asn1crypto.core.Any derives from Asn1Value, so this single check
        # also covers the Any case.
        if isinstance(obj, asn1crypto.core.Asn1Value):
            return obj.dump()
        raise ValueError(F'Unable to determine JSON encoding of {obj.__class__.__name__} object.')

Ancestors

Inherited members

class pkcs7

Converts PKCS7 encoded data to a JSON representation.

Expand source code Browse git
class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    def process(self, data: bytes):
        # Parse the input as a CMS ContentInfo structure before the encoder
        # context is entered.
        content_info = asn1crypto.cms.ContentInfo.load(data)
        # The encoder class itself is used as the context manager; the object
        # it binds provides dumps(), whose result is then encoded using the
        # codec of this unit.
        with ParsedASN1ToJSON as json_encoder:
            serialized = json_encoder.dumps(content_info)
            return serialized.encode(self.codec)

Ancestors

Inherited members