MetadataParser   F
last analyzed

Complexity

Total Complexity 80

Size/Duplication

Total Lines 357
Duplicated Lines 0 %

Importance

Changes 10
Bugs 2 Features 0
Metric Value
wmc 80
c 10
b 2
f 0
dl 0
loc 357
rs 1.5789

25 Methods

Rating   Name   Duplication   Size   Complexity  
A registration_information() 0 10 2
A entity_protocols() 0 15 4
F _get_entity_by_id() 0 35 11
A __init__() 0 11 3
A entity_types() 0 8 4
A _chunkstring() 0 3 2
A _entity_lang_seen() 0 8 4
B entity_organization() 0 16 5
A entity_exist() 0 4 1
A entity_information_url() 0 15 3
A _get_entity_details() 0 19 1
A _get_entities_id() 0 8 3
A get_entities() 0 5 1
A entity_description() 0 15 3
A entity_displayname() 0 15 3
A entity_logos() 0 16 3
B get_certstats() 0 24 4
A get_federation() 0 8 2
A entity_categories() 0 8 2
A entity_requested_attributes() 0 14 3
A get_entity() 0 8 2
A entity_privacy_url() 0 15 3
A registration_policy() 0 15 3
B entity_contacts() 0 26 5
A entity_attribute_scope() 0 11 3

How to fix   Complexity   

Complex Class

Complex classes like MetadataParser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#################################################################
2
# MET v2 Metadata Explorer Tool
3
#
4
# This Software is Open Source. See License: https://github.com/TERENA/met/blob/master/LICENSE.md
5
# Copyright (c) 2012, TERENA All rights reserved.
6
#
7
# This Software is based on MET v1 developed for TERENA by Yaco Sistemas, http://www.yaco.es/
8
# MET v2 was developed for TERENA by Tamim Ziai, DAASI International GmbH, http://www.daasi.de
9
# Current version of MET has been revised for performance improvements by Andrea Biancini,
10
# Consortium GARR, http://www.garr.it
11
##########################################################################
12
13
from lxml import etree
14
from cryptography import x509
15
from cryptography.hazmat.backends import default_backend
16
import simplejson as json
17
18
# Prefix -> namespace URI table handed to every XPath query in this module.
NAMESPACES = {
    # Generic XML / XML Schema namespaces
    'xml': 'http://www.w3.org/XML/1998/namespace',
    'xs': 'http://www.w3.org/2001/XMLSchema',
    'xsi': 'http://www.w3.org/2001/XMLSchema-instance',
    'ds': 'http://www.w3.org/2000/09/xmldsig#',
    # SAML core namespaces
    'md': 'urn:oasis:names:tc:SAML:2.0:metadata',
    'saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
    'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
    # SAML metadata extension namespaces
    'mdui': 'urn:oasis:names:tc:SAML:metadata:ui',
    'mdrpi': 'urn:oasis:names:tc:SAML:metadata:rpi',
    'mdattr': 'urn:oasis:names:tc:SAML:metadata:attribute',
    'shibmd': 'urn:mace:shibboleth:metadata:1.0',
}

# Named shortcuts for the namespace URIs that are looked up most often.
XML_NAMESPACE = NAMESPACES['xml']
XMLDSIG_NAMESPACE = NAMESPACES['ds']
MDUI_NAMESPACE = NAMESPACES['mdui']
SAML_METADATA_NAMESPACE = NAMESPACES['md']
37
38
# Role descriptor element names recognised by this module.
DESCRIPTOR_TYPES = ('IDPSSODescriptor', 'SPSSODescriptor', 'AASSODescriptor')

# DESCRIPTOR_TYPES_DISPLAY: descriptor name -> name without the
#   'SSODescriptor' part (e.g. 'IDPSSODescriptor' -> 'IDP').
# DESCRIPTOR_TYPES_UTIL: the same names prefixed with 'md:' for use in XPath.
DESCRIPTOR_TYPES_DISPLAY = {}
DESCRIPTOR_TYPES_UTIL = []
for item in DESCRIPTOR_TYPES:
    DESCRIPTOR_TYPES_DISPLAY[item] = item.replace('SSODescriptor', '')
    DESCRIPTOR_TYPES_UTIL.append("md:%s" % item)
