Module refinery.units.formats.pkcs7

Expand source code Browse git
from __future__ import annotations

from contextlib import suppress

from refinery.lib import json
from refinery.lib.tools import convert
from refinery.units import Unit


class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    @Unit.Requires('asn1crypto', ['default', 'extended'])
    def _asn1crypto():
        import asn1crypto
        import asn1crypto.cms
        import asn1crypto.core
        import asn1crypto.x509
        return asn1crypto

    def process(self, data):
        asn1 = self._asn1crypto.core
        cms = self._asn1crypto.cms
        x509 = self._asn1crypto.x509
        signature = cms.ContentInfo.load(convert(data, bytes))

        def unsign(data):
            if isinstance(data, int):
                size = data.bit_length()
                if data < 0:
                    data = (1 << (size + 1)) - ~data - 1
                if data > 0xFFFFFFFF_FFFFFFFF:
                    size, r = divmod(size, 8)
                    size += bool(r)
                    data = data.to_bytes(size, 'big').hex()
                return data
            elif isinstance(data, dict):
                for key in list(data):
                    data[key] = unsign(data[key])
            elif isinstance(data, list):
                return [unsign(x) for x in data]
            else:
                return data

        class SpcString(asn1.Choice):
            _alternatives = [
                ('unicode', asn1.BMPString, {'implicit': 0}),
                ('ascii', asn1.IA5String, {'implicit': 1})
            ]

        SpcUuid = asn1.OctetString

        class SpcSerializedObject(asn1.Sequence):
            _fields = [
                ('classId', SpcUuid),
                ('serializedData', asn1.OctetString),
            ]

        class SpcLink(asn1.Choice):
            _alternatives = [
                ('url', asn1.IA5String, {'implicit': 0}),
                ('monikier', SpcSerializedObject, {'implicit': 1}),
                ('file', SpcString, {'explicit': 2})
            ]

        class SpcSpOpusInfo(asn1.Sequence):
            _fields = [
                ('programName', SpcString, {'optional': True, 'explicit': 0}),
                ('moreInfo', SpcLink, {'optional': True, 'explicit': 1}),
            ]

        class SetOfInfos(asn1.SetOf):
            _child_spec = SpcSpOpusInfo

        cms.CMSAttributeType._map['1.3.6.1.4.1.311.2.1.12'] = 'authenticode_info'
        cms.CMSAttribute._oid_specs['authenticode_info'] = SetOfInfos

        def max64(v):
            if isinstance(v, int) and v.bit_length() > 64:
                return hex(v)
            return v

        def tojson(obj):
            if isinstance(obj, dict) and set(obj.keys()) == {'type', 'values'} and len(obj['values']) == 1:
                obj['value'] = obj.pop('values')[0]
                return obj

            dict_result = {}
            list_result = None

            if isinstance(obj, x509.Certificate):
                dict_result.update(fingerprint=obj.sha1.hex())
            if isinstance(obj, asn1.BitString):
                return {'bit_string': obj.native}
            with suppress(Exception):
                list_result = list(obj)
                if all(isinstance(k, str) for k in list_result):
                    dict_result.update((key, obj[key]) for key in list_result)
            if dict_result:
                return json.preprocess(dict_result)
            if list_result is not None:
                return json.preprocess(list_result)

            if isinstance(obj, cms.CertificateChoices):
                out = obj.chosen
            elif isinstance(obj, asn1.Sequence):
                if children := obj.children:
                    return children
                out = obj.dump()
            else:
                try:
                    out = getattr(obj, 'native')
                except Exception:
                    if isinstance(obj, asn1.Any):
                        parsed = None
                        with suppress(Exception):
                            parsed = obj.parse()
                        out = parsed or obj.dump()
                    elif isinstance(obj, asn1.Asn1Value):
                        out = obj.dump()
                    else:
                        return json.bytes_as_array(obj)
                else:
                    out = json.preprocess(out)

            return out

        return json.dumps(unsign(signature), tojson=tojson, pretty=False)

Classes

