| Total Complexity | 65 |
| Total Lines | 409 |
| Duplicated Lines | 6.36 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like atramhasis.views.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import os |
||
| 2 | |||
| 3 | from pyramid.response import Response |
||
| 4 | from pyramid.response import FileResponse |
||
| 5 | from pyramid.view import view_config, view_defaults |
||
| 6 | from pyramid.httpexceptions import HTTPFound |
||
| 7 | from pyramid.threadlocal import get_current_registry |
||
| 8 | from pyramid.i18n import TranslationStringFactory |
||
| 9 | from sqlalchemy.orm.exc import NoResultFound |
||
| 10 | from skosprovider_sqlalchemy.models import Collection, Concept, LabelType, NoteType |
||
| 11 | |||
| 12 | from atramhasis.errors import SkosRegistryNotFoundException, ConceptSchemeNotFoundException, ConceptNotFoundException |
||
| 13 | from atramhasis.utils import update_last_visited_concepts |
||
| 14 | from atramhasis.cache import tree_region, invalidate_scheme_cache, invalidate_cache, list_region |
||
| 15 | from atramhasis.audit import audit |
||
| 16 | |||
| 17 | |||
def labels_to_string(labels, ltype):
    """
    Render all labels of one label type as a single comma separated string.

    Every matching label is formatted as ``<label> (<language_id>)`` and the
    entries are joined with ``', '``.

    :param labels: iterable of objects exposing ``label``, ``language_id``
        and ``labeltype_id`` attributes.
    :param ltype: value compared against ``labeltype_id`` to select labels.
    :returns: the joined string, or an empty string when nothing matches.
    """
    # str.join avoids both the repeated string concatenation and the need to
    # slice a trailing separator off afterwards.
    return ', '.join(
        label.label + ' (' + label.language_id + ')'
        for label in labels
        if label.labeltype_id == ltype
    )
||
| 23 | |||
| 24 | |||
def get_definition(notes):
    """
    Return the text of the first note whose type is ``definition``.

    :param notes: iterable of objects exposing ``notetype_id`` and ``note``.
    :returns: the ``note`` attribute of the first matching entry, or ``None``
        when no entry has ``notetype_id == 'definition'``.
    """
    definitions = (entry.note for entry in notes if entry.notetype_id == 'definition')
    return next(definitions, None)
||
| 29 | |||
| 30 | |||
def sort_by_labels(concepts, locale, reverse=False):
    """
    Order things by the lower-cased text of their label for a locale.

    Items for which ``label(locale)`` is falsy cannot be sorted; they are
    appended after the sorted items in their original relative order and are
    not affected by ``reverse``.

    :param concepts: any iterable (including a one-shot iterator) of objects
        exposing a ``label(locale)`` method whose truthy results have a
        ``label`` string attribute.
    :param locale: locale passed to every ``label`` call.
    :param reverse: sort the labelled items in descending order when true.
    :returns: a new list with the labelled items first, then the unlabelled.
    """
    labelled = []
    unlabelled = []
    # Single pass: the input is consumed once, so generators work, and the
    # label lookup is performed once per item instead of once per use.
    for concept in concepts:
        label = concept.label(locale)
        if label:
            labelled.append((label.label.lower(), concept))
        else:
            unlabelled.append(concept)
    # Sort on the precomputed key only, so items themselves are never compared.
    labelled.sort(key=lambda pair: pair[0], reverse=reverse)
    return [concept for _, concept in labelled] + unlabelled
