Total Lines | 411 |
Duplicated Lines | 4.38 % |
Changes | 9 | ||
Bugs | 1 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
1 | ################################################################# |
||
class Entity(Base):
    """
    Model describing a federation entity (IdP, SP or AA).
    """

    # Maps protocol identifiers found in metadata to the labels returned
    # by `display_protocols`.
    # NOTE(review): 'Shiboleth' looks like a misspelling of 'Shibboleth';
    # it is a runtime string, so confirm before changing it.
    READABLE_PROTOCOLS = {
        'urn:oasis:names:tc:SAML:1.1:protocol': 'SAML 1.1',
        'urn:oasis:names:tc:SAML:2.0:protocol': 'SAML 2.0',
        'urn:mace:shibboleth:1.0': 'Shiboleth 1.0',
    }

    # Unique, indexed identifier of the entity.
    entityid = models.CharField(blank=False, max_length=200, unique=True,
                                verbose_name=_(u'EntityID'), db_index=True)

    # Federations the entity belongs to, linked through Entity_Federations.
    federations = models.ManyToManyField('Federation', through='Entity_Federations',
                                         verbose_name=_(u'Federations'))

    types = models.ManyToManyField('EntityType', verbose_name=_(u'Type'))

    # Display name; `process_metadata` fills it from the 'displayName'
    # metadata property.
    name = JSONField(blank=True, null=True, max_length=2000,
                     verbose_name=_(u'Display Name'))

    # JSON text; decoded by the `certificates` property.
    certstats = models.CharField(blank=True, null=True, max_length=200,
                                 unique=False, verbose_name=_(u'Certificate Stats'))

    # Space-separated protocol string stored by `process_metadata`.
    _display_protocols = models.CharField(blank=True, null=True, max_length=300,
                                          unique=False, verbose_name=_(u'Display Protocols'))

    objects = models.Manager()

    longlist = EntityManager()

    # Federation used by default when metadata or entity categories are
    # looked up; None means "no federation selected".
    curfed = None
||
120 | |||
@property
def certificates(self):
    """
    Certificate statistics decoded from the JSON text in ``certstats``.

    Returns None when no statistics are stored.
    """
    if not self.certstats:
        # certstats is nullable/blank; json.loads(None) would raise.
        return None
    return json.loads(self.certstats)
||
124 | |||
@property
def registration_authority_xml(self):
    """Value of the 'registration_authority' metadata property."""
    authority = self._get_property('registration_authority')
    return authority
||
128 | |||
@property
def registration_policy(self):
    """Value of the 'registration_policy' metadata property."""
    policy = self._get_property('registration_policy')
    return policy
||
132 | |||
@property
def registration_instant(self):
    """
    Registration instant parsed into a datetime, or None when the
    metadata carries no 'registration_instant'.
    """
    raw = self._get_property('registration_instant')
    if raw is not None:
        # Only the first 19 characters (date and time down to seconds)
        # are parsed; anything after them is dropped.
        stamp = "%sZ" % raw[0:19]
        return datetime.strptime(stamp, '%Y-%m-%dT%H:%M:%SZ')
    return None
||
140 | |||
@property
def protocols(self):
    """Metadata 'protocols' joined with spaces; '' on any failure."""
    try:
        values = self._get_property('protocols')
        joined = ' '.join(values)
    except Exception:
        joined = ''
    return joined
||
147 | |||
@property
def languages(self):
    """Metadata 'languages' joined with spaces; '' on any failure."""
    try:
        values = self._get_property('languages')
        joined = ' '.join(values)
    except Exception:
        joined = ''
    return joined
||
154 | |||
@property
def scopes(self):
    """Metadata 'scopes' joined with spaces; '' on any failure."""
    try:
        values = self._get_property('scopes')
        joined = ' '.join(values)
    except Exception:
        joined = ''
    return joined
||
161 | |||
@property
def attributes(self):
    """
    The 'required' entry of the 'attr_requested' metadata property;
    an empty list when it is absent or cannot be read.
    """
    try:
        requested = self._get_property('attr_requested')
        return requested['required'] if requested else []
    except Exception:
        return []
||
171 | |||
@property
def attributes_optional(self):
    """
    The 'optional' entry of the 'attr_requested' metadata property;
    an empty list when it is absent or cannot be read.
    """
    try:
        requested = self._get_property('attr_requested')
        return requested['optional'] if requested else []
    except Exception:
        return []