class pkcs7

Converts PKCS7 encoded data to a JSON representation.

Expand source code Browse git
class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    @Unit.Requires('asn1crypto', ['default', 'extended'])
    def _asn1crypto():
        import asn1crypto
        import asn1crypto.cms
        import asn1crypto.core
        import asn1crypto.x509
        return asn1crypto

    def process(self, data):
        asn1 = self._asn1crypto.core
        cms = self._asn1crypto.cms
        x509 = self._asn1crypto.x509
        signature = cms.ContentInfo.load(convert(data, bytes))

        def unsign(data):
            if isinstance(data, int):
                size = data.bit_length()
                if data < 0:
                    data = (1 << (size + 1)) - ~data - 1
                if data > 0xFFFFFFFF_FFFFFFFF:
                    size, r = divmod(size, 8)
                    size += bool(r)
                    data = data.to_bytes(size, 'big').hex()
                return data
            elif isinstance(data, dict):
                for key in list(data):
                    data[key] = unsign(data[key])
            elif isinstance(data, list):
                return [unsign(x) for x in data]
            else:
                return data

        class SpcString(asn1.Choice):
            _alternatives = [
                ('unicode', asn1.BMPString, {'implicit': 0}),
                ('ascii', asn1.IA5String, {'implicit': 1})
            ]

        SpcUuid = asn1.OctetString

        class SpcSerializedObject(asn1.Sequence):
            _fields = [
                ('classId', SpcUuid),
                ('serializedData', asn1.OctetString),
            ]

        class SpcLink(asn1.Choice):
            _alternatives = [
                ('url', asn1.IA5String, {'implicit': 0}),
                ('monikier', SpcSerializedObject, {'implicit': 1}),
                ('file', SpcString, {'explicit': 2})
            ]

        class SpcSpOpusInfo(asn1.Sequence):
            _fields = [
                ('programName', SpcString, {'optional': True, 'explicit': 0}),
                ('moreInfo', SpcLink, {'optional': True, 'explicit': 1}),
            ]

        class SetOfInfos(asn1.SetOf):
            _child_spec = SpcSpOpusInfo

        cms.CMSAttributeType._map['1.3.6.1.4.1.311.2.1.12'] = 'authenticode_info'
        cms.CMSAttribute._oid_specs['authenticode_info'] = SetOfInfos

        def max64(v):
            if isinstance(v, int) and v.bit_length() > 64:
                return hex(v)
            return v

        def tojson(obj):
            if isinstance(obj, dict) and set(obj.keys()) == {'type', 'values'} and len(obj['values']) == 1:
                obj['value'] = obj.pop('values')[0]
                return obj

            dict_result = {}
            list_result = None

            if isinstance(obj, x509.Certificate):
                dict_result.update(fingerprint=obj.sha1.hex())
            if isinstance(obj, asn1.BitString):
                return {'bit_string': obj.native}
            with suppress(Exception):
                list_result = list(obj)
                if all(isinstance(k, str) for k in list_result):
                    dict_result.update((key, obj[key]) for key in list_result)
            if dict_result:
                return json.preprocess(dict_result)
            if list_result is not None:
                return json.preprocess(list_result)

            if isinstance(obj, cms.CertificateChoices):
                out = obj.chosen
            elif isinstance(obj, asn1.Sequence):
                if children := obj.children:
                    return children
                out = obj.dump()
            else:
                try:
                    out = getattr(obj, 'native')
                except Exception:
                    if isinstance(obj, asn1.Any):
                        parsed = None
                        with suppress(Exception):
                            parsed = obj.parse()
                        out = parsed or obj.dump()
                    elif isinstance(obj, asn1.Asn1Value):
                        out = obj.dump()
                    else:
                        return json.bytes_as_array(obj)
                else:
                    out = json.preprocess(out)

            return out

        return json.dumps(unsign(signature), tojson=tojson, pretty=False)

Ancestors

Subclasses

Class variables

var required_dependencies
var console
var reverse
var optional_dependencies

Inherited members