44
45
46
def addns(node_name, namespace=SAML_METADATA_NAMESPACE):
    '''Return node_name qualified as ``{namespace}node_name``.

    The namespace defaults to the SAML metadata namespace.
    '''
    return ''.join(('{', namespace, '}', node_name))
49
50
51
def delns(node, namespace=SAML_METADATA_NAMESPACE):
    '''Return the string node with every ``{namespace}`` occurrence removed.

    The namespace defaults to the SAML metadata namespace.
    '''
    qualifier = ''.join(('{', namespace, '}'))
    return node.replace(qualifier, '')
53
54
55
def getlang(node):
    '''Return the language attribute of node, or None when it has none.

    A plain ``lang`` attribute takes precedence over the namespace-qualified
    ``xml:lang`` attribute.
    '''
    attributes = node.attrib
    if 'lang' in attributes:
        return attributes['lang']

    qualified_key = addns('lang', NAMESPACES['xml'])
    if qualified_key in attributes:
        return attributes[qualified_key]
    return None
60
61
62
# Namespace-qualified tag names of the two document roots this module
# distinguishes: a single entity and a federation (a set of entities).
ENTITY_ROOT_TAG = addns('EntityDescriptor')
FEDERATION_ROOT_TAG = addns('EntitiesDescriptor')
64
65
66
class MetadataParser(object):
    '''Read a SAML metadata file and expose its entities as plain dicts.

    The root element is parsed eagerly in the constructor (it is used for
    the federation attributes and for ``entity_exist``), while entity
    lookups and listings stream the file with ``etree.iterparse``.
    '''

    def __init__(self, filename=None):
        '''Parse the metadata file found at filename.

        Raises ValueError when no filename is given.
        '''
        if filename is None:
            raise ValueError('filename is required')

        self.filename = filename
        # Read bytes rather than text: lxml refuses text strings that carry
        # an XML encoding declaration, and the declared encoding (not the
        # locale) must drive the decoding.
        with open(filename, 'rb') as myfile:
            # NOTE(review): stripping newlines is kept from the original
            # implementation; it also joins tokens that were separated only
            # by a line break -- confirm whether it is still required.
            data = myfile.read().replace(b'\n', b'')
        self.rootelem = etree.fromstring(data)
        self.file_id = self.rootelem.get('ID', None)
        self.is_federation = self.rootelem.tag == FEDERATION_ROOT_TAG
        self.is_entity = not self.is_federation

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _iterparse_entities(self):
        '''Return an iterparse context over the file's EntityDescriptors.'''
        return etree.iterparse(self.filename, tag=addns('EntityDescriptor'),
                               events=('end',), huge_tree=True,
                               remove_blank_text=True)

    @staticmethod
    def _release_element(element):
        '''Clear element and drop its already-processed preceding siblings.

        Keeps memory bounded while streaming through a large file.
        '''
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    @staticmethod
    def _localized_text(entity, expression):
        '''Map language -> text for the nodes matched by expression.

        Nodes without a language attribute are ignored; when a language
        occurs more than once the last node wins.
        '''
        languages = {}
        for node in entity.xpath(expression, namespaces=NAMESPACES):
            lang = getlang(node)
            if lang is not None:
                languages[lang] = node.text
        return languages

    @staticmethod
    def _first_text(node, expression):
        '''Return the text of the first match of expression, or None.'''
        matches = node.xpath(expression, namespaces=NAMESPACES)
        return matches[0].text if matches else None

    @staticmethod
    def _chunkstring(string, length):
        '''Yield consecutive slices of string, each at most length long.'''
        return (string[start:start + length]
                for start in range(0, len(string), length))

    @staticmethod
    def _get_entity_details(element):
        '''Return the detail fields (XML dump, UI info, contacts, ...).'''
        return {
            'xml': etree.tostring(element, pretty_print=True),
            'description': MetadataParser.entity_description(element),
            'infoUrl': MetadataParser.entity_information_url(element),
            'privacyUrl': MetadataParser.entity_privacy_url(element),
            'organization': MetadataParser.entity_organization(element),
            'logos': MetadataParser.entity_logos(element),
            'scopes': MetadataParser.entity_attribute_scope(element),
            'attr_requested':
                MetadataParser.entity_requested_attributes(element),
            'contacts': MetadataParser.entity_contacts(element),
            'registration_policy':
                MetadataParser.registration_policy(element),
        }

    @staticmethod
    def _entity_lang_seen(entity):
        '''Return the set of languages used by the localized entity fields.'''
        languages = set()
        for key in ('description', 'infoUrl', 'privacyUrl', 'organization',
                    'displayName'):
            localized = entity.get(key)
            if localized:
                languages.update(localized.keys())
        return languages

    @staticmethod
    def _get_entity_by_id(context, entityid, details):
        '''Yield an entity dict for each descriptor whose entityID matches.

        context is an iterparse iterator of (event, element) pairs. When
        details is true the detail fields are merged in and every empty
        value is removed from the result.
        '''
        for _, element in context:
            # A descriptor lacking entityID can never match; skip it rather
            # than abort the whole scan with a KeyError.
            current_id = element.attrib.get('entityID')
            if current_id is not None and current_id == entityid:
                entity = {
                    'entityid': entityid,
                    'file_id': element.get('ID', None),
                    'displayName':
                        MetadataParser.entity_displayname(element),
                }

                reg_info = MetadataParser.registration_information(element)
                if 'authority' in reg_info:
                    entity['registration_authority'] = reg_info['authority']
                if 'instant' in reg_info:
                    entity['registration_instant'] = reg_info['instant']

                entity['entity_categories'] = \
                    MetadataParser.entity_categories(element)
                entity['entity_types'] = MetadataParser.entity_types(element)
                entity['protocols'] = MetadataParser.entity_protocols(
                    element, entity['entity_types'])
                entity['certstats'] = MetadataParser.get_certstats(element)

                if details:
                    entity.update(
                        MetadataParser._get_entity_details(element))
                    # items() instead of iteritems(): valid on Python 2 and 3.
                    entity = dict((k, v) for k, v in entity.items() if v)

                entity['languages'] = MetadataParser._entity_lang_seen(entity)
                yield entity

            MetadataParser._release_element(element)

    @staticmethod
    def _get_entities_id(context):
        '''Yield the entityID of every descriptor delivered by context.'''
        for _, element in context:
            entity_id = element.attrib.get('entityID')
            # Descriptors without an entityID are skipped.
            if entity_id is not None:
                yield entity_id
            MetadataParser._release_element(element)

    # ------------------------------------------------------------------
    # File-level queries
    # ------------------------------------------------------------------

    def get_federation(self):
        '''Return the ID and Name attributes of the federation root.'''
        assert self.is_federation

        return {
            'ID': self.rootelem.get('ID', None),
            'Name': self.rootelem.get('Name', None),
        }

    def get_entity(self, entityid, details=True):
        '''Return the dict describing the entity with the given entityID.

        Raises ValueError when the file holds no such entity.
        '''
        context = self._iterparse_entities()
        for entity in MetadataParser._get_entity_by_id(context, entityid,
                                                       details):
            return entity

        raise ValueError("Entity not found: %s" % entityid)

    def entity_exist(self, entityid):
        '''Tell whether an EntityDescriptor with this entityID exists.'''
        # The id is passed as an XPath variable instead of being pasted into
        # the expression, so quotes inside it cannot break or alter the query.
        matches = self.rootelem.xpath(
            "//md:EntityDescriptor[@entityID=$entityid]",
            namespaces=NAMESPACES, entityid=entityid)
        return len(matches) > 0

    def get_entities(self):
        '''Return the list of entityIDs found in the file.'''
        return list(self._get_entities_id(self._iterparse_entities()))

    # ------------------------------------------------------------------
    # Per-entity extractors (each takes an EntityDescriptor element)
    # ------------------------------------------------------------------

    @staticmethod
    def entity_types(entity):
        '''Return the descriptor type names that are children of entity.

        Falls back to ['AASSODescriptor'] when none is present.
        '''
        expression = "|".join(DESCRIPTOR_TYPES_UTIL)
        elements = entity.xpath(expression, namespaces=NAMESPACES)
        types = [element.tag.split("}")[1] for element in elements]
        if not types:
            types = ['AASSODescriptor']
        return types

    @staticmethod
    def entity_categories(entity):
        '''Return the stripped entity-category / assurance attribute values.'''
        elements = entity.xpath(".//mdattr:EntityAttributes"
                                "//saml:Attribute[@Name='http://macedir.org/entity-category-support' or @Name='http://macedir.org/entity-category' or @Name='urn:oasis:names:tc:SAML:attribute:assurance-certification']"
                                "//saml:AttributeValue",
                                namespaces=NAMESPACES)
        # An empty AttributeValue has text None; skip it instead of crashing.
        return [node.text.strip() for node in elements if node.text]

    @staticmethod
    def entity_protocols(entity, entity_types):
        '''Return the protocols listed by the first descriptor type.

        The protocolSupportEnumeration attribute of the first type in
        entity_types is used ('IDPSSODescriptor' when the list is empty or
        not a list). Returns [] when no such attribute is found.
        '''
        if isinstance(entity_types, list) and len(entity_types) > 0:
            e_type = entity_types[0]
        else:
            e_type = 'IDPSSODescriptor'

        raw_protocols = entity.xpath(".//md:%s"
                                     "/@protocolSupportEnumeration" % e_type,
                                     namespaces=NAMESPACES)
        if raw_protocols:
            # split() without argument: tolerate any run of whitespace and
            # never produce empty tokens.
            return raw_protocols[0].split()

        return []

    @staticmethod
    def get_certstats(element):
        '''Return a JSON object counting certificates per signature hash.

        Keys are signature hash algorithm names; certificates that cannot be
        loaded are counted under 'invalid'.
        '''
        hashes = {}

        for cert_node in element.xpath(".//ds:X509Certificate",
                                       namespaces=NAMESPACES):
            cert_name = 'invalid'

            try:
                # Remove every whitespace character, then re-wrap the base64
                # payload at 64 columns to rebuild a PEM block.
                payload = "".join(cert_node.text.split())
                payload = "\n".join(
                    MetadataParser._chunkstring(payload, 64))
                pem = "\n".join(["-----BEGIN CERTIFICATE-----", payload,
                                 '-----END CERTIFICATE-----'])
                # The loader expects bytes, not text.
                cert = x509.load_pem_x509_certificate(
                    pem.encode('ascii'), default_backend())
                cert_name = cert.signature_hash_algorithm.name
            except Exception:
                # Deliberate best effort: whatever goes wrong, the
                # certificate is simply reported as 'invalid'.
                pass

            hashes[cert_name] = hashes.get(cert_name, 0) + 1

        return json.dumps(hashes)

    @staticmethod
    def entity_displayname(entity):
        '''Return language -> mdui:DisplayName text.'''
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:DisplayName")

    @staticmethod
    def entity_description(entity):
        '''Return language -> mdui:Description text.'''
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:Description")

    @staticmethod
    def entity_information_url(entity):
        '''Return language -> mdui:InformationURL text.'''
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:InformationURL")

    @staticmethod
    def entity_privacy_url(entity):
        '''Return language -> mdui:PrivacyStatementURL text.'''
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:PrivacyStatementURL")

    @staticmethod
    def entity_organization(entity):
        '''Return language -> {'name', 'displayName', 'URL'} organization data.

        Only the keys actually present for a language are set; nodes without
        a language attribute are ignored.
        '''
        languages = {}
        for org_node in entity.xpath(".//md:Organization",
                                     namespaces=NAMESPACES):
            for attr in ('name', 'displayName', 'URL'):
                # name -> OrganizationName, displayName ->
                # OrganizationDisplayName, URL -> OrganizationURL
                node_name = 'Organization' + attr[0].upper() + attr[1:]
                for node in org_node.findall(addns(node_name)):
                    lang = getlang(node)
                    if lang is None:
                        continue
                    languages.setdefault(lang, {})[attr] = node.text
        return languages

    @staticmethod
    def entity_logos(entity):
        '''Return a list of dicts (width, height, file, lang), one per logo.

        Logo nodes without text are skipped; a missing width or height
        attribute is reported as 0.
        '''
        logos = []
        for logo_node in entity.xpath(".//mdui:UIInfo//mdui:Logo",
                                      namespaces=NAMESPACES):
            if logo_node.text is None:
                continue  # the file attribute is required
            logos.append({
                'width': int(logo_node.attrib.get('width', '0')),
                'height': int(logo_node.attrib.get('height', '0')),
                'file': logo_node.text,
                'lang': getlang(logo_node),
            })
        return logos

    @staticmethod
    def registration_information(entity):
        '''Return the 'authority' and 'instant' of the first RegistrationInfo.

        Returns an empty dict when the entity has no mdrpi:RegistrationInfo.
        '''
        reg_info = entity.xpath(".//md:Extensions"
                                "//mdrpi:RegistrationInfo",
                                namespaces=NAMESPACES)
        info = {}
        if reg_info:
            attributes = reg_info[0].attrib
            info['authority'] = attributes.get('registrationAuthority')
            info['instant'] = attributes.get('registrationInstant')
        return info

    @staticmethod
    def registration_policy(entity):
        '''Return language -> mdrpi:RegistrationPolicy text.'''
        # Policies without a lang attribute are ignored (lang is required).
        return MetadataParser._localized_text(
            entity,
            ".//md:Extensions//mdrpi:RegistrationInfo"
            "//mdrpi:RegistrationPolicy")

    @staticmethod
    def entity_attribute_scope(entity):
        '''Return the distinct shibmd:Scope texts in document order.'''
        scope = []
        for scope_node in entity.xpath(".//md:Extensions//shibmd:Scope",
                                       namespaces=NAMESPACES):
            if scope_node.text not in scope:
                scope.append(scope_node.text)
        return scope

    @staticmethod
    def entity_requested_attributes(entity):
        '''Return requested attributes split into 'required' and 'optional'.

        Each entry is a [Name, FriendlyName] pair; an attribute is required
        only when its isRequired attribute is exactly 'true'.
        '''
        attrs = {'required': [], 'optional': []}
        requested = entity.xpath(".//md:AttributeConsumingService"
                                 "//md:RequestedAttribute",
                                 namespaces=NAMESPACES)
        for attr_node in requested:
            is_required = attr_node.attrib.get('isRequired', 'false')
            index = 'required' if is_required == 'true' else 'optional'
            attrs[index].append([attr_node.attrib.get('Name', None),
                                 attr_node.attrib.get('FriendlyName', None)])
        return attrs

    @staticmethod
    def entity_contacts(entity):
        '''Return one dict (type, name, surname, email) per md:ContactPerson.

        Fields missing from the metadata are reported as None (the type
        defaults to the empty string).
        '''
        contacts = []
        for cont_node in entity.xpath(".//md:ContactPerson",
                                      namespaces=NAMESPACES):
            contacts.append({
                'type': cont_node.attrib.get('contactType', ''),
                'name': MetadataParser._first_text(
                    cont_node, ".//md:GivenName"),
                'surname': MetadataParser._first_text(
                    cont_node, ".//md:SurName"),
                'email': MetadataParser._first_text(
                    cont_node, ".//md:EmailAddress"),
            })
        return contacts
423