Completed
Push — master ( 13b617...40e09b )
by Andrea
01:18
created

MetadataParser.entity_categories()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 8
rs 9.4285
cc 2
1
#################################################################
2
# MET v2 Metadata Explorer Tool
3
#
4
# This Software is Open Source. See License: https://github.com/TERENA/met/blob/master/LICENSE.md
5
# Copyright (c) 2012, TERENA All rights reserved.
6
#
7
# This Software is based on MET v1 developed for TERENA by Yaco Sistemas, http://www.yaco.es/
8
# MET v2 was developed for TERENA by Tamim Ziai, DAASI International GmbH, http://www.daasi.de
9
# Current version of MET has been revised for performance improvements by Andrea Biancini,
10
# Consortium GARR, http://www.garr.it
11
#########################################################################################
12
13
from lxml import etree
14
15
# Prefix -> namespace URI map passed as ``namespaces=`` to the XPath queries
# in this module.
NAMESPACES = {
    'xml': 'http://www.w3.org/XML/1998/namespace',
    # The URI used to read 'xs="http://www.w3.org/2001/XMLSchema': a stray
    # attribute fragment had been pasted into the value.
    'xs': 'http://www.w3.org/2001/XMLSchema',
    'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
    'md': 'urn:oasis:names:tc:SAML:2.0:metadata',
    'mdui': 'urn:oasis:names:tc:SAML:metadata:ui',
    'ds': 'http://www.w3.org/2000/09/xmldsig#',
    'saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
    'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
    'mdrpi': 'urn:oasis:names:tc:SAML:metadata:rpi',
    'shibmd': 'urn:mace:shibboleth:metadata:1.0',
    'mdattr': 'urn:oasis:names:tc:SAML:metadata:attribute',
}

# Default namespace used by addns() / delns().
SAML_METADATA_NAMESPACE = NAMESPACES['md']

XML_NAMESPACE = NAMESPACES['xml']
XMLDSIG_NAMESPACE = NAMESPACES['ds']
MDUI_NAMESPACE = NAMESPACES['mdui']

# Role descriptor element names recognised by this module.
DESCRIPTOR_TYPES = ('IDPSSODescriptor', 'SPSSODescriptor', 'AASSODescriptor')

# Short label per descriptor type: the name without its 'SSODescriptor' part
# (e.g. 'IDPSSODescriptor' -> 'IDP').
DESCRIPTOR_TYPES_DISPLAY = dict(
    (item, item.replace('SSODescriptor', '')) for item in DESCRIPTOR_TYPES
)

# The same names prefixed with 'md:' so they can be used in XPath expressions.
DESCRIPTOR_TYPES_UTIL = ["md:%s" % item for item in DESCRIPTOR_TYPES]
41
42
43
def addns(node_name, namespace=SAML_METADATA_NAMESPACE):
    '''Build the ``{namespace}node_name`` form of a tag or attribute name.

    ``namespace`` defaults to the SAML metadata namespace.
    '''
    pieces = ('{', namespace, '}', node_name)
    return ''.join(pieces)
46
47
48
def delns(node, namespace=SAML_METADATA_NAMESPACE):
    '''Return the string ``node`` with every ``{namespace}`` qualifier removed.

    ``namespace`` defaults to the SAML metadata namespace.
    '''
    qualifier = ''.join(('{', namespace, '}'))
    return node.replace(qualifier, '')
50
51
52
def getlang(node):
    '''Return the language declared on ``node``, or ``None`` if there is none.

    A plain ``lang`` attribute takes precedence over the namespace-qualified
    ``xml:lang`` attribute.
    '''
    candidates = ('lang', addns('lang', NAMESPACES['xml']))
    for attr_name in candidates:
        if attr_name in node.attrib:
            return node.attrib[attr_name]
    return None