||
181 | |||
@property
def organization(self):
    """
    Organization data as a list of dicts, one per language.

    Each item is a copy of the per-language data with an added 'lang'
    key. Returns an empty list when there is no organization data.
    """
    organization = self._get_property('organization')
    if not organization:
        return []

    # Copy each entry instead of writing 'lang' into it, so the cached
    # metadata is not modified as a side effect of reading this property.
    return [dict(data, lang=lang) for lang, data in organization.items()]
||
194 | |||
@property
def display_name(self):
    """Metadata 'displayName' value; '' when reading it fails."""
    try:
        value = self._get_property('displayName')
    except Exception:
        value = ''
    return value
||
201 | |||
@property
def federations_count(self):
    """Number of related federations as a string; '' on any failure."""
    try:
        total = self.federations.all().count()
        return str(total)
    except Exception:
        return ''
||
208 | |||
@property
def description(self):
    """Metadata 'description' value; '' when reading it fails."""
    try:
        value = self._get_property('description')
    except Exception:
        value = ''
    return value
||
215 | |||
@property
def info_url(self):
    """Metadata 'infoUrl' value; '' when reading it fails."""
    try:
        value = self._get_property('infoUrl')
    except Exception:
        value = ''
    return value
||
222 | |||
@property
def privacy_url(self):
    """Metadata 'privacyUrl' value; '' when reading it fails."""
    try:
        value = self._get_property('privacyUrl')
    except Exception:
        value = ''
    return value
||
229 | |||
@property
def xml(self):
    """Metadata 'xml' value; '' when reading it fails."""
    try:
        value = self._get_property('xml')
    except Exception:
        value = ''
    return value
||
236 | |||
@property
def xml_types(self):
    """
    Entity types listed in the metadata ('entity_types').

    Always returns a list: empty when the key is missing or the
    metadata cannot be read, so callers can iterate it safely.
    """
    try:
        return self._get_property('entity_types') or []
    except Exception:
        return []
||
243 | |||
@property
def entity_categories(self):
    """
    Entity categories of this entity within the current federation.

    Returns None when no current federation (``curfed``) is set.
    Otherwise the Entity_Federations row for this entity and federation
    is fetched, or created if it does not exist yet.
    """
    if not self.curfed:
        return None
    link, _created = Entity_Federations.objects.get_or_create(
        federation=self.curfed, entity=self)
    return link.entity_categories
||
250 | |||
@property
def xml_categories(self):
    """
    Entity categories listed in the metadata ('entity_categories').

    Always returns a list: empty when the key is missing or the
    metadata cannot be read.
    """
    try:
        return self._get_property('entity_categories') or []
    except Exception:
        return []
||
257 | |||
@property
def display_protocols(self):
    """
    Protocols of the entity as readable labels.

    Identifiers without an entry in READABLE_PROTOCOLS are returned
    unchanged.
    """
    raw = self.protocols
    if not raw:
        return []
    lookup = self.READABLE_PROTOCOLS
    return [lookup.get(urn, urn) for urn in raw.split(' ')]
||
268 | |||
def display_attributes(self):
    """
    Map each required attribute to a display label.

    The friendly name from the metadata is used when present, then the
    entry in attributemap.MAP['fro'], and finally '?'.
    """
    labels = {}
    for attr, friendly in self.attributes:
        if not friendly:
            known = attributemap.MAP['fro']
            friendly = known[attr] if attr in known else '?'
        labels[attr] = friendly
    return labels
||
279 | |||
def display_attributes_optional(self):
    """
    Map each optional attribute to a display label.

    The friendly name from the metadata is used when present, then the
    entry in attributemap.MAP['fro'], and finally '?'.
    """
    labels = {}
    for attr, friendly in self.attributes_optional:
        if not friendly:
            known = attributemap.MAP['fro']
            friendly = known[attr] if attr in known else '?'
        labels[attr] = friendly
    return labels
||
290 | |||
@property
def contacts(self):
    """
    Contacts from the metadata as dicts with 'name', 'email' and 'type'.

    The name is built from the contact's name and/or surname; when both
    are empty it is derived from the email value (path part, up to any
    '?'). A missing type becomes 'undefined'. Returns an empty list when
    the metadata has no contacts.
    """
    contacts = []
    # The 'contacts' property may be None (missing key or metadata not
    # available); treat that as "no contacts" rather than failing.
    for cur_contact in self._get_property('contacts') or []:
        if cur_contact['name'] and cur_contact['surname']:
            contact_name = '%s %s' % (
                cur_contact['name'], cur_contact['surname'])
        elif cur_contact['name']:
            contact_name = cur_contact['name']
        elif cur_contact['surname']:
            contact_name = cur_contact['surname']
        else:
            contact_name = urlparse(
                cur_contact['email']).path.partition('?')[0]
        contacts.append({
            'name': contact_name,
            'email': cur_contact['email'],
            'type': cur_contact['type'] or 'undefined',
        })
    return contacts
