| Total Lines | 411 |
| Duplicated Lines | 4.38 % |
| Changes | 9 | ||
| Bugs | 1 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
| 1 | ################################################################# |
||
class Entity(Base):
    """
    Model describing a federation entity (IdP, SP or AA).

    Most read-only properties are backed by the entity metadata, which is
    loaded lazily by ``load_metadata`` and cached on the instance as
    ``_entity_cached`` (accessed with ``.get()`` / ``.copy()``, so it is
    presumably a dict -- confirm against the metadata parser).
    """

    # Maps protocol identifiers found in the metadata to display labels.
    # NOTE(review): 'Shiboleth' looks like a typo for 'Shibboleth'; the
    # literal is left untouched because it is a value shown to users.
    READABLE_PROTOCOLS = {
        'urn:oasis:names:tc:SAML:1.1:protocol': 'SAML 1.1',
        'urn:oasis:names:tc:SAML:2.0:protocol': 'SAML 2.0',
        'urn:mace:shibboleth:1.0': 'Shiboleth 1.0',
    }

    entityid = models.CharField(blank=False, max_length=200, unique=True,
                                verbose_name=_(u'EntityID'), db_index=True)

    federations = models.ManyToManyField('Federation', through='Entity_Federations',
                                         verbose_name=_(u'Federations'))

    types = models.ManyToManyField('EntityType', verbose_name=_(u'Type'))

    name = JSONField(blank=True, null=True, max_length=2000,
                     verbose_name=_(u'Display Name'))

    certstats = models.CharField(blank=True, null=True, max_length=200,
                                 unique=False, verbose_name=_(u'Certificate Stats'))

    _display_protocols = models.CharField(blank=True, null=True, max_length=300,
                                          unique=False, verbose_name=_(u'Display Protocols'))

    objects = models.Manager()

    longlist = EntityManager()

    # Federation used as the preferred metadata source by ``_get_property``
    # and as the lookup key in ``entity_categories``; None means "not set".
    curfed = None

    # ------------------------------------------------------------------
    # Private helpers shared by the metadata-backed properties
    # ------------------------------------------------------------------

    def _safe_property(self, prop, default):
        """Return the metadata value for ``prop``; ``default`` if reading it raises."""
        try:
            return self._get_property(prop)
        except Exception:
            return default

    def _joined_property(self, prop):
        """Return the metadata sequence ``prop`` joined with spaces, or ''.

        The empty string is returned whenever the value is missing or cannot
        be joined (e.g. it is None).
        """
        try:
            return ' '.join(self._get_property(prop))
        except Exception:
            return ''

    def _requested_attributes(self, kind):
        """Return ``attr_requested[kind]`` from the metadata, or [] on any failure."""
        try:
            requested = self._get_property('attr_requested')
            if not requested:
                return []
            return requested[kind]
        except Exception:
            return []

    @staticmethod
    def _friendly_attributes(attributes):
        """Map each attribute to its friendly name.

        ``attributes`` is an iterable of ``(attr, friendly)`` pairs. When no
        friendly name is supplied, ``attributemap.MAP['fro']`` is consulted
        and '?' is used as the last resort.
        """
        known_names = attributemap.MAP['fro']
        return dict(
            (attr, friendly or known_names.get(attr, '?'))
            for attr, friendly in attributes
        )

    # ------------------------------------------------------------------
    # Metadata-backed properties
    # ------------------------------------------------------------------

    @property
    def certificates(self):
        """Return ``certstats`` decoded from JSON.

        No guard is applied: ``json.loads`` raises if ``certstats`` is None
        or is not valid JSON.
        """
        return json.loads(self.certstats)

    @property
    def registration_authority_xml(self):
        """Return the 'registration_authority' value read from the metadata."""
        return self._get_property('registration_authority')

    @property
    def registration_policy(self):
        """Return the 'registration_policy' value read from the metadata."""
        return self._get_property('registration_policy')

    @property
    def registration_instant(self):
        """Return the registration instant as a naive ``datetime``, or None.

        Only the first 19 characters of the stored value are kept before
        parsing with the '%Y-%m-%dT%H:%M:%SZ' format.
        """
        reginstant = self._get_property('registration_instant')
        if reginstant is None:
            return None
        reginstant = "%sZ" % reginstant[0:19]
        return datetime.strptime(reginstant, '%Y-%m-%dT%H:%M:%SZ')

    @property
    def protocols(self):
        """Return the metadata 'protocols' joined with spaces ('' if unavailable)."""
        return self._joined_property('protocols')

    @property
    def languages(self):
        """Return the metadata 'languages' joined with spaces ('' if unavailable)."""
        return self._joined_property('languages')

    @property
    def scopes(self):
        """Return the metadata 'scopes' joined with spaces ('' if unavailable)."""
        return self._joined_property('scopes')

    @property
    def attributes(self):
        """Return the 'required' entry of the requested attributes, or []."""
        return self._requested_attributes('required')

    @property
    def attributes_optional(self):
        """Return the 'optional' entry of the requested attributes, or []."""
        return self._requested_attributes('optional')

    @property
    def organization(self):
        """Return the organization entries as a list, one item per language.

        Each item is the per-language data object from the metadata with a
        'lang' key added. The cached metadata objects are updated in place.
        """
        organization = self._get_property('organization')
        if not organization:
            return []

        vals = []
        for lang, data in organization.items():
            data['lang'] = lang
            vals.append(data)

        return vals

    @property
    def display_name(self):
        """Return the metadata 'displayName' ('' if reading it raises)."""
        return self._safe_property('displayName', '')

    @property
    def federations_count(self):
        """Return the number of related federations as a string ('' on error)."""
        try:
            return str(self.federations.all().count())
        except Exception:
            return ''

    @property
    def description(self):
        """Return the metadata 'description' ('' if reading it raises)."""
        return self._safe_property('description', '')

    @property
    def info_url(self):
        """Return the metadata 'infoUrl' ('' if reading it raises)."""
        return self._safe_property('infoUrl', '')

    @property
    def privacy_url(self):
        """Return the metadata 'privacyUrl' ('' if reading it raises)."""
        return self._safe_property('privacyUrl', '')

    @property
    def xml(self):
        """Return the metadata 'xml' value ('' if reading it raises)."""
        return self._safe_property('xml', '')

    @property
    def xml_types(self):
        """Return the metadata 'entity_types' ([] if reading it raises)."""
        return self._safe_property('entity_types', [])

    @property
    def entity_categories(self):
        """Return the entity categories stored for ``curfed``, or None.

        When ``curfed`` is set, the matching ``Entity_Federations`` row is
        fetched, and created if it does not exist yet.
        """
        if self.curfed:
            efed = Entity_Federations.objects.get_or_create(federation=self.curfed, entity=self)[0]
            return efed.entity_categories
        return None

    @property
    def xml_categories(self):
        """Return the metadata 'entity_categories' ([] if reading it raises)."""
        return self._safe_property('entity_categories', [])

    @property
    def display_protocols(self):
        """Return the protocols as display labels.

        Identifiers missing from ``READABLE_PROTOCOLS`` are returned as-is.
        """
        xml_protocols = self.protocols
        if not xml_protocols:
            return []
        return [self.READABLE_PROTOCOLS.get(proto, proto)
                for proto in xml_protocols.split(' ')]

    def display_attributes(self):
        """Return a dict mapping each required attribute to a friendly name."""
        return self._friendly_attributes(self.attributes)

    def display_attributes_optional(self):
        """Return a dict mapping each optional attribute to a friendly name."""
        return self._friendly_attributes(self.attributes_optional)

    @property
    def contacts(self):
        """Return the contacts as dicts with 'name', 'email' and 'type' keys.

        The name is built from the contact's name and/or surname; when both
        are empty it is derived from the e-mail value. A missing type is
        reported as 'undefined'. An entity without contacts yields [].
        """
        contacts = []
        # ``or []`` covers metadata without a 'contacts' entry (None).
        for cur_contact in self._get_property('contacts') or []:
            if cur_contact['name'] and cur_contact['surname']:
                contact_name = '%s %s' % (
                    cur_contact['name'], cur_contact['surname'])
            elif cur_contact['name']:
                contact_name = cur_contact['name']
            elif cur_contact['surname']:
                contact_name = cur_contact['surname']
            else:
                # Fall back to the path part of the e-mail value, dropping
                # anything after a '?'.
                contact_name = urlparse(
                    cur_contact['email']).path.partition('?')[0]
            c_type = cur_contact['type'] or 'undefined'
            contacts.append(
                {'name': contact_name, 'email': cur_contact['email'], 'type': c_type})
        return contacts

    @property
    def logos(self):
        """Return the metadata logos, each flagged with ``external = True``.

        The cached logo objects are updated in place. An entity without
        logos yields [].
        """
        logos = []
        # ``or []`` covers metadata without a 'logos' entry (None).
        for cur_logo in self._get_property('logos') or []:
            cur_logo['external'] = True
            logos.append(cur_logo)

        return logos

    class Meta(object):
        verbose_name = _(u'Entity')
        verbose_name_plural = _(u'Entities')

    def __unicode__(self):
        return self.entityid

    # ------------------------------------------------------------------
    # Metadata loading
    # ------------------------------------------------------------------

    def load_metadata(self, federation=None, entity_data=None):
        """Populate ``_entity_cached`` unless it is already set.

        Sources are tried in this order: the entity's own file, the given
        ``federation``, the given ``entity_data``, and finally one of the
        related federations (preferring one whose registration authority
        matches this entity's, otherwise the first one).

        Raises:
            ValueError: if no metadata source is available.
        """
        if hasattr(self, '_entity_cached'):
            return

        if self.file:
            self._entity_cached = self.load_file().get_entity(self.entityid)
        elif federation:
            self._entity_cached = federation.get_entity_metadata(self.entityid)
        elif entity_data:
            self._entity_cached = entity_data
        else:
            right_fed = None
            first_fed = None
            for fed in self.federations.all():
                if fed.registration_authority == self.registration_authority:
                    right_fed = fed
                if first_fed is None:
                    first_fed = fed

            source_fed = right_fed if right_fed is not None else first_fed
            if source_fed is None:
                # No federation at all: report it with the ValueError that
                # callers (``_get_property``) already handle, instead of
                # failing with AttributeError on None.
                raise ValueError("Can't find entity metadata")
            self._entity_cached = source_fed.get_entity_metadata(self.entityid)

        if not hasattr(self, '_entity_cached'):
            raise ValueError("Can't find entity metadata")

    def _get_property(self, prop, federation=None):
        """Return metadata value ``prop``, loading the metadata if needed.

        Returns None when the metadata cannot be loaded (``load_metadata``
        raised ValueError) or when ``prop`` is absent.
        """
        try:
            self.load_metadata(federation or self.curfed)
        except ValueError:
            return None

        if hasattr(self, '_entity_cached'):
            return self._entity_cached.get(prop, None)
        else:
            raise ValueError("Not metadata loaded")

    def _get_or_create_etypes(self, cached_entity_types):
        """Return the ``EntityType`` objects still to be linked to this entity.

        Types already related to the entity are skipped. ``cached_entity_types``
        is an optional mapping from xml name to ``EntityType``; when it is
        None every type is resolved with ``get_or_create``, otherwise the
        mapping is consulted first and missing types are created.
        """
        entity_types = []
        already_linked = set(t.xmlname for t in self.types.all())
        for etype in self.xml_types or []:
            if etype in already_linked:
                # Skip only this type; stopping the loop here would drop
                # every type listed after an already-linked one.
                continue

            if cached_entity_types is None:
                entity_type = EntityType.objects.get_or_create(
                    xmlname=etype, name=DESCRIPTOR_TYPES_DISPLAY[etype])[0]
            elif etype in cached_entity_types:
                entity_type = cached_entity_types[etype]
            else:
                entity_type = EntityType.objects.create(
                    xmlname=etype, name=DESCRIPTOR_TYPES_DISPLAY[etype])
            entity_types.append(entity_type)
        return entity_types

    def process_metadata(self, auto_save=True, entity_data=None, cached_entity_types=None, federation=None):
        """Refresh the stored fields of this entity from its metadata.

        Args:
            auto_save: call ``save()`` once the fields have been updated.
            entity_data: metadata to use; when empty it is loaded through
                ``load_metadata``.
            cached_entity_types: optional mapping passed on to
                ``_get_or_create_etypes``.
            federation: federation passed to ``load_metadata`` when
                ``entity_data`` is not supplied.

        Raises:
            ValueError: if no metadata can be found, or if the metadata
                belongs to a different entityID.
        """
        if not entity_data:
            self.load_metadata(federation)
            # Use what was just loaded; without this the checks below would
            # dereference None.
            entity_data = getattr(self, '_entity_cached', None)
            if not entity_data:
                raise ValueError("Can't find entity metadata")

        if self.entityid.lower() != entity_data.get('entityid').lower():
            raise ValueError("EntityID is not the same: %s != %s" % (
                self.entityid.lower(), entity_data.get('entityid').lower()))

        self._entity_cached = entity_data

        if self.xml_types:
            entity_types = self._get_or_create_etypes(cached_entity_types)
            if len(entity_types) > 0:
                self.types.add(*entity_types)

        newname = self._get_property('displayName')
        if newname and newname != '':
            self.name = newname

        self.certstats = self._get_property('certstats')

        newprotocols = self.protocols
        if newprotocols and newprotocols != "":
            self._display_protocols = newprotocols

        # Only overwrite with a real value: a missing authority is None, and
        # str(None) is 'None', which would pass a plain "!= ''" check.
        registration_authority = self._get_property('registration_authority')
        if registration_authority:
            self.registration_authority = registration_authority

        if auto_save:
            self.save()

    def to_dict(self):
        """Return a shallow copy of the metadata enriched with model data.

        'types' and 'federations' are added, the registration values are
        included when set, and the 'file_id' and 'entity_types' keys are
        removed.
        """
        self.load_metadata()

        entity = self._entity_cached.copy()
        entity["types"] = [unicode(f) for f in self.types.all()]
        entity["federations"] = [{u"name": unicode(f), u"url": f.get_absolute_url()}
                                 for f in self.federations.all()]

        if self.registration_authority:
            entity["registration_authority"] = self.registration_authority
        registration_instant = self.registration_instant
        if registration_instant:
            entity["registration_instant"] = '%s' % registration_instant

        entity.pop("file_id", None)
        entity.pop("entity_types", None)

        return entity

    # NOTE(review): declared without ``self``; when called on an instance,
    # ``value`` receives the instance itself. Left as-is because the
    # intended caller is not visible here -- confirm before changing.
    def display_etype(value, separator=', '):
        return separator.join([unicode(item) for item in value.all()])

    @classmethod
    def get_most_federated_entities(cls, maxlength=TOP_LENGTH, cache_expire=None):
        """Return up to ``maxlength`` entities that belong to the most federations.

        Each item is a dict with 'entityid', 'name', 'absolute_url', 'types'
        and 'federations'. When ``cache_expire`` is truthy the list is read
        from, and written back to, the cache under the key
        "most_federated_entities" with that expiry.
        """
        entities = None
        if cache_expire:
            entities = cache.get("most_federated_entities")

        if not entities or len(entities) < maxlength:
            # Entities annotated with how many federations they belong to,
            # sorted with the most federated first.
            ob_entities = cls.objects.all().annotate(
                federationslength=Count("federations")).order_by("-federationslength")
            ob_entities = ob_entities.prefetch_related('types', 'federations')
            ob_entities = ob_entities[:maxlength]

            entities = [
                {
                    'entityid': entity.entityid,
                    'name': entity.name,
                    'absolute_url': entity.get_absolute_url(),
                    'types': [unicode(item) for item in entity.types.all()],
                    'federations': [(unicode(item.name), item.get_absolute_url())
                                    for item in entity.federations.all()],
                }
                for entity in ob_entities
            ]

            if cache_expire:
                cache.set("most_federated_entities", entities, cache_expire)

        return entities[:maxlength]

    def get_absolute_url(self):
        """Return the URL of the 'entity_view' page for this entity."""
        return reverse('entity_view', args=[quote_plus(self.entityid.encode('utf-8'))])

    def can_edit(self, user, delete):
        """Return True if ``user`` may change (or, with ``delete``, delete) this entity.

        Access is granted to superusers, to editor users holding the matching
        permission, and to anyone who can edit one of the related federations.
        """
        permission = 'delete_entity' if delete else 'change_entity'
        if user.is_superuser or (user.has_perm('metadataparser.%s' % permission) and user in self.editor_users.all()):
            return True

        return any(federation.can_edit(user, False)
                   for federation in self.federations.all())

    def has_changed(self, entityid, name, registration_authority, certstats, display_protocols):
        """Return True if any of the given values differs from the stored one."""
        return (
            self.entityid != entityid
            or self.name != name
            or self.registration_authority != registration_authority
            or self.certstats != certstats
            or self._display_protocols != display_protocols
        )
||
| 498 |