57
58
# Namespace-qualified root tags: 'EntitiesDescriptor' marks a federation
# file, 'EntityDescriptor' a single entity.
FEDERATION_ROOT_TAG = addns('EntitiesDescriptor', SAML_METADATA_NAMESPACE)
ENTITY_ROOT_TAG = addns('EntityDescriptor', SAML_METADATA_NAMESPACE)
60
61
62
class MetadataParser(object):
    '''Incremental reader for a SAML metadata file.

    The file is read with ``lxml.etree.iterparse``.  The constructor only
    looks at the root element; entities are parsed on demand and each
    ``EntityDescriptor`` is cleared once it has been consumed.

    The ``entity_*`` / ``registration_*`` static methods each extract one
    piece of information from an ``EntityDescriptor`` element.
    '''

    def __init__(self, filename=None):
        '''Read the root element of ``filename``.

        Sets ``filename``, ``rootelem``, ``file_id`` (the root ``ID``
        attribute or ``None``), ``is_federation`` and ``is_entity``.

        Raises ``ValueError`` when ``filename`` is ``None``.
        '''
        if filename is None:
            raise ValueError('filename is required')

        self.filename = filename
        context = etree.iterparse(self.filename, events=('start',),
                                  huge_tree=True, remove_blank_text=True)
        # The first 'start' event is the document root.  The next() builtin
        # replaces the Python 2 only ``context.next()`` method.
        _, self.rootelem = next(iter(context))
        self.file_id = self.rootelem.get('ID', None)
        self.is_federation = self.rootelem.tag == FEDERATION_ROOT_TAG
        self.is_entity = not self.is_federation

    def _entity_descriptors(self):
        '''Return a new iterparse context yielding each ``EntityDescriptor``
        of the file on its 'end' event.'''
        return etree.iterparse(self.filename, tag=ENTITY_ROOT_TAG,
                               events=('end',), huge_tree=True,
                               remove_blank_text=True)

    @staticmethod
    def _release_element(element):
        '''Clear ``element`` and delete the siblings that precede it, so the
        partially built tree does not keep growing while iterating.'''
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    @staticmethod
    def _text_by_lang(nodes):
        '''Map the language of each node to its text.

        Nodes without a language are skipped; when a language occurs more
        than once the last node wins.
        '''
        languages = {}
        for node in nodes:
            lang = getlang(node)
            if lang is not None:
                languages[lang] = node.text
        return languages

    @staticmethod
    def _uiinfo_text_by_lang(entity, tag):
        '''Return the language -> text map of the ``mdui:<tag>`` elements
        found below any ``mdui:UIInfo`` of ``entity``.'''
        nodes = entity.xpath(".//mdui:UIInfo//mdui:%s" % tag,
                             namespaces=NAMESPACES)
        return MetadataParser._text_by_lang(nodes)

    @staticmethod
    def _first_text(node, path):
        '''Return the text of the first match of XPath ``path`` below
        ``node``, or ``None`` when nothing matches.'''
        found = node.xpath(path, namespaces=NAMESPACES)
        return found[0].text if found else None

    @staticmethod
    def _get_entity_details(element):
        '''Return the detailed attributes of an entity as a dict.

        Keys: ``xml`` (pretty-printed serialisation of ``element``),
        ``description``, ``infoUrl``, ``privacyUrl``, ``organization``,
        ``logos``, ``scopes``, ``attr_requested``, ``contacts`` and
        ``registration_policy``.
        '''
        return {
            'xml': etree.tostring(element, pretty_print=True),
            'description': MetadataParser.entity_description(element),
            'infoUrl': MetadataParser.entity_information_url(element),
            'privacyUrl': MetadataParser.entity_privacy_url(element),
            'organization': MetadataParser.entity_organization(element),
            'logos': MetadataParser.entity_logos(element),
            'scopes': MetadataParser.entity_attribute_scope(element),
            'attr_requested': MetadataParser.entity_requested_attributes(element),
            'contacts': MetadataParser.entity_contacts(element),
            'registration_policy': MetadataParser.registration_policy(element),
        }

    @staticmethod
    def _entity_lang_seen(entity):
        '''Return the set of language codes used by the localised fields
        (description, URLs, organization, display name) of ``entity``.'''
        languages = set()
        for key in ('description', 'infoUrl', 'privacyUrl', 'organization', 'displayName'):
            localized = entity.get(key)
            if localized:
                languages.update(localized.keys())
        return languages

    @staticmethod
    def _get_entity_by_id(context, entityid, details):
        '''Yield a dict for every element of ``context`` whose ``entityID``
        equals ``entityid``.

        ``context`` is an iterable of ``(event, element)`` pairs.  When
        ``details`` is true the dict is extended with the output of
        ``_get_entity_details`` and entries with an empty value are dropped.
        Every element is released after it has been examined.
        '''
        for _, element in context:
            # .get() instead of attrib[...]: one descriptor lacking entityID
            # must not abort the whole lookup with a KeyError.
            current_id = element.get('entityID')
            if current_id is not None and current_id == entityid:
                entity = {
                    'entityid': entityid,
                    'file_id': element.get('ID', None),
                    'displayName': MetadataParser.entity_displayname(element),
                }
                reg_info = MetadataParser.registration_information(element)
                for key in ('authority', 'instant'):
                    if key in reg_info:
                        entity['registration_' + key] = reg_info[key]
                entity['entity_categories'] = MetadataParser.entity_categories(element)
                entity['entity_types'] = MetadataParser.entity_types(element)
                entity['protocols'] = MetadataParser.entity_protocols(element, entity['entity_types'])

                if details:
                    entity.update(MetadataParser._get_entity_details(element))
                    # items() works on Python 2 and 3 (iteritems() does not).
                    entity = dict((k, v) for k, v in entity.items() if v)

                entity['languages'] = MetadataParser._entity_lang_seen(entity)
                yield entity

            MetadataParser._release_element(element)

    def get_federation(self):
        '''Return the ``ID`` and ``Name`` attributes of the root element.

        Asserts that the file is a federation (``EntitiesDescriptor`` root).
        '''
        assert self.is_federation

        return {
            'ID': self.rootelem.get('ID', None),
            'Name': self.rootelem.get('Name', None),
        }

    def get_entity(self, entityid, details=True):
        '''Return the dict describing the first entity whose ``entityID`` is
        ``entityid``.

        Raises ``ValueError`` when the file contains no such entity.
        '''
        matches = MetadataParser._get_entity_by_id(self._entity_descriptors(),
                                                   entityid, details)
        # Yielded entities are always dicts, so None is a safe "not found".
        entity = next(matches, None)
        if entity is None:
            raise ValueError("Entity not found: %s" % entityid)
        return entity

    def entity_exist(self, entityid):
        '''Return True when an ``EntityDescriptor`` with this ``entityID`` is
        found from the root element.

        NOTE(review): ``rootelem`` comes from the 'start' event of an
        incremental parse, so it may not contain every entity of the file -
        confirm against lxml's iterparse behaviour.
        '''
        # The ID is passed as an XPath variable rather than spliced into the
        # expression, so quotes inside ``entityid`` cannot break the query.
        matches = self.rootelem.xpath(
            "//md:EntityDescriptor[@entityID=$entityid]",
            entityid=entityid, namespaces=NAMESPACES)
        return len(matches) > 0

    @staticmethod
    def _get_entities_id(context):
        '''Yield the ``entityID`` of every element of ``context``, releasing
        each element afterwards.'''
        for _, element in context:
            yield element.attrib['entityID']
            MetadataParser._release_element(element)

    def get_entities(self):
        '''Return the list of ``entityID`` values found in the file.'''
        return list(self._get_entities_id(self._entity_descriptors()))

    @staticmethod
    def entity_types(entity):
        '''Return the local names of the role descriptors that are direct
        children of ``entity``; ``['AASSODescriptor']`` when there is none.'''
        expression = "|".join(DESCRIPTOR_TYPES_UTIL)
        elements = entity.xpath(expression, namespaces=NAMESPACES)
        types = [element.tag.split("}")[1] for element in elements]
        return types or ['AASSODescriptor']

    @staticmethod
    def entity_categories(entity):
        '''Return the stripped values of the entity-category and
        entity-category-support attributes of ``entity``.

        ``AttributeValue`` elements without text are ignored instead of
        raising ``AttributeError``.
        '''
        elements = entity.xpath(".//mdattr:EntityAttributes"
                                "//saml:Attribute[@Name='http://macedir.org/entity-category-support' or @Name='http://macedir.org/entity-category']"
                                "//saml:AttributeValue",
                                namespaces=NAMESPACES)
        return [node.text.strip() for node in elements
                if node.text and node.text.strip()]

    @staticmethod
    def entity_protocols(entity, entity_types):
        '''Return the protocols listed in ``protocolSupportEnumeration`` of
        the first descriptor type in ``entity_types``.

        ``IDPSSODescriptor`` is used when ``entity_types`` is not a non-empty
        list.  Returns ``[]`` when the attribute is not found.
        '''
        if isinstance(entity_types, list) and entity_types:
            e_type = entity_types[0]
        else:
            e_type = 'IDPSSODescriptor'

        raw_protocols = entity.xpath(".//md:%s"
                                     "/@protocolSupportEnumeration" % e_type,
                                     namespaces=NAMESPACES)
        if raw_protocols:
            # split() without argument: the value is a whitespace separated
            # list, so runs of spaces or newlines must not produce empty or
            # glued entries.
            return raw_protocols[0].split()

        return []

    @staticmethod
    def entity_displayname(entity):
        '''Return the language -> text map of ``mdui:DisplayName``.'''
        return MetadataParser._uiinfo_text_by_lang(entity, 'DisplayName')

    @staticmethod
    def entity_description(entity):
        '''Return the language -> text map of ``mdui:Description``.'''
        return MetadataParser._uiinfo_text_by_lang(entity, 'Description')

    @staticmethod
    def entity_information_url(entity):
        '''Return the language -> URL map of ``mdui:InformationURL``.'''
        return MetadataParser._uiinfo_text_by_lang(entity, 'InformationURL')

    @staticmethod
    def entity_privacy_url(entity):
        '''Return the language -> URL map of ``mdui:PrivacyStatementURL``.'''
        return MetadataParser._uiinfo_text_by_lang(entity, 'PrivacyStatementURL')

    @staticmethod
    def entity_organization(entity):
        '''Return organization data grouped by language.

        The result maps a language to a dict with any of the keys ``name``,
        ``displayName`` and ``URL``.  Values without a language are dropped.
        '''
        orgs = entity.xpath(".//md:Organization",
                            namespaces=NAMESPACES)
        languages = {}
        for org_node in orgs:
            for attr in ('name', 'displayName', 'URL'):
                # 'name' -> 'OrganizationName', 'URL' -> 'OrganizationURL', ...
                node_name = 'Organization' + attr[0].upper() + attr[1:]
                for node in org_node.findall(addns(node_name)):
                    lang = getlang(node)
                    if lang is not None:
                        languages.setdefault(lang, {})[attr] = node.text
        return languages

    @staticmethod
    def entity_logos(entity):
        '''Return one dict (``width``, ``height``, ``file``, ``lang``) per
        ``mdui:Logo`` that has text content.

        Missing ``width`` / ``height`` attributes default to 0.
        '''
        xmllogos = entity.xpath(".//mdui:UIInfo"
                                "//mdui:Logo",
                                namespaces=NAMESPACES)
        logos = []
        for logo_node in xmllogos:
            if logo_node.text is None:
                continue  # the file attribute is required
            logos.append({
                'width': int(logo_node.attrib.get('width', '0')),
                'height': int(logo_node.attrib.get('height', '0')),
                'file': logo_node.text,
                'lang': getlang(logo_node),
            })
        return logos

    @staticmethod
    def registration_information(entity):
        '''Return ``authority`` and ``instant`` of the first
        ``mdrpi:RegistrationInfo``; an empty dict when there is none.'''
        reg_info = entity.xpath(".//md:Extensions"
                                "//mdrpi:RegistrationInfo",
                                namespaces=NAMESPACES)
        info = {}
        if reg_info:
            attributes = reg_info[0].attrib
            info['authority'] = attributes.get('registrationAuthority')
            info['instant'] = attributes.get('registrationInstant')
        return info

    @staticmethod
    def registration_policy(entity):
        '''Return the language -> text map of ``mdrpi:RegistrationPolicy``.

        Policies without a language are skipped.
        '''
        reg_policy = entity.xpath(".//md:Extensions"
                                  "//mdrpi:RegistrationInfo"
                                  "//mdrpi:RegistrationPolicy",
                                  namespaces=NAMESPACES)
        return MetadataParser._text_by_lang(reg_policy)

    @staticmethod
    def entity_attribute_scope(entity):
        '''Return the distinct ``shibmd:Scope`` values in document order.'''
        scope_nodes = entity.xpath(".//md:Extensions"
                                   "//shibmd:Scope",
                                   namespaces=NAMESPACES)
        scope = []
        for cur_scope in scope_nodes:
            if cur_scope.text not in scope:
                scope.append(cur_scope.text)
        return scope

    @staticmethod
    def entity_requested_attributes(entity):
        '''Return the requested attributes split by requirement.

        The result has the keys ``required`` and ``optional``; each holds a
        list of ``[Name, FriendlyName]`` pairs.
        '''
        requested = entity.xpath(".//md:AttributeConsumingService"
                                 "//md:RequestedAttribute",
                                 namespaces=NAMESPACES)
        attrs = {'required': [], 'optional': []}
        for attr_node in requested:
            # isRequired is an xs:boolean: both 'true' and '1' mean true.
            is_required = attr_node.attrib.get('isRequired', 'false') in ('true', '1')
            index = 'required' if is_required else 'optional'
            attrs[index].append([attr_node.attrib.get('Name', None),
                                 attr_node.attrib.get('FriendlyName', None)])
        return attrs

    @staticmethod
    def entity_contacts(entity):
        '''Return one dict (``type``, ``name``, ``surname``, ``email``) per
        ``md:ContactPerson``; missing parts are ``None``.'''
        contacts = entity.xpath(".//md:ContactPerson",
                                namespaces=NAMESPACES)
        cont = []
        for cont_node in contacts:
            cont.append({
                'type': cont_node.attrib.get('contactType', ''),
                'name': MetadataParser._first_text(cont_node, ".//md:GivenName"),
                'surname': MetadataParser._first_text(cont_node, ".//md:SurName"),
                'email': MetadataParser._first_text(cont_node, ".//md:EmailAddress"),
            })
        return cont
378