Completed
Push — master ( aba28f...22db48 )
by Andrea
01:05
created

MetadataParser.get_certstats()   A

Complexity

Conditions 3

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 15
rs 9.4285
cc 3
1
#################################################################
2
# MET v2 Metadata Explorer Tool
3
#
4
# This Software is Open Source. See License: https://github.com/TERENA/met/blob/master/LICENSE.md
5
# Copyright (c) 2012, TERENA All rights reserved.
6
#
7
# This Software is based on MET v1 developed for TERENA by Yaco Sistemas, http://www.yaco.es/
8
# MET v2 was developed for TERENA by Tamim Ziai, DAASI International GmbH, http://www.daasi.de
9
# Current version of MET has been revised for performance improvements by Andrea Biancini,
10
# Consortium GARR, http://www.garr.it
11
#########################################################################################
12
13
from lxml import etree
14
from cryptography import x509
15
from cryptography.hazmat.backends import default_backend
16
import simplejson as json
17
18
# Prefix -> namespace URI map passed as ``namespaces=`` to the XPath queries
# in this module.
NAMESPACES = {
    'xml': 'http://www.w3.org/XML/1998/namespace',
    # This entry used to hold the pasted attribute fragment
    # 'xs="http://www.w3.org/2001/XMLSchema', which is not the namespace URI
    # and therefore could never match an element.
    'xs': 'http://www.w3.org/2001/XMLSchema',
    'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
    'md': 'urn:oasis:names:tc:SAML:2.0:metadata',
    'mdui': 'urn:oasis:names:tc:SAML:metadata:ui',
    'ds': 'http://www.w3.org/2000/09/xmldsig#',
    'saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
    'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
    'mdrpi': 'urn:oasis:names:tc:SAML:metadata:rpi',
    'shibmd': 'urn:mace:shibboleth:metadata:1.0',
    'mdattr': 'urn:oasis:names:tc:SAML:metadata:attribute',
}

# Default namespace used by addns() / delns().
SAML_METADATA_NAMESPACE = NAMESPACES['md']

XML_NAMESPACE = NAMESPACES['xml']
XMLDSIG_NAMESPACE = NAMESPACES['ds']
MDUI_NAMESPACE = NAMESPACES['mdui']

# Role descriptor element names recognised by MetadataParser.entity_types().
DESCRIPTOR_TYPES = ('IDPSSODescriptor', 'SPSSODescriptor', 'AASSODescriptor')

# Descriptor name -> short label with the 'SSODescriptor' suffix removed.
DESCRIPTOR_TYPES_DISPLAY = dict(
    (descriptor, descriptor.replace('SSODescriptor', ''))
    for descriptor in DESCRIPTOR_TYPES
)

# Descriptor names prefixed with 'md:' so they can be used in XPath queries.
DESCRIPTOR_TYPES_UTIL = ["md:%s" % descriptor for descriptor in DESCRIPTOR_TYPES]
44
45
46
def addns(node_name, namespace=SAML_METADATA_NAMESPACE):
    '''Return a node name qualified with the XML namespace.

    The result has the ``{namespace}node_name`` form. ``namespace`` defaults
    to SAML_METADATA_NAMESPACE.
    '''
    return '{' + namespace + '}' + node_name
49
50
51
def delns(node, namespace=SAML_METADATA_NAMESPACE):
    '''Return ``node`` with every ``{namespace}`` qualifier removed.

    ``node`` is a string (it is processed with ``str.replace``); this is the
    inverse of addns(). ``namespace`` defaults to SAML_METADATA_NAMESPACE.
    '''
    qualifier = '{' + namespace + '}'
    return node.replace(qualifier, '')
53
54
55
def getlang(node):
    '''Return the language declared on ``node``, or None when there is none.

    A plain ``lang`` attribute takes precedence; otherwise the
    namespace-qualified ``xml:lang`` attribute is used.
    '''
    attributes = node.attrib
    if 'lang' in attributes:
        return attributes['lang']

    qualified_lang = addns('lang', NAMESPACES['xml'])
    if qualified_lang in attributes:
        return attributes[qualified_lang]

    return None
