| Total Complexity | 70 |
| Total Lines | 304 |
| Duplicated Lines | 0 % |
Complex classes like met.metadataparser.MetadataParser often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | ################################################################# |
||
class MetadataParser(object):
    """Reader for a metadata XML file, parsed incrementally with lxml.

    The file's root element decides the mode: when its tag equals
    ``FEDERATION_ROOT_TAG`` the file is treated as a federation
    (``is_federation``), otherwise as a single entity (``is_entity``).

    Entity look-ups re-open the file with ``etree.iterparse`` and release
    each ``EntityDescriptor`` once it has been handled, so the whole
    document is never held in memory at once.

    The element names used below (``md:``, ``mdui:``, ``mdrpi:``,
    ``shibmd:``) look like SAML 2.0 metadata -- the actual namespace URIs
    come from ``NAMESPACES``, defined elsewhere in the module.
    """

    def __init__(self, filename=None):
        """Open ``filename`` and read just enough to inspect its root element.

        :param filename: path (or file-like object accepted by
            ``etree.iterparse``) of the metadata document.
        :raises ValueError: if ``filename`` is ``None``.
        """
        if filename is None:
            raise ValueError('filename is required')

        self.filename = filename
        context = etree.iterparse(self.filename, events=('start',),
                                  huge_tree=True, remove_blank_text=True)
        # The first 'start' event is the root element.  The ``next()``
        # builtin is used (rather than the ``.next()`` method) so this
        # works on both Python 2 and Python 3.
        _event, self.rootelem = next(iter(context))
        self.file_id = self.rootelem.get('ID', None)
        self.is_federation = self.rootelem.tag == FEDERATION_ROOT_TAG
        self.is_entity = not self.is_federation

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _localized_text(entity, xpath_expr):
        """Return ``{lang: text}`` for every node matched by ``xpath_expr``.

        ``lang`` is whatever ``getlang(node)`` returns; nodes for which it
        returns ``None`` are left out.  When several nodes share a
        language the last one wins.
        """
        languages = {}
        for node in entity.xpath(xpath_expr, namespaces=NAMESPACES):
            languages[getlang(node)] = node.text
        languages.pop(None, None)
        return languages

    @staticmethod
    def _first_text(node, xpath_expr):
        """Return the text of the first match of ``xpath_expr``, or ``None``."""
        matches = node.xpath(xpath_expr, namespaces=NAMESPACES)
        return matches[0].text if matches else None

    @staticmethod
    def _release_element(element):
        """Drop an already-processed element and its preceding siblings.

        Used while streaming with ``iterparse`` so that handled elements
        do not accumulate under their parent.
        """
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    @staticmethod
    def _get_entity_details(element):
        """Collect the optional, more expensive attributes of an entity.

        :param element: an ``EntityDescriptor`` element.
        :returns: dict with the keys ``xml``, ``description``, ``infoUrl``,
            ``privacyUrl``, ``organization``, ``logos``, ``scopes``,
            ``attr_requested``, ``contacts`` and ``registration_policy``.
        """
        return {
            'xml': etree.tostring(element, pretty_print=True),
            'description': MetadataParser.entity_description(element),
            'infoUrl': MetadataParser.entity_information_url(element),
            'privacyUrl': MetadataParser.entity_privacy_url(element),
            'organization': MetadataParser.entity_organization(element),
            'logos': MetadataParser.entity_logos(element),
            'scopes': MetadataParser.entity_attribute_scope(element),
            'attr_requested':
                MetadataParser.entity_requested_attributes(element),
            'contacts': MetadataParser.entity_contacts(element),
            'registration_policy':
                MetadataParser.registration_policy(element),
        }

    @staticmethod
    def _entity_lang_seen(entity):
        """Return the set of language codes used by the localized fields.

        Only the per-language dicts stored under ``description``,
        ``infoUrl``, ``privacyUrl``, ``organization`` and ``displayName``
        are inspected; missing or empty fields are ignored.
        """
        languages = set()
        for key in ('description', 'infoUrl', 'privacyUrl',
                    'organization', 'displayName'):
            localized = entity.get(key)
            if localized:
                languages.update(localized.keys())
        return languages

    @staticmethod
    def _get_entity_by_id(context, entityid, details):
        """Yield a dict for every streamed element whose entityID matches.

        :param context: an ``etree.iterparse`` iterator over
            ``EntityDescriptor`` 'end' events.
        :param entityid: the ``entityID`` attribute value to look for.
        :param details: when true, merge in ``_get_entity_details`` and
            drop every key whose value is empty.
        """
        for _event, element in context:
            # ``.get`` instead of ``attrib[...]``: a descriptor without an
            # entityID must not abort the search for a different entity.
            if element.get('entityID') == entityid:
                entity = {
                    'entityid': entityid,
                    'file_id': element.get('ID', None),
                    'displayName': MetadataParser.entity_displayname(element),
                }

                reg_info = MetadataParser.registration_information(element)
                if reg_info and 'authority' in reg_info:
                    entity['registration_authority'] = reg_info['authority']
                if reg_info and 'instant' in reg_info:
                    entity['registration_instant'] = reg_info['instant']

                entity['entity_types'] = MetadataParser.entity_types(element)
                entity['protocols'] = MetadataParser.entity_protocols(
                    element, entity['entity_types'])

                if details:
                    entity.update(MetadataParser._get_entity_details(element))
                    # ``items()`` (not ``iteritems()``) works on Python 2
                    # and Python 3 alike.
                    entity = dict((k, v) for k, v in entity.items() if v)

                entity['languages'] = MetadataParser._entity_lang_seen(entity)
                yield entity

            MetadataParser._release_element(element)

    @staticmethod
    def _get_entities_id(context):
        """Yield the ``entityID`` of every element streamed by ``context``.

        :raises KeyError: if an element has no ``entityID`` attribute.
        """
        for _event, element in context:
            yield element.attrib['entityID']
            MetadataParser._release_element(element)

    # ------------------------------------------------------------------
    # Instance API
    # ------------------------------------------------------------------

    def get_federation(self, attrs=None):
        """Return the ``ID`` and ``Name`` attributes of the federation root.

        :param attrs: unused; kept so existing callers keep working.
        :returns: ``{'ID': ..., 'Name': ...}`` (values may be ``None``).
        """
        assert self.is_federation

        return {
            'ID': self.rootelem.get('ID', None),
            'Name': self.rootelem.get('Name', None),
        }

    def get_entity(self, entityid, details=True):
        """Return the first entity in the file whose entityID is ``entityid``.

        :param entityid: the ``entityID`` to look for.
        :param details: forwarded to ``_get_entity_by_id``.
        :raises ValueError: if no such entity is found.
        """
        context = etree.iterparse(self.filename,
                                  tag=addns('EntityDescriptor'),
                                  events=('end',), huge_tree=True,
                                  remove_blank_text=True)
        for entity in MetadataParser._get_entity_by_id(context, entityid,
                                                       details):
            return entity

        raise ValueError("Entity not found: %s" % entityid)

    def entity_exist(self, entityid):
        """Tell whether an ``EntityDescriptor`` with this entityID is found.

        The id is bound as an XPath variable instead of being pasted into
        the expression, so ids containing quotes cannot break or alter
        the query.
        """
        # NOTE(review): ``self.rootelem`` comes from the first 'start'
        # event of an ``iterparse`` run in ``__init__``, so the tree below
        # it may be only partially built -- confirm this sees every entity
        # in large files.
        matches = self.rootelem.xpath(
            "//md:EntityDescriptor[@entityID=$entityid]",
            namespaces=NAMESPACES, entityid=entityid)
        return len(matches) > 0

    def get_entities(self):
        """Return the list of every ``entityID`` found in the file."""
        context = etree.iterparse(self.filename,
                                  tag=addns('EntityDescriptor'),
                                  events=('end',), huge_tree=True,
                                  remove_blank_text=True)
        return list(self._get_entities_id(context))

    # ------------------------------------------------------------------
    # Per-entity extractors (all take an EntityDescriptor element)
    # ------------------------------------------------------------------

    @staticmethod
    def entity_types(entity):
        """Return the local tag names of the role descriptors of ``entity``.

        The candidates are the XPath expressions listed in
        ``DESCRIPTOR_TYPES_UTIL``, combined into one union query.
        """
        expression = "|".join(DESCRIPTOR_TYPES_UTIL)
        elements = entity.xpath(expression, namespaces=NAMESPACES)
        # Strip the "{namespace}" prefix lxml puts in front of tag names.
        return [element.tag.rsplit("}", 1)[-1] for element in elements]

    @staticmethod
    def entity_protocols(entity, entity_types):
        """Return the protocols listed by the entity's first descriptor.

        :param entity_types: list as returned by ``entity_types``; when it
            is empty or not a list, ``'IDPSSODescriptor'`` is assumed.
        :returns: the tokens of ``protocolSupportEnumeration``, or ``[]``
            when the attribute is absent.
        """
        if isinstance(entity_types, list) and len(entity_types) > 0:
            e_type = entity_types[0]
        else:
            e_type = 'IDPSSODescriptor'

        raw_protocols = entity.xpath(
            ".//md:%s/@protocolSupportEnumeration" % e_type,
            namespaces=NAMESPACES)
        if raw_protocols:
            # The attribute is a whitespace-separated list; ``split()``
            # tolerates repeated spaces, tabs and newlines without
            # producing empty entries.
            return raw_protocols[0].split()

        return []

    @staticmethod
    def entity_displayname(entity):
        """Return ``{lang: text}`` of the ``mdui:DisplayName`` elements."""
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:DisplayName")

    @staticmethod
    def entity_description(entity):
        """Return ``{lang: text}`` of the ``mdui:Description`` elements."""
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:Description")

    @staticmethod
    def entity_information_url(entity):
        """Return ``{lang: url}`` of the ``mdui:InformationURL`` elements."""
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:InformationURL")

    @staticmethod
    def entity_privacy_url(entity):
        """Return ``{lang: url}`` of ``mdui:PrivacyStatementURL`` elements."""
        return MetadataParser._localized_text(
            entity, ".//mdui:UIInfo//mdui:PrivacyStatementURL")

    @staticmethod
    def entity_organization(entity):
        """Return ``{lang: {'name': .., 'displayName': .., 'URL': ..}}``.

        Values come from the ``OrganizationName``,
        ``OrganizationDisplayName`` and ``OrganizationURL`` children of
        every ``md:Organization``; only the keys actually present for a
        language are set.  Nodes without a language are left out.
        """
        languages = {}
        for org_node in entity.xpath(".//md:Organization",
                                     namespaces=NAMESPACES):
            for attr in ('name', 'displayName', 'URL'):
                # 'name' -> 'OrganizationName', 'URL' -> 'OrganizationURL'
                node_name = 'Organization' + attr[0].upper() + attr[1:]
                for node in org_node.findall(addns(node_name)):
                    languages.setdefault(getlang(node), {})[attr] = node.text

        languages.pop(None, None)
        return languages

    @staticmethod
    def entity_logos(entity):
        """Return a list of dicts describing the ``mdui:Logo`` elements.

        Each dict has ``width`` and ``height`` (ints, ``0`` when the
        attribute is missing), ``file`` (the element text) and ``lang``.
        Logos without text are skipped.

        :raises ValueError: if ``width`` or ``height`` is not an integer.
        """
        logos = []
        for logo_node in entity.xpath(".//mdui:UIInfo/mdui:Logo",
                                      namespaces=NAMESPACES):
            if logo_node.text is None:
                continue  # the file attribute is required
            logos.append({
                'width': int(logo_node.attrib.get('width', '0')),
                'height': int(logo_node.attrib.get('height', '0')),
                'file': logo_node.text,
                'lang': getlang(logo_node),
            })
        return logos

    @staticmethod
    def registration_information(entity):
        """Return the registration authority and instant of ``entity``.

        :returns: ``{}`` when there is no ``mdrpi:RegistrationInfo``;
            otherwise ``{'authority': ..., 'instant': ...}`` taken from
            the first one (either value may be ``None``).
        """
        reg_info = entity.xpath(".//md:Extensions/mdrpi:RegistrationInfo",
                                namespaces=NAMESPACES)
        info = {}
        if reg_info:
            attributes = reg_info[0].attrib
            info['authority'] = attributes.get('registrationAuthority')
            info['instant'] = attributes.get('registrationInstant')
        return info

    @staticmethod
    def registration_policy(entity):
        """Return ``{lang: text}`` of ``mdrpi:RegistrationPolicy`` elements.

        Elements without a language are ignored (the lang attribute is
        required).
        """
        return MetadataParser._localized_text(
            entity,
            ".//md:Extensions/mdrpi:RegistrationInfo"
            "/mdrpi:RegistrationPolicy")

    @staticmethod
    def entity_attribute_scope(entity):
        """Return the distinct ``shibmd:Scope`` texts, in document order."""
        scopes = []
        for scope_node in entity.xpath(".//md:Extensions/shibmd:Scope",
                                       namespaces=NAMESPACES):
            if scope_node.text not in scopes:
                scopes.append(scope_node.text)
        return scopes

    @staticmethod
    def entity_requested_attributes(entity):
        """Return the requested attributes split into required/optional.

        :returns: ``{'required': [...], 'optional': [...]}`` where each
            item is ``[Name, FriendlyName]`` (either may be ``None``).
        """
        attrs = {'required': [], 'optional': []}
        requested = entity.xpath(
            ".//md:AttributeConsumingService/md:RequestedAttribute",
            namespaces=NAMESPACES)
        for attr_node in requested:
            # ``isRequired`` is an XML Schema boolean, for which both
            # 'true' and '1' mean true.
            required = attr_node.attrib.get('isRequired', 'false')
            index = 'required' if required in ('true', '1') else 'optional'
            attrs[index].append([attr_node.attrib.get('Name', None),
                                 attr_node.attrib.get('FriendlyName', None)])
        return attrs

    @staticmethod
    def entity_contacts(entity):
        """Return a list of dicts describing the ``md:ContactPerson`` nodes.

        Each dict has ``type`` (the ``contactType`` attribute, ``''`` when
        missing) plus ``name``, ``surname`` and ``email``: the text of the
        first ``GivenName`` / ``SurName`` / ``EmailAddress`` descendant,
        or ``None`` when there is none.
        """
        first_text = MetadataParser._first_text
        contacts = []
        for cont_node in entity.xpath(".//md:ContactPerson",
                                      namespaces=NAMESPACES):
            contacts.append({
                'type': cont_node.attrib.get('contactType', ''),
                'name': first_text(cont_node, ".//md:GivenName"),
                'surname': first_text(cont_node, ".//md:SurName"),
                'email': first_text(cont_node, ".//md:EmailAddress"),
            })
        return contacts
||
| 365 |