Module refinery.units.formats.pkcs7

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json

from contextlib import suppress
from datetime import datetime

from refinery.units import Unit
from refinery.lib.json import BytesAsArrayEncoder


class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    @Unit.Requires('asn1crypto', 'default', 'extended')
    def _asn1crypto():
        # The submodules are imported explicitly so that they can be accessed
        # as attributes (core, cms, x509) of the returned package object.
        import asn1crypto
        import asn1crypto.cms
        import asn1crypto.core
        import asn1crypto.x509
        return asn1crypto

    def process(self, data: bytes):
        """
        Load `data` as a CMS ContentInfo structure and return it as indented
        JSON text, encoded with the codec of this unit.
        """
        asn1 = self._asn1crypto.core
        cms = self._asn1crypto.cms
        signature = cms.ContentInfo.load(data)

        def unsign(data):
            """
            Recursively post-process a decoded JSON document: Negative integers
            are replaced by a non-negative value, and every integer that does
            not fit into 64 bits is replaced by its big-endian hex string.
            Dictionaries and lists are traversed, all other values are returned
            unchanged.
            """
            if isinstance(data, int):
                if data < 0:
                    # Reinterpret the negative number as an unsigned value that
                    # is one bit wider than the magnitude of the input.
                    data += 1 << (data.bit_length() + 1)
                if data > 0xFFFFFFFF_FFFFFFFF:
                    # The byte count has to be derived from the value that is
                    # actually converted: after the adjustment above it is one
                    # bit longer than the input. Using the old bit length made
                    # to_bytes raise an OverflowError whenever that length was
                    # a multiple of 8.
                    size = (data.bit_length() + 7) // 8
                    data = data.to_bytes(size, 'big').hex()
                return data
            elif isinstance(data, dict):
                return {key: unsign(value) for key, value in data.items()}
            elif isinstance(data, list):
                return [unsign(x) for x in data]
            else:
                return data

        # ASN.1 specifications for the attribute that is registered below under
        # the name 'authenticode_info'.

        class SpcString(asn1.Choice):
            _alternatives = [
                ('unicode', asn1.BMPString, {'implicit': 0}),
                ('ascii', asn1.IA5String, {'implicit': 1})
            ]

        SpcUuid = asn1.OctetString

        class SpcSerializedObject(asn1.Sequence):
            _fields = [
                ('classId', SpcUuid),
                ('serializedData', asn1.OctetString),
            ]

        class SpcLink(asn1.Choice):
            _alternatives = [
                ('url', asn1.IA5String, {'implicit': 0}),
                ('monikier', SpcSerializedObject, {'implicit': 1}),
                ('file', SpcString, {'explicit': 2})
            ]

        class SpcSpOpusInfo(asn1.Sequence):
            _fields = [
                ('programName', SpcString, {'optional': True, 'explicit': 0}),
                ('moreInfo', SpcLink, {'optional': True, 'explicit': 1}),
            ]

        class SetOfInfos(asn1.SetOf):
            _child_spec = SpcSpOpusInfo

        # Register the OID with asn1crypto so that CMS attributes of this type
        # are parsed using the SetOfInfos specification. These assignments
        # modify class-level tables of the asn1crypto.cms module.
        cms.CMSAttributeType._map['1.3.6.1.4.1.311.2.1.12'] = 'authenticode_info'
        cms.CMSAttribute._oid_specs['authenticode_info'] = SetOfInfos

        class ParsedASN1ToJSON(BytesAsArrayEncoder):
            """
            JSON encoder that knows how to serialize parsed asn1crypto objects.
            """
            # Reference to the enclosing unit, used to reach the asn1crypto package.
            unit = self

            @classmethod
            def _is_keyval(cls, obj):
                """
                Return whether `obj` is a dictionary with exactly the keys
                'type' and 'values' where 'values' contains a single element.
                """
                return (
                    isinstance(obj, dict)
                    and set(obj.keys()) == {'type', 'values'}
                    and len(obj['values']) == 1
                )

            @classmethod
            def handled(cls, obj) -> bool:
                return BytesAsArrayEncoder.handled(obj) or cls._is_keyval(obj)

            def encode_bytes(self, obj: bytes):
                # Byte strings that decode to printable text are emitted as
                # strings; everything else is left to the parent encoder.
                with suppress(Exception):
                    string = obj.decode('latin1')
                    if string.isprintable():
                        return string
                return super().encode_bytes(obj)

            def default(self, obj):
                """
                Convert `obj` to something that is JSON serializable. The
                conversions below are attempted in order; a ValueError is
                raised when none of them applies.
                """
                if self._is_keyval(obj):
                    # Collapse the single-element 'values' list to one 'value'.
                    return dict(type=obj['type'], value=obj['values'][0])
                with suppress(TypeError):
                    return super().default(obj)
                if isinstance(obj, (set, tuple)):
                    return list(obj)
                if isinstance(obj, datetime):
                    return str(obj)
                dict_result = {}
                list_result = None
                if isinstance(obj, self.unit._asn1crypto.x509.Certificate):
                    dict_result.update(fingerprint=obj.sha1.hex())
                if isinstance(obj, asn1.BitString):
                    return {'bit_string': obj.native}
                # Objects that can be iterated are converted to a list; if all
                # items are strings, they are treated as keys into the object
                # and a dictionary is produced instead.
                with suppress(Exception):
                    list_result = list(obj)
                    if all(isinstance(k, str) for k in list_result):
                        dict_result.update((key, obj[key]) for key in list_result)
                if dict_result:
                    return dict_result
                if list_result is not None:
                    return list_result
                if isinstance(obj, self.unit._asn1crypto.cms.CertificateChoices):
                    return obj.chosen
                if isinstance(obj, asn1.Sequence):
                    children = obj.children
                    if children:
                        return children
                    return obj.dump()
                with suppress(Exception):
                    return obj.native
                if isinstance(obj, asn1.Any):
                    parsed = None
                    with suppress(Exception):
                        parsed = obj.parse()
                    if parsed:
                        return parsed
                    return obj.dump()
                if isinstance(obj, asn1.Asn1Value):
                    return obj.dump()
                raise ValueError(F'Unable to determine JSON encoding of {obj.__class__.__name__} object.')

        # NOTE(review): the encoder class itself is used as the context manager
        # here; this presumably relies on support in BytesAsArrayEncoder, which
        # is defined outside of this file.
        with ParsedASN1ToJSON as encoder:
            encoded = encoder.dumps(signature)
            # Round-trip through the json module to obtain plain Python
            # containers that unsign can post-process.
            converted = unsign(json.loads(encoded))
            return json.dumps(converted, indent=4).encode(self.codec)

