Completed
Push — master ( 28b0ba...17cdbf )
by Andrea
01:44
created

MetadataParser._get_entity_by_id()   F

Complexity

Conditions 11

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 0
Metric Value
c 3
b 1
f 0
dl 0
loc 29
rs 3.1764
cc 11

How to fix   Complexity   

Complexity

Complex classes like MetadataParser._get_entity_by_id() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#################################################################
2
# MET v2 Metadata Explorer Tool
3
#
4
# This Software is Open Source. See License: https://github.com/TERENA/met/blob/master/LICENSE.md
5
# Copyright (c) 2012, TERENA All rights reserved.
6
#
7
# This Software is based on MET v1 developed for TERENA by Yaco Sistemas, http://www.yaco.es/
8
# MET v2 was developed for TERENA by Tamim Ziai, DAASI International GmbH, http://www.daasi.de
9
# Current version of MET has been revised for performance improvements by Andrea Biancini,
10
# Consortium GARR, http://www.garr.it
11
#########################################################################################
12
13
from lxml import etree
14
15
# Prefix -> namespace URI map; passed as ``namespaces=`` to the XPath queries
# in this module.
NAMESPACES = {
    'xml': 'http://www.w3.org/XML/1998/namespace',
    # The value used to be the malformed 'xs="http://www.w3.org/2001/XMLSchema'
    # (a stray attribute-style prefix), which is not a valid namespace URI.
    'xs': 'http://www.w3.org/2001/XMLSchema',
    'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
    'md': 'urn:oasis:names:tc:SAML:2.0:metadata',
    'mdui': 'urn:oasis:names:tc:SAML:metadata:ui',
    'ds': 'http://www.w3.org/2000/09/xmldsig#',
    'saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
    'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
    'mdrpi': 'urn:oasis:names:tc:SAML:metadata:rpi',
    'shibmd': 'urn:mace:shibboleth:metadata:1.0',
    }

# Convenience aliases for the namespaces used most often.
SAML_METADATA_NAMESPACE = NAMESPACES['md']

XML_NAMESPACE = NAMESPACES['xml']
XMLDSIG_NAMESPACE = NAMESPACES['ds']
MDUI_NAMESPACE = NAMESPACES['mdui']

# Role descriptor element names recognised by this module.
DESCRIPTOR_TYPES = ('IDPSSODescriptor', 'SPSSODescriptor',)

# Short display label per descriptor type, e.g. 'IDPSSODescriptor' -> 'IDP'.
DESCRIPTOR_TYPES_DISPLAY = dict(
    (item, item.replace('SSODescriptor', '')) for item in DESCRIPTOR_TYPES
)

# The same descriptor names prefixed with 'md:' so they can be used in XPath.
DESCRIPTOR_TYPES_UTIL = ["md:%s" % item for item in DESCRIPTOR_TYPES]
40
41
42
def addns(node_name, namespace=SAML_METADATA_NAMESPACE):
    '''Build the '{namespace}node_name' form of a node name.

    The namespace defaults to SAML_METADATA_NAMESPACE.
    '''
    pieces = ('{', namespace, '}', node_name)
    return ''.join(pieces)
45
46
47
def delns(node, namespace=SAML_METADATA_NAMESPACE):
    '''Return node with each '{namespace}' qualifier removed.

    The namespace defaults to SAML_METADATA_NAMESPACE.
    '''
    qualifier = ''.join(('{', namespace, '}'))
    return node.replace(qualifier, '')
49
50
51
def getlang(node):
    '''Return the language of node, or None when it declares none.

    A plain 'lang' attribute takes precedence over the namespace-qualified
    xml 'lang' attribute.
    '''
    attributes = node.attrib
    if 'lang' in attributes:
        return attributes['lang']

    qualified = addns('lang', NAMESPACES['xml'])
    if qualified in attributes:
        return attributes[qualified]

    return None