||
| 36 | |||
| 37 | |||
@view_defaults(accept='text/html')
class AtramhasisView:
    """
    This object groups HTML views part of the public user interface.

    Every view that renders a page passes the list of publicly visible
    concept schemes to its template; that list and the display locale of a
    provider are computed by private helpers shared by all views.
    """

    def __init__(self, request):
        """
        Bind the view to a request and collect the services it relies on.

        :param request: the current request. It must expose ``data_managers``
            with the keys ``skos_manager`` and ``conceptscheme_manager``.
        :raises SkosRegistryNotFoundException: when the request has no
            ``skos_registry`` attribute or that attribute is ``None``.
        """
        self.request = request
        self.skos_manager = self.request.data_managers['skos_manager']
        self.conceptscheme_manager = self.request.data_managers['conceptscheme_manager']
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
            self.skos_registry = self.request.skos_registry
        else:
            raise SkosRegistryNotFoundException()

    def _read_request_param(self, param):
        """
        Read a single request parameter, stripped of surrounding whitespace.

        :param param: name of the request parameter.
        :returns: the stripped value, or ``None`` when the parameter is
            absent or blank.
        """
        if param not in self.request.params:
            return None
        value = self.request.params.getone(param).strip()
        return value or None

    def _get_public_conceptschemes(self):
        """
        List the concept schemes that may be shown in the public interface.

        Providers whose metadata ``subject`` contains ``external`` or
        ``hidden`` are left out.

        :returns: a list of dicts with the keys ``id`` and ``conceptscheme``.
        """
        not_shown = ('external', 'hidden')
        conceptschemes = []
        for provider in self.skos_registry.get_providers():
            metadata = provider.get_metadata()
            if any(subject in metadata['subject'] for subject in not_shown):
                continue
            conceptschemes.append({
                'id': metadata['id'],
                'conceptscheme': provider.concept_scheme,
            })
        return conceptschemes

    def _get_display_locale(self, provider):
        """
        Determine the locale used to display labels of a provider.

        :param provider: a provider exposing a ``metadata`` mapping.
        :returns: the provider's ``atramhasis.force_display_label_language``
            metadata value when present, else the locale of the request.
        """
        forced_key = 'atramhasis.force_display_label_language'
        if forced_key in provider.metadata:
            return provider.metadata[forced_key]
        return self.request.locale_name

    @view_config(name='favicon.ico')
    def favicon_view(self):
        """
        This view returns the favicon when requested from the web root.
        """
        here = os.path.dirname(__file__)
        icon = os.path.join(os.path.dirname(here), 'static', 'img', 'favicon.ico')
        return FileResponse(
            icon,
            request=self.request,
            content_type='image/x-icon'
        )

    @view_config(route_name='home', renderer='atramhasis:templates/home.jinja2')
    def home_view(self):
        """
        This view displays the homepage.
        """
        return {'conceptschemes': self._get_public_conceptschemes()}

    @view_config(route_name='conceptschemes', renderer='atramhasis:templates/conceptschemes.jinja2')
    def conceptschemes_view(self):
        """
        This view displays a list of available conceptschemes.
        """
        return {'conceptschemes': self._get_public_conceptschemes()}

    @audit
    @view_config(route_name='conceptscheme', renderer='atramhasis:templates/conceptscheme.jinja2')
    def conceptscheme_view(self):
        """
        This view displays conceptscheme details.

        :raises ConceptSchemeNotFoundException: when the registry has no
            provider for the ``scheme_id`` of the matched route.
        """
        conceptschemes = self._get_public_conceptschemes()

        scheme_id = self.request.matchdict['scheme_id']
        provider = self.skos_registry.get_provider(scheme_id)
        # Same guard as concept_view: without it an unknown scheme id fails
        # with an AttributeError on the next line instead of a clear error.
        if not provider:
            raise ConceptSchemeNotFoundException(scheme_id)
        conceptscheme = provider.concept_scheme
        locale = self._get_display_locale(provider)
        title = conceptscheme.label(locale).label if conceptscheme.label() else scheme_id

        scheme = {
            'scheme_id': scheme_id,
            'title': title,
            'uri': conceptscheme.uri,
            'labels': conceptscheme.labels,
            'notes': conceptscheme.notes,
            'sources': conceptscheme.sources,
            'top_concepts': provider.get_top_concepts(),
        }

        return {'conceptscheme': scheme, 'conceptschemes': conceptschemes,
                'locale': locale}

    @audit
    @view_config(route_name='concept', renderer='atramhasis:templates/concept.jinja2')
    def concept_view(self):
        """
        This view displays the concept details

        :raises ConceptSchemeNotFoundException: when the registry has no
            provider for the requested scheme.
        :raises ConceptNotFoundException: when looking up the concept raises
            ``NoResultFound``.
        """
        conceptschemes = self._get_public_conceptschemes()

        scheme_id = self.request.matchdict['scheme_id']
        c_id = self.request.matchdict['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        label = self._read_request_param('label')
        requested_type = self._read_request_param('type')

        if not provider:
            raise ConceptSchemeNotFoundException(scheme_id)
        locale = self._get_display_locale(provider)
        try:
            c = self.skos_manager.get_thing(c_id, provider.conceptscheme_id)
            if isinstance(c, Concept):
                concept_type = "Concept"
            elif isinstance(c, Collection):
                concept_type = "Collection"
            else:
                return Response('Thing without type: ' + str(c_id), status_int=500)
            url = self.request.route_url('concept', scheme_id=scheme_id, c_id=c_id)
            update_last_visited_concepts(self.request, {'label': c.label(locale).label, 'url': url})
            return {'concept': c, 'conceptType': concept_type, 'scheme_id': scheme_id,
                    'conceptschemes': conceptschemes, 'provider': provider,
                    'locale': locale, 'type': requested_type, 'label': label}
        except NoResultFound:
            raise ConceptNotFoundException(c_id)

    @view_config(route_name='search_result', renderer='atramhasis:templates/search_result.jinja2')
    def search_result(self):
        """
        This view displays the search results

        A ``label`` parameter searches on label (and type, which may be
        ``None``); a ``type`` parameter alone filters on type; with neither
        every thing of the provider is listed. A plain text 404 response is
        returned when the scheme has no provider.
        """
        conceptschemes = self._get_public_conceptschemes()

        scheme_id = self.request.matchdict['scheme_id']
        label = self._read_request_param('label')
        requested_type = self._read_request_param('type')
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return Response(content_type='text/plain', status_int=404)

        locale = self._get_display_locale(provider)
        if label is not None:
            concepts = provider.find(
                {'label': label, 'type': requested_type},
                language=locale,
                sort='label'
            )
        elif requested_type is not None:
            concepts = provider.find(
                {'type': requested_type}, language=locale, sort='label'
            )
        else:
            concepts = provider.get_all(language=locale, sort='label')
        return {
            'concepts': concepts,
            'scheme_id': scheme_id,
            'conceptschemes': conceptschemes,
            'type': requested_type,
            'label': label
        }

    @view_config(route_name='locale')
    def set_locale_cookie(self):
        """
        This view will set a language cookie

        The requested ``language`` is compared case-insensitively with the
        ``available_languages`` setting; anything else falls back to
        ``pyramid.default_locale_name``. The response redirects to the
        referer, or to the home route when there is none.
        """
        settings = get_current_registry().settings
        default_lang = settings.get('pyramid.default_locale_name')
        # The lowercased list must be kept: the requested language is
        # lowercased below, so comparing it with the raw setting would reject
        # any language configured with capitals.
        available_languages = [
            lang.lower()
            for lang in settings.get('available_languages', default_lang).split()
        ]
        language = self.request.GET.get('language', default_lang).lower()
        if language not in available_languages:
            language = default_lang

        referer = self.request.referer
        if referer is not None:
            response = HTTPFound(location=referer)
        else:
            response = HTTPFound(location=self.request.route_url('home'))

        response.set_cookie('_LOCALE_',
                            value=language,
                            max_age=31536000)  # max_age = year
        return response

    @audit
    @view_config(route_name='search_result_export', renderer='csv')
    def results_csv(self):
        """
        This view exports the search results as csv.

        The same ``label`` / ``type`` selection as the search result view is
        applied, through the conceptscheme manager. When the scheme has no
        provider only the header is returned.

        :returns: a dict with the keys ``header``, ``rows`` and ``filename``.
        """
        header = ['conceptscheme', 'id', 'uri', 'type', 'label', 'prefLabels', 'altLabels', 'definition', 'broader',
                  'narrower', 'related']
        rows = []
        scheme_id = self.request.matchdict['scheme_id']
        label = self._read_request_param('label')
        requested_type = self._read_request_param('type')
        provider = self.skos_registry.get_provider(scheme_id)
        if provider:
            if label is not None:
                concepts = self.conceptscheme_manager.find(
                    provider.conceptscheme_id, {'label': label, 'type': requested_type}
                )
            elif requested_type is not None:
                concepts = self.conceptscheme_manager.find(
                    provider.conceptscheme_id, {'type': requested_type}
                )
            else:
                concepts = self.conceptscheme_manager.get_all(provider.conceptscheme_id)
            for concept in concepts:
                # Columns shared by concepts and collections.
                common = (
                    scheme_id, concept.concept_id, concept.uri, concept.type,
                    concept.label(self.request.locale_name).label,
                    labels_to_string(concept.labels, 'prefLabel'),
                    labels_to_string(concept.labels, 'altLabel'),
                    get_definition(concept.notes),
                )
                if concept.type == 'concept':
                    relations = (
                        [c.concept_id for c in concept.broader_concepts],
                        [c.concept_id for c in concept.narrower_concepts],
                        [c.concept_id for c in concept.related_concepts],
                    )
                else:
                    # Anything else only fills the 'narrower' column, with
                    # the ids of its members.
                    relations = ('', [c.concept_id for c in concept.members], '')
                rows.append(common + relations)
        return {
            'header': header,
            'rows': rows,
            'filename': 'atramhasis_export'
        }

    @view_config(route_name='scheme_tree_html', renderer='scheme_tree.jinja2')
    @view_config(route_name='scheme_tree', renderer='json', accept='application/json')
    def results_tree_json(self):
        """
        This view returns the tree of a conceptscheme.

        The tree is returned bare unless ``text/html`` is accepted by the
        request, in which case it is wrapped in a dict under ``tree``. An
        empty tree results in a 404 response.
        """
        scheme_id = self.request.matchdict['scheme_id']
        language = self.request.params.get('language') or self.request.locale_name
        dicts = self.get_results_tree(scheme_id, language)
        if not dicts:
            return Response(status_int=404)
        if 'text/html' in self.request.accept:
            return {'tree': dicts}
        return dicts

    @tree_region.cache_on_arguments()
    def get_results_tree(self, scheme_id, locale):
        """
        Build the complete tree of a conceptscheme as nested dicts.

        :param scheme_id: id of the scheme, used to look up the provider.
        :param locale: locale used to sort the top level of the tree.
        :returns: a list with one dict per top level thing.
        """
        skostree = self.get_scheme(scheme_id, locale)
        return [self.parse_thing(thing, None) for thing in skostree]

    def get_scheme(self, scheme, locale):
        """
        Fetch the top level of a scheme tree, sorted by label.

        :param scheme: id of the scheme, used to look up the provider.
        :param locale: locale used for sorting.
        :returns: the sorted concepts followed by the sorted collections, or
            an empty list when the scheme has no provider.
        """
        scheme_tree = []
        provider = self.skos_registry.get_provider(scheme)
        if provider:
            conceptscheme_id = provider.conceptscheme_id

            tco = self.conceptscheme_manager.get_concepts_for_scheme_tree(conceptscheme_id)
            tcl = self.conceptscheme_manager.get_collections_for_scheme_tree(conceptscheme_id)

            scheme_tree = sort_by_labels(tco, locale) + sort_by_labels(tcl, locale)

        return scheme_tree

    def parse_thing(self, thing, parent_tree_id):
        """
        Convert a thing and, recursively, its children into a tree node.

        The children of a collection are its members; for anything else they
        are its narrower concepts followed by its narrower collections.

        :param thing: object exposing ``concept_id``, ``type`` and
            ``label(locale)``, plus the child attributes described above.
        :param parent_tree_id: tree id of the parent node, or ``None`` for a
            top level node.
        :returns: a dict with the keys ``id``, ``concept_id``, ``type``,
            ``label`` and ``children``.
        """
        tree_id = self.create_treeid(parent_tree_id, thing.concept_id)
        # Resolve the locale exactly like results_tree_json does, so that an
        # empty 'language' parameter cannot make the labels disagree with the
        # locale the tree was requested (and cached) for.
        locale = self.request.params.get('language') or self.request.locale_name

        if thing.type and thing.type == 'collection':
            cs = list(thing.members) if hasattr(thing, 'members') else []
        else:
            cs = list(thing.narrower_concepts) + list(thing.narrower_collections)

        children = [self.parse_thing(child, tree_id) for child in sort_by_labels(cs, locale)]
        label = thing.label(locale)
        return {
            'id': tree_id,
            'concept_id': thing.concept_id,
            'type': thing.type,
            'label': label.label if label else None,
            'children': children
        }

    @staticmethod
    def create_treeid(parent_tree_id, concept_id):
        """
        Compose the id of a tree node from its parent's id and its own.

        :param parent_tree_id: tree id of the parent, or ``None`` at the top.
        :param concept_id: id of the thing the node represents.
        :returns: ``str(concept_id)`` at the top level, otherwise the parent
            id and the concept id joined with a dot.
        """
        if parent_tree_id is None:
            return str(concept_id)
        return parent_tree_id + "." + str(concept_id)

    @view_config(route_name='scheme_root', renderer='atramhasis:templates/concept.jinja2')
    def results_tree_html(self):
        """
        This view renders the concept template for a scheme without a concept.
        """
        scheme_id = self.request.matchdict['scheme_id']
        return {'concept': None, 'conceptType': None, 'scheme_id': scheme_id}
||
| 353 | |||
| 354 | |||
@view_defaults(accept='application/json', renderer='json')
class AtramhasisListView:
    """
    This object groups list views part for the user interface.

    Each view returns the entries of one list type as key/label pairs, the
    label being the translated name of the entry.
    """

    def __init__(self, request):
        """
        Bind the view to a request.

        :param request: the current request; it must expose ``localizer``
            and a ``data_managers`` mapping with the key ``skos_manager``.
        """
        self.request = request
        self.skos_manager = request.data_managers['skos_manager']
        self.localizer = request.localizer
        # Factory producing translation strings in the 'atramhasis' domain.
        self._ = TranslationStringFactory('atramhasis')

    @view_config(route_name='labeltypes')
    def labeltype_list_view(self):
        """
        This view lists the label types.
        """
        return self.get_list(LabelType)

    @view_config(route_name='notetypes')
    def notetype_list_view(self):
        """
        This view lists the note types.
        """
        return self.get_list(NoteType)

    @list_region.cache_on_arguments()
    def get_list(self, listtype):
        """
        Fetch all entries of a list type and translate their names.

        :param listtype: the list type handed to the skos manager.
        :returns: a list of dicts with the keys ``key`` (the entry name) and
            ``label`` (the translated entry name).
        """
        entries = []
        for item in self.skos_manager.get_by_list_type(listtype):
            translated = self.localizer.translate(self._(item.name))
            entries.append({"key": item.name, "label": translated})
        return entries
||
| 379 | |||
| 380 | |||
@view_defaults(accept='text/html')
class AtramhasisAdminView:
    """
    This object groups HTML views part of the admin user interface.

    All views of this class are registered with the ``edit`` permission.
    """

    def __init__(self, request):
        """
        Bind the view to a request.

        :param request: the current request.
        :raises SkosRegistryNotFoundException: when the request has no
            ``skos_registry`` attribute or that attribute is ``None``.
        """
        self.request = request
        self.logged_in = request.authenticated_userid
        registry = getattr(request, 'skos_registry', None)
        if registry is None:
            raise SkosRegistryNotFoundException()
        self.skos_registry = registry

    @view_config(route_name='admin', renderer='atramhasis:templates/admin.jinja2', permission='edit')
    def admin_view(self):
        """
        This view displays the admin page.
        """
        return {'admin': None}

    @view_config(route_name='scheme_tree_invalidate', renderer='json', accept='application/json', permission='edit')
    def invalidate_scheme_tree(self):
        """
        This view invalidates the cache of the scheme named in the route.
        """
        invalidate_scheme_cache(self.request.matchdict['scheme_id'])
        return Response(status_int=200)

    @view_config(route_name='tree_invalidate', renderer='json', accept='application/json', permission='edit')
    def invalidate_tree(self):
        """
        This view invalidates the cache as a whole.
        """
        invalidate_cache()
        return Response(status_int=200)
||
| 409 |