||
311 | |||
@property
def logos(self):
    """
    Logos from the metadata, each flagged with ``external=True``.

    Returns an empty list when the metadata has no logos. The returned
    dicts are copies, so the cached metadata is left untouched.
    """
    # `or []` covers a missing 'logos' key / unavailable metadata (None).
    return [dict(cur_logo, external=True)
            for cur_logo in self._get_property('logos') or []]
||
320 | |||
class Meta(object):
    # Singular and plural display names of the model; both go through
    # `_`, presumably the translation helper (confirm against imports).
    verbose_name = _(u'Entity')
    verbose_name_plural = _(u'Entities')
||
324 | |||
325 | def __unicode__(self): |
||
326 | return self.entityid |
||
327 | |||
def load_metadata(self, federation=None, entity_data=None):
    """
    Populate ``self._entity_cached`` with this entity's metadata.

    Does nothing when the cache is already set. Sources are tried in
    this order: the entity's own file, the given ``federation``, the
    given ``entity_data``, and finally the entity's federations (one
    whose registration authority equals this entity's, otherwise the
    first one).

    Raises ValueError when none of these sources is available.
    """
    if hasattr(self, '_entity_cached'):
        return

    if self.file:
        self._entity_cached = self.load_file().get_entity(self.entityid)
    elif federation:
        self._entity_cached = federation.get_entity_metadata(self.entityid)
    elif entity_data:
        self._entity_cached = entity_data
    else:
        right_fed = None
        first_fed = None
        for fed in self.federations.all():
            if fed.registration_authority == self.registration_authority:
                right_fed = fed
            if first_fed is None:
                first_fed = fed

        chosen_fed = right_fed if right_fed is not None else first_fed
        if chosen_fed is None:
            # The entity has no federations. Raise the ValueError that
            # callers such as _get_property handle, instead of failing
            # with AttributeError on None.
            raise ValueError("Can't find entity metadata")
        self._entity_cached = chosen_fed.get_entity_metadata(self.entityid)
||
356 | |||
357 | def _get_property(self, prop, federation=None): |
||
358 | try: |
||
359 | self.load_metadata(federation or self.curfed) |
||
360 | except ValueError: |
||
361 | return None |
||
362 | |||
363 | if hasattr(self, '_entity_cached'): |
||
364 | return self._entity_cached.get(prop, None) |
||
365 | else: |
||
366 | raise ValueError("Not metadata loaded") |
||
367 | |||
368 | View Code Duplication | def _get_or_create_etypes(self, cached_entity_types): |
|
|
|||
369 | entity_types = [] |
||
370 | cur_cached_types = [t.xmlname for t in self.types.all()] |
||
371 | for etype in self.xml_types: |
||
372 | if etype in cur_cached_types: |
||
373 | break |
||
374 | |||
375 | if cached_entity_types is None: |
||
376 | entity_type, _ = EntityType.objects.get_or_create(xmlname=etype, |
||
377 | name=DESCRIPTOR_TYPES_DISPLAY[etype]) |
||
378 | else: |
||
379 | if etype in cached_entity_types: |
||
380 | entity_type = cached_entity_types[etype] |
||
381 | else: |
||
382 | entity_type = EntityType.objects.create(xmlname=etype, |
||
383 | name=DESCRIPTOR_TYPES_DISPLAY[etype]) |
||
384 | entity_types.append(entity_type) |
||
385 | return entity_types |
||
386 | |||
def process_metadata(self, auto_save=True, entity_data=None, cached_entity_types=None, federation=None):
    """
    Refresh the model fields from the entity metadata.

    ``entity_data`` is the metadata to use; when empty, the metadata
    loaded by ``load_metadata`` is used instead. ``cached_entity_types``
    is passed on to ``_get_or_create_etypes``. ``federation`` is
    accepted for interface compatibility and is not used here. The
    instance is saved at the end when ``auto_save`` is true.

    Raises ValueError when the metadata belongs to a different entityid
    (compared case-insensitively).
    """
    if not entity_data:
        self.load_metadata()
        # Continue with what was just loaded; without this the checks
        # below would run against None.
        entity_data = self._entity_cached

    if self.entityid.lower() != entity_data.get('entityid').lower():
        raise ValueError("EntityID is not the same: %s != %s" % (
            self.entityid.lower(), entity_data.get('entityid').lower()))

    self._entity_cached = entity_data

    if self.xml_types:
        entity_types = self._get_or_create_etypes(cached_entity_types)
        if len(entity_types) > 0:
            self.types.add(*entity_types)

    newname = self._get_property('displayName')
    if newname:
        self.name = newname

    self.certstats = self._get_property('certstats')

    newprotocols = self.protocols
    if newprotocols:
        self._display_protocols = newprotocols

    # Only overwrite the stored authority with a real value. Comparing
    # str(value) with '' let a missing value (None) through as 'None'.
    authority = self._get_property('registration_authority')
    if authority:
        self.registration_authority = authority

    if auto_save:
        self.save()