56
57
# Namespace-qualified tag names of the EntityDescriptor and EntitiesDescriptor
# elements (addns() qualifies them with its default namespace).
ENTITY_ROOT_TAG = addns('EntityDescriptor')
FEDERATION_ROOT_TAG = addns('EntitiesDescriptor')
59
60
61
class MetadataParser(object):
    '''Reader for a SAML metadata file, based on lxml's iterparse.

    The constructor only reads far enough to obtain the root element; the
    entity accessors re-open the file and stream over its EntityDescriptor
    elements, releasing each element once it has been processed.
    '''

    def __init__(self, filename=None):
        '''Open filename and inspect its root element.

        Raises ValueError when filename is None.
        '''
        if filename is None:
            raise ValueError('filename is required')

        self.filename = filename
        context = etree.iterparse(self.filename, events=('start',),
                                  huge_tree=True, remove_blank_text=True)
        # The first 'start' event belongs to the root element.  The next()
        # builtin is used instead of the Python-2-only .next() method.
        _event, self.rootelem = next(iter(context))
        self.file_id = self.rootelem.get('ID', None)
        self.is_federation = self.rootelem.tag == FEDERATION_ROOT_TAG
        self.is_entity = not self.is_federation

    @staticmethod
    def _get_entity_details(element):
        '''Return the detail fields (xml, description, contacts, ...) of an entity element.'''
        entity = {}

        entity['xml'] = etree.tostring(element, pretty_print=True)

        entity['description'] = MetadataParser.entity_description(element)
        entity['infoUrl'] = MetadataParser.entity_information_url(element)
        entity['privacyUrl'] = MetadataParser.entity_privacy_url(element)
        entity['organization'] = MetadataParser.entity_organization(element)
        entity['logos'] = MetadataParser.entity_logos(element)
        entity['scopes'] = MetadataParser.entity_attribute_scope(element)
        entity['attr_requested'] = MetadataParser.entity_requested_attributes(element)
        entity['contacts'] = MetadataParser.entity_contacts(element)
        entity['registration_policy'] = MetadataParser.registration_policy(element)

        return entity

    @staticmethod
    def _entity_lang_seen(entity):
        '''Return the set of language codes used by the localized fields of entity.'''
        languages = set()
        for key in ('description', 'infoUrl', 'privacyUrl', 'organization', 'displayName'):
            localized = entity.get(key)
            if localized:
                languages.update(localized.keys())

        return languages

    @staticmethod
    def _release_element(element):
        '''Free an element that has been processed, plus its earlier siblings.

        Keeps memory bounded while streaming over a large file.
        '''
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    @staticmethod
    def _build_entity(element, entityid, details):
        '''Build the dictionary describing one EntityDescriptor element.

        When details is true the detail fields are merged in and every
        key holding an empty/false value is dropped.
        '''
        entity = {}

        entity['entityid'] = entityid
        entity['file_id'] = element.get('ID', None)
        entity['displayName'] = MetadataParser.entity_displayname(element)

        # registration_information() returns {} when there is no
        # RegistrationInfo, so plain membership tests are sufficient.
        reg_info = MetadataParser.registration_information(element)
        for key in ('authority', 'instant'):
            if key in reg_info:
                entity['registration_' + key] = reg_info[key]

        entity['entity_types'] = MetadataParser.entity_types(element)
        entity['protocols'] = MetadataParser.entity_protocols(element, entity['entity_types'])

        if details:
            entity.update(MetadataParser._get_entity_details(element))
            # .items() works on both Python 2 and 3 (iteritems() is 2-only).
            entity = dict((k, v) for k, v in entity.items() if v)

        entity['languages'] = MetadataParser._entity_lang_seen(entity)
        return entity

    @staticmethod
    def _get_entity_by_id(context, entityid, details):
        '''Yield a dictionary for each element in context whose entityID equals entityid.

        context is an iterable of (event, element) pairs.  Elements without
        an entityID attribute are skipped instead of raising KeyError.
        Every element is released after it has been handled.
        '''
        for _event, element in context:
            if element.attrib.get('entityID') == entityid:
                yield MetadataParser._build_entity(element, entityid, details)

            MetadataParser._release_element(element)

    def get_federation(self, attrs=None):
        '''Return the ID and Name attributes of the root element.

        Only valid when the file's root is an EntitiesDescriptor (asserted).
        The attrs parameter is accepted but not used.
        '''
        assert self.is_federation

        federation = {}
        federation['ID'] = self.rootelem.get('ID', None)
        federation['Name'] = self.rootelem.get('Name', None)

        return federation

    def get_entity(self, entityid, details=True):
        '''Return the dictionary describing the first entity matching entityid.

        Raises ValueError when no EntityDescriptor in the file matches.
        '''
        context = etree.iterparse(self.filename, tag=addns('EntityDescriptor'),
                                  events=('end',), huge_tree=True,
                                  remove_blank_text=True)
        matches = MetadataParser._get_entity_by_id(context, entityid, details)
        # A built entity is always a dict, so None reliably means "no match".
        entity = next(matches, None)
        if entity is None:
            raise ValueError("Entity not found: %s" % entityid)

        return entity

    def entity_exist(self, entityid):
        '''Return True when an EntityDescriptor with this entityID is found.

        The entityID is passed as an XPath variable rather than interpolated
        into the expression, so quotes in entityid cannot break or alter the
        query.
        '''
        # NOTE(review): rootelem comes from an iterparse that was only advanced
        # to its first event, so the tree searched here may be incomplete for
        # large files -- confirm.
        entity_xpath = self.rootelem.xpath(
            "//md:EntityDescriptor[@entityID=$entityid]",
            namespaces=NAMESPACES, entityid=entityid)
        return len(entity_xpath) > 0

    @staticmethod
    def _get_entities_id(context):
        '''Yield the entityID of every element in context, releasing each afterwards.'''
        for _event, element in context:
            yield element.attrib['entityID']
            MetadataParser._release_element(element)

    def get_entities(self):
        '''Return the list of entityID values found in the file.'''
        context = etree.iterparse(self.filename, tag=addns('EntityDescriptor'),
                                  events=('end',), huge_tree=True,
                                  remove_blank_text=True)
        return list(self._get_entities_id(context))

    @staticmethod
    def entity_types(entity):
        '''Return the local names of the known role descriptors that are children of entity.'''
        expression = "|".join(DESCRIPTOR_TYPES_UTIL)
        elements = entity.xpath(expression, namespaces=NAMESPACES)
        return [element.tag.split("}")[1] for element in elements]

    @staticmethod
    def entity_protocols(entity, entity_types):
        '''Return the protocolSupportEnumeration values of the entity's first descriptor type.

        Falls back to IDPSSODescriptor when entity_types is not a non-empty
        list; returns [] when no such attribute is found.
        '''
        if isinstance(entity_types, list) and len(entity_types) > 0:
            e_type = entity_types[0]
        else:
            e_type = 'IDPSSODescriptor'

        raw_protocols = entity.xpath(".//md:%s"
                                     "/@protocolSupportEnumeration" % e_type,
                                     namespaces=NAMESPACES)
        if raw_protocols:
            protocols = raw_protocols[0]
            return protocols.split(' ')

        return []

    @staticmethod
    def _localized_texts(entity, xpath_expr):
        '''Map language -> text for the nodes matched by xpath_expr.

        Nodes without a language are ignored; for a repeated language the
        last node wins.
        '''
        texts = {}
        for node in entity.xpath(xpath_expr, namespaces=NAMESPACES):
            lang = getlang(node)
            if lang is not None:
                texts[lang] = node.text
        return texts

    @staticmethod
    def entity_displayname(entity):
        '''Return the mdui:DisplayName texts keyed by language.'''
        return MetadataParser._localized_texts(
            entity, ".//mdui:UIInfo//mdui:DisplayName")

    @staticmethod
    def entity_description(entity):
        '''Return the mdui:Description texts keyed by language.'''
        return MetadataParser._localized_texts(
            entity, ".//mdui:UIInfo//mdui:Description")

    @staticmethod
    def entity_information_url(entity):
        '''Return the mdui:InformationURL texts keyed by language.'''
        return MetadataParser._localized_texts(
            entity, ".//mdui:UIInfo//mdui:InformationURL")

    @staticmethod
    def entity_privacy_url(entity):
        '''Return the mdui:PrivacyStatementURL texts keyed by language.'''
        return MetadataParser._localized_texts(
            entity, ".//mdui:UIInfo//mdui:PrivacyStatementURL")

    @staticmethod
    def entity_organization(entity):
        '''Return organization data as {lang: {'name'|'displayName'|'URL': text}}.

        Values without a language are dropped.
        '''
        orgs = entity.xpath(".//md:Organization",
                            namespaces=NAMESPACES)
        languages = {}
        for org_node in orgs:
            for attr in ('name', 'displayName', 'URL'):
                # 'name' -> 'OrganizationName', 'URL' -> 'OrganizationURL', ...
                node_name = 'Organization' + attr[0].upper() + attr[1:]
                for node in org_node.findall(addns(node_name)):
                    languages.setdefault(getlang(node), {})[attr] = node.text

        languages.pop(None, None)
        return languages

    @staticmethod
    def entity_logos(entity):
        '''Return a list of logo dictionaries with width, height, file and lang keys.'''
        xmllogos = entity.xpath(".//mdui:UIInfo"
                                "/mdui:Logo",
                                namespaces=NAMESPACES)
        logos = []
        for logo_node in xmllogos:
            if logo_node.text is None:
                continue  # a Logo without text content has no 'file' value
            logo = {}
            logo['width'] = int(logo_node.attrib.get('width', '0'))
            logo['height'] = int(logo_node.attrib.get('height', '0'))
            logo['file'] = logo_node.text
            logo['lang'] = getlang(logo_node)
            logos.append(logo)
        return logos

    @staticmethod
    def registration_information(entity):
        '''Return the authority and instant of the first mdrpi:RegistrationInfo.

        Returns an empty dictionary when the entity has no RegistrationInfo.
        '''
        reg_info = entity.xpath(".//md:Extensions"
                                "/mdrpi:RegistrationInfo",
                                namespaces=NAMESPACES)
        info = {}
        if reg_info:
            info['authority'] = reg_info[0].attrib.get('registrationAuthority')
            info['instant'] = reg_info[0].attrib.get('registrationInstant')
        return info

    @staticmethod
    def registration_policy(entity):
        '''Return the mdrpi:RegistrationPolicy texts keyed by language.

        Policies without a language are skipped.
        '''
        return MetadataParser._localized_texts(
            entity,
            ".//md:Extensions/mdrpi:RegistrationInfo/mdrpi:RegistrationPolicy")

    @staticmethod
    def entity_attribute_scope(entity):
        '''Return the distinct shibmd:Scope texts in document order.'''
        scope_node = entity.xpath(".//md:Extensions"
                                  "/shibmd:Scope",
                                  namespaces=NAMESPACES)

        scope = []
        for cur_scope in scope_node:
            if cur_scope.text not in scope:
                scope.append(cur_scope.text)
        return scope

    @staticmethod
    def entity_requested_attributes(entity):
        '''Return requested attributes as {'required': [...], 'optional': [...]}.

        Each list item is [Name, FriendlyName]; an attribute is required only
        when its isRequired attribute is exactly 'true'.
        '''
        requested = entity.xpath(".//md:AttributeConsumingService"
                                 "/md:RequestedAttribute",
                                 namespaces=NAMESPACES)
        attrs = {}
        attrs['required'] = []
        attrs['optional'] = []
        for attr_node in requested:
            required = attr_node.attrib.get('isRequired', 'false')
            index = 'required' if required == 'true' else 'optional'
            attrs[index].append([attr_node.attrib.get('Name', None),
                                 attr_node.attrib.get('FriendlyName', None)])
        return attrs

    @staticmethod
    def _first_text(node, xpath_expr):
        '''Return the text of the first node matched by xpath_expr, or None.'''
        found = node.xpath(xpath_expr, namespaces=NAMESPACES)
        return found[0].text if found else None

    @staticmethod
    def entity_contacts(entity):
        '''Return one dictionary (type, name, surname, email) per md:ContactPerson.'''
        contacts = entity.xpath(".//md:ContactPerson",
                                namespaces=NAMESPACES)
        cont = []
        for cont_node in contacts:
            cont.append({
                'type': cont_node.attrib.get('contactType', ''),
                'name': MetadataParser._first_text(cont_node, ".//md:GivenName"),
                'surname': MetadataParser._first_text(cont_node, ".//md:SurName"),
                'email': MetadataParser._first_text(cont_node, ".//md:EmailAddress"),
            })
        return cont
365