Classes

class pkcs7

Converts PKCS7 encoded data to a JSON representation.

Expand source code Browse git
class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    @Unit.Requires('asn1crypto', 'default', 'extended')
    def _asn1crypto():
        # The submodules are imported explicitly so that they can be accessed
        # as attributes (core, cms, x509) of the returned package object.
        import asn1crypto
        import asn1crypto.cms
        import asn1crypto.core
        import asn1crypto.x509
        return asn1crypto

    def process(self, data: bytes):
        """
        Load `data` as a CMS ContentInfo structure and return it as indented
        JSON text, encoded with the codec of this unit.
        """
        asn1 = self._asn1crypto.core
        cms = self._asn1crypto.cms
        signature = cms.ContentInfo.load(data)

        def unsign(data):
            """
            Recursively post-process a decoded JSON document: Negative integers
            are replaced by a non-negative value, and every integer that does
            not fit into 64 bits is replaced by its big-endian hex string.
            Dictionaries and lists are traversed, all other values are returned
            unchanged.
            """
            if isinstance(data, int):
                if data < 0:
                    # Reinterpret the negative number as an unsigned value that
                    # is one bit wider than the magnitude of the input.
                    data += 1 << (data.bit_length() + 1)
                if data > 0xFFFFFFFF_FFFFFFFF:
                    # The byte count has to be derived from the value that is
                    # actually converted: after the adjustment above it is one
                    # bit longer than the input. Using the old bit length made
                    # to_bytes raise an OverflowError whenever that length was
                    # a multiple of 8.
                    size = (data.bit_length() + 7) // 8
                    data = data.to_bytes(size, 'big').hex()
                return data
            elif isinstance(data, dict):
                return {key: unsign(value) for key, value in data.items()}
            elif isinstance(data, list):
                return [unsign(x) for x in data]
            else:
                return data

        # ASN.1 specifications for the attribute that is registered below under
        # the name 'authenticode_info'.

        class SpcString(asn1.Choice):
            _alternatives = [
                ('unicode', asn1.BMPString, {'implicit': 0}),
                ('ascii', asn1.IA5String, {'implicit': 1})
            ]

        SpcUuid = asn1.OctetString

        class SpcSerializedObject(asn1.Sequence):
            _fields = [
                ('classId', SpcUuid),
                ('serializedData', asn1.OctetString),
            ]

        class SpcLink(asn1.Choice):
            _alternatives = [
                ('url', asn1.IA5String, {'implicit': 0}),
                ('monikier', SpcSerializedObject, {'implicit': 1}),
                ('file', SpcString, {'explicit': 2})
            ]

        class SpcSpOpusInfo(asn1.Sequence):
            _fields = [
                ('programName', SpcString, {'optional': True, 'explicit': 0}),
                ('moreInfo', SpcLink, {'optional': True, 'explicit': 1}),
            ]

        class SetOfInfos(asn1.SetOf):
            _child_spec = SpcSpOpusInfo

        # Register the OID with asn1crypto so that CMS attributes of this type
        # are parsed using the SetOfInfos specification. These assignments
        # modify class-level tables of the asn1crypto.cms module.
        cms.CMSAttributeType._map['1.3.6.1.4.1.311.2.1.12'] = 'authenticode_info'
        cms.CMSAttribute._oid_specs['authenticode_info'] = SetOfInfos

        class ParsedASN1ToJSON(BytesAsArrayEncoder):
            """
            JSON encoder that knows how to serialize parsed asn1crypto objects.
            """
            # Reference to the enclosing unit, used to reach the asn1crypto package.
            unit = self

            @classmethod
            def _is_keyval(cls, obj):
                """
                Return whether `obj` is a dictionary with exactly the keys
                'type' and 'values' where 'values' contains a single element.
                """
                return (
                    isinstance(obj, dict)
                    and set(obj.keys()) == {'type', 'values'}
                    and len(obj['values']) == 1
                )

            @classmethod
            def handled(cls, obj) -> bool:
                return BytesAsArrayEncoder.handled(obj) or cls._is_keyval(obj)

            def encode_bytes(self, obj: bytes):
                # Byte strings that decode to printable text are emitted as
                # strings; everything else is left to the parent encoder.
                with suppress(Exception):
                    string = obj.decode('latin1')
                    if string.isprintable():
                        return string
                return super().encode_bytes(obj)

            def default(self, obj):
                """
                Convert `obj` to something that is JSON serializable. The
                conversions below are attempted in order; a ValueError is
                raised when none of them applies.
                """
                if self._is_keyval(obj):
                    # Collapse the single-element 'values' list to one 'value'.
                    return dict(type=obj['type'], value=obj['values'][0])
                with suppress(TypeError):
                    return super().default(obj)
                if isinstance(obj, (set, tuple)):
                    return list(obj)
                if isinstance(obj, datetime):
                    return str(obj)
                dict_result = {}
                list_result = None
                if isinstance(obj, self.unit._asn1crypto.x509.Certificate):
                    dict_result.update(fingerprint=obj.sha1.hex())
                if isinstance(obj, asn1.BitString):
                    return {'bit_string': obj.native}
                # Objects that can be iterated are converted to a list; if all
                # items are strings, they are treated as keys into the object
                # and a dictionary is produced instead.
                with suppress(Exception):
                    list_result = list(obj)
                    if all(isinstance(k, str) for k in list_result):
                        dict_result.update((key, obj[key]) for key in list_result)
                if dict_result:
                    return dict_result
                if list_result is not None:
                    return list_result
                if isinstance(obj, self.unit._asn1crypto.cms.CertificateChoices):
                    return obj.chosen
                if isinstance(obj, asn1.Sequence):
                    children = obj.children
                    if children:
                        return children
                    return obj.dump()
                with suppress(Exception):
                    return obj.native
                if isinstance(obj, asn1.Any):
                    parsed = None
                    with suppress(Exception):
                        parsed = obj.parse()
                    if parsed:
                        return parsed
                    return obj.dump()
                if isinstance(obj, asn1.Asn1Value):
                    return obj.dump()
                raise ValueError(F'Unable to determine JSON encoding of {obj.__class__.__name__} object.')

        # NOTE(review): the encoder class itself is used as the context manager
        # here; this presumably relies on support in BytesAsArrayEncoder, which
        # is defined outside of this file.
        with ParsedASN1ToJSON as encoder:
            encoded = encoder.dumps(signature)
            # Round-trip through the json module to obtain plain Python
            # containers that unsign can post-process.
            converted = unsign(json.loads(encoded))
            return json.dumps(converted, indent=4).encode(self.codec)

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members