60
61
# Namespace-qualified root tags; MetadataParser compares the document root
# against FEDERATION_ROOT_TAG to tell a federation file from a single entity.
FEDERATION_ROOT_TAG = addns('EntitiesDescriptor')
ENTITY_ROOT_TAG = addns('EntityDescriptor')
63
64
65
class MetadataParser(object):
    '''Parse a SAML metadata file and expose its content as plain dicts/lists.

    The whole document is parsed once in the constructor (``rootelem``);
    per-entity lookups re-read the file incrementally with ``iterparse``.
    '''

    def __init__(self, filename=None):
        '''Load ``filename`` and record whether it is a federation or an entity.

        Raises ValueError when ``filename`` is None.
        '''
        if filename is None:
            raise ValueError('filename is required')

        self.filename = filename
        # Read bytes so the parser honours the document's own encoding
        # declaration (lxml rejects unicode input that carries one).
        # Line breaks are turned into spaces rather than deleted: deleting
        # them can fuse two tokens that were separated only by a newline.
        with open(filename, 'rb') as metadata_file:
            data = metadata_file.read().replace(b'\n', b' ')
        self.rootelem = etree.fromstring(data)
        self.file_id = self.rootelem.get('ID', None)
        self.is_federation = self.rootelem.tag == FEDERATION_ROOT_TAG
        self.is_entity = not self.is_federation

    @staticmethod
    def _get_entity_details(element):
        '''Return the detail fields of an EntityDescriptor element as a dict.'''
        return {
            'xml': etree.tostring(element, pretty_print=True),
            'description': MetadataParser.entity_description(element),
            'infoUrl': MetadataParser.entity_information_url(element),
            'privacyUrl': MetadataParser.entity_privacy_url(element),
            'organization': MetadataParser.entity_organization(element),
            'logos': MetadataParser.entity_logos(element),
            'scopes': MetadataParser.entity_attribute_scope(element),
            'attr_requested': MetadataParser.entity_requested_attributes(element),
            'contacts': MetadataParser.entity_contacts(element),
            'registration_policy': MetadataParser.registration_policy(element),
            'certstats': MetadataParser.get_certstats(element),
        }

    @staticmethod
    def _entity_lang_seen(entity):
        '''Return the set of language codes used by the localized fields.'''
        languages = set()
        for key in ('description', 'infoUrl', 'privacyUrl', 'organization', 'displayName'):
            localized = entity.get(key)
            if localized:
                languages.update(localized.keys())

        return languages

    @staticmethod
    def _release_element(element):
        '''Free an element handled during iterparse, plus its earlier siblings.'''
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    @staticmethod
    def _get_entity_by_id(context, entityid, details):
        '''Yield a dict for every element in ``context`` whose entityID matches.

        ``context`` yields ``(event, element)`` pairs. When ``details`` is
        true the detail fields are merged in and every falsy value is dropped
        from the result.
        '''
        for _, element in context:
            if element.attrib['entityID'] == entityid:
                entity = {}

                entity['entityid'] = entityid
                entity['file_id'] = element.get('ID', None)
                entity['displayName'] = MetadataParser.entity_displayname(element)
                reg_info = MetadataParser.registration_information(element)
                if reg_info and 'authority' in reg_info:
                    entity['registration_authority'] = reg_info['authority']
                if reg_info and 'instant' in reg_info:
                    entity['registration_instant'] = reg_info['instant']
                entity['entity_categories'] = MetadataParser.entity_categories(element)
                entity['entity_types'] = MetadataParser.entity_types(element)
                entity['protocols'] = MetadataParser.entity_protocols(element, entity['entity_types'])

                if details:
                    entity.update(MetadataParser._get_entity_details(element))
                    # items() instead of iteritems() so this runs on Python 2 and 3.
                    entity = dict((key, value) for key, value in entity.items() if value)

                entity['languages'] = MetadataParser._entity_lang_seen(entity)
                yield entity

            MetadataParser._release_element(element)

    def get_federation(self):
        '''Return the ``ID`` and ``Name`` attributes of the federation root.

        Asserts that the parsed file is a federation.
        '''
        assert self.is_federation

        return {
            'ID': self.rootelem.get('ID', None),
            'Name': self.rootelem.get('Name', None),
        }

    @staticmethod
    def _chunkstring(string, length):
        '''Return a generator over consecutive ``length``-sized slices of ``string``.'''
        return (string[start:start + length] for start in range(0, len(string), length))

    def _entity_context(self):
        '''Return an iterparse context over the file's EntityDescriptor elements.'''
        return etree.iterparse(self.filename, tag=ENTITY_ROOT_TAG, events=('end',),
                               huge_tree=True, remove_blank_text=True)

    def get_entity(self, entityid, details=True):
        '''Return the dict describing the first entity whose entityID matches.

        Raises ValueError when the file contains no such entity.
        '''
        context = self._entity_context()
        for entity in MetadataParser._get_entity_by_id(context, entityid, details):
            return entity

        raise ValueError("Entity not found: %s" % entityid)

    def entity_exist(self, entityid):
        '''Return True when the parsed document contains ``entityid``.'''
        # The id is passed as an XPath variable rather than formatted into
        # the expression, so quotes inside it cannot break or alter the query.
        matches = self.rootelem.xpath("//md:EntityDescriptor[@entityID=$entityid]",
                                      namespaces=NAMESPACES, entityid=entityid)
        return len(matches) > 0

    @staticmethod
    def _get_entities_id(context):
        '''Yield the entityID of each element in ``context``, freeing it afterwards.'''
        for _, element in context:
            yield element.attrib['entityID']
            MetadataParser._release_element(element)

    def get_entities(self):
        '''Return the list of entityIDs found in the file.'''
        return list(self._get_entities_id(self._entity_context()))

    @staticmethod
    def entity_types(entity):
        '''Return the descriptor element names found directly under ``entity``.

        Falls back to ``['AASSODescriptor']`` when none is present.
        '''
        expression = "|".join(DESCRIPTOR_TYPES_UTIL)
        elements = entity.xpath(expression, namespaces=NAMESPACES)
        types = [element.tag.split("}")[1] for element in elements]
        if not types:
            types = ['AASSODescriptor']
        return types

    @staticmethod
    def entity_categories(entity):
        '''Return the stripped entity-category / entity-category-support values.'''
        elements = entity.xpath(".//mdattr:EntityAttributes"
                                "//saml:Attribute[@Name='http://macedir.org/entity-category-support' or @Name='http://macedir.org/entity-category']"
                                "//saml:AttributeValue",
                                namespaces=NAMESPACES)
        # An empty AttributeValue has text None; skip it instead of failing.
        return [node.text.strip() for node in elements if node.text]

    @staticmethod
    def entity_protocols(entity, entity_types):
        '''Return the protocolSupportEnumeration values of the first entity type.

        ``IDPSSODescriptor`` is used when ``entity_types`` is not a non-empty
        list. Returns an empty list when the attribute is not found.
        '''
        if isinstance(entity_types, list) and len(entity_types) > 0:
            e_type = entity_types[0]
        else:
            e_type = 'IDPSSODescriptor'

        raw_protocols = entity.xpath(".//md:%s"
                                     "/@protocolSupportEnumeration" % e_type,
                                     namespaces=NAMESPACES)
        if raw_protocols:
            # split() without argument: repeated whitespace between the
            # values must not produce empty entries.
            return raw_protocols[0].split()

        return []

    @staticmethod
    def get_certstats(element):
        '''Return a JSON object counting certificates per signature hash name.

        Every ``ds:X509Certificate`` below ``element`` is re-wrapped as PEM
        and loaded; errors raised by the loader for malformed certificates
        propagate to the caller. Empty certificate nodes and certificates
        reporting no signature hash algorithm are not counted.
        '''
        hashes = {}

        for cert_node in element.xpath(".//ds:X509Certificate", namespaces=NAMESPACES):
            if not cert_node.text:
                continue

            # Drop all whitespace, then re-wrap the base64 body at 64 columns.
            body = "".join(cert_node.text.split())
            if not body:
                continue
            pem_lines = ["-----BEGIN CERTIFICATE-----"]
            pem_lines.extend(MetadataParser._chunkstring(body, 64))
            pem_lines.append("-----END CERTIFICATE-----")
            # The loader expects bytes; PEM text is plain ASCII.
            pem = "\n".join(pem_lines).encode('ascii')

            cert = x509.load_pem_x509_certificate(pem, default_backend())
            algorithm = cert.signature_hash_algorithm
            if algorithm is None:
                continue
            hashes[algorithm.name] = hashes.get(algorithm.name, 0) + 1

        return json.dumps(hashes)

    @staticmethod
    def _localized_text(nodes):
        '''Map each node's language to its text, ignoring nodes without one.

        When several nodes share a language the last one wins.
        '''
        languages = {}
        for node in nodes:
            lang = getlang(node)
            if lang is not None:
                languages[lang] = node.text
        return languages

    @staticmethod
    def _uiinfo_text(entity, tag):
        '''Return language -> text for the ``mdui:<tag>`` nodes under mdui:UIInfo.'''
        nodes = entity.xpath(".//mdui:UIInfo//mdui:%s" % tag, namespaces=NAMESPACES)
        return MetadataParser._localized_text(nodes)

    @staticmethod
    def entity_displayname(entity):
        '''Return language -> text for the entity's mdui:DisplayName nodes.'''
        return MetadataParser._uiinfo_text(entity, 'DisplayName')

    @staticmethod
    def entity_description(entity):
        '''Return language -> text for the entity's mdui:Description nodes.'''
        return MetadataParser._uiinfo_text(entity, 'Description')

    @staticmethod
    def entity_information_url(entity):
        '''Return language -> text for the entity's mdui:InformationURL nodes.'''
        return MetadataParser._uiinfo_text(entity, 'InformationURL')

    @staticmethod
    def entity_privacy_url(entity):
        '''Return language -> text for the entity's mdui:PrivacyStatementURL nodes.'''
        return MetadataParser._uiinfo_text(entity, 'PrivacyStatementURL')

    @staticmethod
    def entity_organization(entity):
        '''Return language -> {'name', 'displayName', 'URL'} for md:Organization.

        Only the keys actually present for a language are set; nodes without
        a language are ignored.
        '''
        fields = (
            ('name', 'OrganizationName'),
            ('displayName', 'OrganizationDisplayName'),
            ('URL', 'OrganizationURL'),
        )
        languages = {}
        for org_node in entity.xpath(".//md:Organization", namespaces=NAMESPACES):
            for attr, node_name in fields:
                for node in org_node.findall(addns(node_name)):
                    lang = getlang(node)
                    if lang is None:
                        continue
                    languages.setdefault(lang, {})[attr] = node.text

        return languages

    @staticmethod
    def entity_logos(entity):
        '''Return one dict (width, height, file, lang) per mdui:Logo with text.

        ``width`` and ``height`` default to 0 when the attribute is absent.
        '''
        logo_nodes = entity.xpath(".//mdui:UIInfo"
                                  "//mdui:Logo",
                                  namespaces=NAMESPACES)
        logos = []
        for logo_node in logo_nodes:
            if logo_node.text is None:
                continue  # the file attribute is required
            logos.append({
                'width': int(logo_node.attrib.get('width', '0')),
                'height': int(logo_node.attrib.get('height', '0')),
                'file': logo_node.text,
                'lang': getlang(logo_node),
            })
        return logos

    @staticmethod
    def registration_information(entity):
        '''Return authority/instant of the first mdrpi:RegistrationInfo node.

        Returns an empty dict when the entity has no such node.
        '''
        reg_info = entity.xpath(".//md:Extensions"
                                "//mdrpi:RegistrationInfo",
                                namespaces=NAMESPACES)
        info = {}
        if reg_info:
            info['authority'] = reg_info[0].attrib.get('registrationAuthority')
            info['instant'] = reg_info[0].attrib.get('registrationInstant')
        return info

    @staticmethod
    def registration_policy(entity):
        '''Return language -> text for the mdrpi:RegistrationPolicy nodes.'''
        reg_policy = entity.xpath(".//md:Extensions"
                                  "//mdrpi:RegistrationInfo"
                                  "//mdrpi:RegistrationPolicy",
                                  namespaces=NAMESPACES)
        # Nodes without a lang attribute are skipped (the attribute is required).
        return MetadataParser._localized_text(reg_policy)

    @staticmethod
    def entity_attribute_scope(entity):
        '''Return the distinct shibmd:Scope texts in document order.'''
        scope_nodes = entity.xpath(".//md:Extensions"
                                   "//shibmd:Scope",
                                   namespaces=NAMESPACES)

        scope = []
        for scope_node in scope_nodes:
            if scope_node.text not in scope:
                scope.append(scope_node.text)
        return scope

    @staticmethod
    def entity_requested_attributes(entity):
        '''Return the requested attributes split into required and optional.

        Each entry is a ``[Name, FriendlyName]`` pair.
        '''
        attr_nodes = entity.xpath(".//md:AttributeConsumingService"
                                  "//md:RequestedAttribute",
                                  namespaces=NAMESPACES)
        attrs = {'required': [], 'optional': []}
        for attr_node in attr_nodes:
            required = attr_node.attrib.get('isRequired', 'false')
            # xs:boolean allows both 'true' and '1' as true literals.
            index = 'required' if required in ('true', '1') else 'optional'
            attrs[index].append([attr_node.attrib.get('Name', None),
                                 attr_node.attrib.get('FriendlyName', None)])
        return attrs

    @staticmethod
    def _first_text(node, path):
        '''Return the text of the first node matching ``path``, or None.'''
        matches = node.xpath(path, namespaces=NAMESPACES)
        return matches[0].text if matches else None

    @staticmethod
    def entity_contacts(entity):
        '''Return one dict (type, name, surname, email) per md:ContactPerson.'''
        contacts = entity.xpath(".//md:ContactPerson",
                                namespaces=NAMESPACES)
        cont = []
        for cont_node in contacts:
            cont.append({
                'type': cont_node.attrib.get('contactType', ''),
                'name': MetadataParser._first_text(cont_node, ".//md:GivenName"),
                'surname': MetadataParser._first_text(cont_node, ".//md:SurName"),
                'email': MetadataParser._first_text(cont_node, ".//md:EmailAddress"),
            })
        return cont
402