||
418 | |||
def to_dict(self):
    """
    Return a copy of the entity metadata as a plain dict.

    The copy gains 'types' and 'federations' entries built from the
    related objects, plus the registration authority and instant when
    they are set. The 'file_id' and 'entity_types' keys are removed.
    """
    self.load_metadata()

    entity = self._entity_cached.copy()
    entity["types"] = [unicode(etype) for etype in self.types.all()]
    entity["federations"] = [
        {u"name": unicode(fed), u"url": fed.get_absolute_url()}
        for fed in self.federations.all()
    ]

    if self.registration_authority:
        entity["registration_authority"] = self.registration_authority
    instant = self.registration_instant
    if instant:
        entity["registration_instant"] = '%s' % instant

    for internal_key in ("file_id", "entity_types"):
        entity.pop(internal_key, None)

    return entity
||
438 | |||
@staticmethod
def display_etype(value, separator=', '):
    """
    Join the text form of every item in ``value.all()`` with
    ``separator``.

    Declared static because the function takes no instance: as a plain
    method its first argument would be bound to the entity itself.
    """
    return separator.join([unicode(item) for item in value.all()])
||
441 | |||
@classmethod
def get_most_federated_entities(cls, maxlength=TOP_LENGTH, cache_expire=None):
    """
    Return up to ``maxlength`` entities, ordered by how many federations
    they belong to (most first), as plain dicts.

    When ``cache_expire`` is set, the list is read from the cache under
    "most_federated_entities" and is recomputed only if it is missing or
    shorter than ``maxlength``.
    """
    entities = cache.get("most_federated_entities") if cache_expire else None

    if not entities or len(entities) < maxlength:
        # Annotate each entity with its federation count and sort by it.
        ob_entities = Entity.objects.all().annotate(
            federationslength=Count("federations")).order_by("-federationslength")
        ob_entities = ob_entities.prefetch_related('types', 'federations')
        ob_entities = ob_entities[:maxlength]

        entities = [{
            'entityid': entity.entityid,
            'name': entity.name,
            'absolute_url': entity.get_absolute_url(),
            'types': [unicode(item) for item in entity.types.all()],
            'federations': [(unicode(item.name), item.get_absolute_url())
                            for item in entity.federations.all()],
        } for entity in ob_entities]

        # Store only freshly computed results, so a cache hit does not
        # keep pushing the expiry forward.
        if cache_expire:
            cache.set("most_federated_entities", entities, cache_expire)

    return entities[:maxlength]
||
470 | |||
def get_absolute_url(self):
    """URL of the 'entity_view' route for this entity's quoted entityid."""
    encoded_id = quote_plus(self.entityid.encode('utf-8'))
    return reverse('entity_view', args=[encoded_id])
||
473 | |||
def can_edit(self, user, delete):
    """
    Tell whether ``user`` may change (or, with ``delete``, delete) this
    entity.

    True for superusers, for editor users holding the matching
    'metadataparser' permission, and for users who can edit any of the
    entity's federations.
    """
    permission = 'change_entity'
    if delete:
        permission = 'delete_entity'

    if user.is_superuser:
        return True
    if user.has_perm('metadataparser.%s' % permission) and user in self.editor_users.all():
        return True

    # NOTE(review): federations are always asked with delete=False, even
    # when `delete` is true here -- confirm this is intended.
    return any(federation.can_edit(user, False)
               for federation in self.federations.all())
||
484 | |||
def has_changed(self, entityid, name, registration_authority, certstats, display_protocols):
    """
    Tell whether any of the given values differs from the value
    currently stored on this entity.
    """
    comparisons = (
        (self.entityid, entityid),
        (self.name, name),
        (self.registration_authority, registration_authority),
        (self.certstats, certstats),
        (self._display_protocols, display_protocols),
    )
    return any(current != candidate for current, candidate in comparisons)
||
498 |