Total Complexity | 66 |
Total Lines | 419 |
Duplicated Lines | 0 % |
Changes | 0 |
Complex classes like atramhasis.views.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import os |
||
2 | |||
3 | from pyramid.httpexceptions import HTTPFound |
||
4 | from pyramid.i18n import TranslationStringFactory |
||
5 | from pyramid.response import FileResponse |
||
6 | from pyramid.response import Response |
||
7 | from pyramid.threadlocal import get_current_registry |
||
8 | from pyramid.view import view_config |
||
9 | from pyramid.view import view_defaults |
||
10 | from skosprovider_sqlalchemy.models import Collection |
||
11 | from skosprovider_sqlalchemy.models import Concept |
||
12 | from skosprovider_sqlalchemy.models import LabelType |
||
13 | from skosprovider_sqlalchemy.models import NoteType |
||
14 | from sqlalchemy.exc import NoResultFound |
||
15 | |||
16 | from atramhasis.audit import audit |
||
17 | from atramhasis.cache import invalidate_cache |
||
18 | from atramhasis.cache import invalidate_scheme_cache |
||
19 | from atramhasis.cache import list_region |
||
20 | from atramhasis.cache import tree_region |
||
21 | from atramhasis.errors import ConceptNotFoundException |
||
22 | from atramhasis.errors import ConceptSchemeNotFoundException |
||
23 | from atramhasis.errors import SkosRegistryNotFoundException |
||
24 | from atramhasis.utils import update_last_visited_concepts |
||
25 | |||
26 | |||
27 | def labels_to_string(labels, ltype): |
||
28 | labelstring = '' |
||
29 | for label in (label for label in labels if label.labeltype_id == ltype): |
||
30 | labelstring += label.label + ' (' + label.language_id + '), ' |
||
31 | return labelstring[:-2] |
||
32 | |||
33 | |||
34 | def get_definition(notes): |
||
35 | for note in notes: |
||
36 | if note.notetype_id == 'definition': |
||
37 | return note.note |
||
38 | |||
39 | |||
40 | def sort_by_labels(concepts, locale, reverse=False): |
||
41 | return sorted((x for x in concepts if x.label(locale)), |
||
1 ignored issue
–
show
|
|||
42 | reverse=reverse, |
||
43 | key=lambda child: child.label(locale).label.lower() |
||
44 | ) + [x for x in concepts if not x.label(locale)] |
||
45 | |||
46 | |||
47 | @view_defaults(accept='text/html') |
||
48 | class AtramhasisView: |
||
49 | """ |
||
50 | This object groups HTML views part of the public user interface. |
||
51 | """ |
||
52 | |||
53 | def __init__(self, request): |
||
54 | self.request = request |
||
55 | self.skos_manager = self.request.data_managers['skos_manager'] |
||
56 | self.conceptscheme_manager = self.request.data_managers['conceptscheme_manager'] |
||
57 | if hasattr(request, 'skos_registry') and request.skos_registry is not None: |
||
58 | self.skos_registry = self.request.skos_registry |
||
59 | else: |
||
60 | raise SkosRegistryNotFoundException() |
||
61 | |||
62 | def _read_request_param(self, param): |
||
63 | value = None |
||
64 | if param in self.request.params: |
||
65 | value = self.request.params.getone(param).strip() |
||
66 | if not value: |
||
67 | value = None # pragma: no cover |
||
68 | return value |
||
69 | |||
70 | @view_config(name='favicon.ico') |
||
71 | def favicon_view(self): |
||
72 | """ |
||
73 | Return the favicon when requested from the web root. |
||
74 | """ |
||
75 | here = os.path.dirname(__file__) |
||
76 | icon = os.path.join(os.path.dirname(here), 'static', 'img', 'favicon.ico') |
||
77 | response = FileResponse( |
||
78 | icon, |
||
79 | request=self.request, |
||
80 | content_type='image/x-icon' |
||
81 | ) |
||
82 | return response |
||
83 | |||
84 | def _get_public_conceptschemes(self): |
||
85 | """ |
||
86 | Get all conceptschemes that are visible through the public UI. |
||
87 | """ |
||
88 | conceptschemes = [ |
||
89 | {'id': x.get_metadata()['id'], |
||
90 | 'conceptscheme': x.concept_scheme} |
||
91 | for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject'] |
||
92 | for not_shown in ['external', 'hidden']]) |
||
93 | ] |
||
94 | |||
95 | return conceptschemes |
||
96 | |||
97 | @view_config(route_name='home', renderer='atramhasis:templates/home.jinja2') |
||
98 | def home_view(self): |
||
99 | """ |
||
100 | Display the homepage. |
||
101 | """ |
||
102 | conceptschemes = self._get_public_conceptschemes() |
||
103 | |||
104 | return {'conceptschemes': conceptschemes} |
||
105 | |||
106 | @view_config(route_name='conceptschemes', renderer='atramhasis:templates/conceptschemes.jinja2') |
||
107 | def conceptschemes_view(self): |
||
108 | """ |
||
109 | Display a list of available conceptschemes. |
||
110 | """ |
||
111 | conceptschemes = self._get_public_conceptschemes() |
||
112 | |||
113 | return {'conceptschemes': conceptschemes} |
||
114 | |||
115 | @audit |
||
116 | @view_config(route_name='conceptscheme', renderer='atramhasis:templates/conceptscheme.jinja2') |
||
117 | def conceptscheme_view(self): |
||
118 | """ |
||
119 | Display a single conceptscheme. |
||
120 | """ |
||
121 | conceptschemes = self._get_public_conceptschemes() |
||
122 | |||
123 | scheme_id = self.request.matchdict['scheme_id'] |
||
124 | provider = self.request.skos_registry.get_provider(scheme_id) |
||
125 | conceptscheme = provider.concept_scheme |
||
126 | if 'atramhasis.force_display_label_language' in provider.metadata: |
||
127 | locale = provider.metadata['atramhasis.force_display_label_language'] |
||
128 | else: |
||
129 | locale = self.request.locale_name |
||
130 | title = (conceptscheme.label(locale).label if (conceptscheme.label()) |
||
131 | else scheme_id) |
||
132 | |||
133 | scheme = { |
||
134 | 'scheme_id': scheme_id, |
||
135 | 'title': title, |
||
136 | 'uri': conceptscheme.uri, |
||
137 | 'labels': conceptscheme.labels, |
||
138 | 'notes': conceptscheme.notes, |
||
139 | 'sources': conceptscheme.sources, |
||
140 | 'top_concepts': provider.get_top_concepts(), |
||
141 | } |
||
142 | |||
143 | return {'conceptscheme': scheme, 'conceptschemes': conceptschemes, |
||
144 | 'locale': locale} |
||
145 | |||
146 | @audit |
||
147 | @view_config(route_name='concept', renderer='atramhasis:templates/concept.jinja2') |
||
148 | def concept_view(self): |
||
149 | """ |
||
150 | Display all about a single concept or collection. |
||
151 | """ |
||
152 | conceptschemes = [ |
||
153 | {'id': x.get_metadata()['id'], |
||
154 | 'conceptscheme': x.concept_scheme} |
||
155 | for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject'] |
||
156 | for not_shown in ['external', 'hidden']]) |
||
157 | ] |
||
158 | |||
159 | scheme_id = self.request.matchdict['scheme_id'] |
||
160 | c_id = self.request.matchdict['c_id'] |
||
161 | provider = self.request.skos_registry.get_provider(scheme_id) |
||
162 | label = self._read_request_param('label') |
||
163 | requested_type = self._read_request_param('type') |
||
164 | |||
165 | if not provider: |
||
166 | raise ConceptSchemeNotFoundException(scheme_id) |
||
167 | if 'atramhasis.force_display_label_language' in provider.metadata: |
||
168 | locale = provider.metadata['atramhasis.force_display_label_language'] |
||
169 | else: |
||
170 | locale = self.request.locale_name |
||
171 | try: |
||
172 | c = self.skos_manager.get_thing(c_id, provider.conceptscheme_id) |
||
173 | if isinstance(c, Concept): |
||
174 | concept_type = "Concept" |
||
175 | elif isinstance(c, Collection): |
||
176 | concept_type = "Collection" |
||
177 | else: |
||
178 | return Response('Thing without type: ' + str(c_id), status_int=500) |
||
179 | url = self.request.route_url('concept', scheme_id=scheme_id, c_id=c_id) |
||
180 | update_last_visited_concepts(self.request, {'label': c.label(locale).label, 'url': url}) |
||
181 | return {'concept': c, 'conceptType': concept_type, 'scheme_id': scheme_id, |
||
182 | 'conceptschemes': conceptschemes, 'provider': provider, |
||
183 | 'locale': locale, 'type': requested_type, 'label': label} |
||
184 | except NoResultFound: |
||
185 | raise ConceptNotFoundException(c_id) |
||
186 | |||
187 | @view_config(route_name='search_result', renderer='atramhasis:templates/search_result.jinja2') |
||
188 | def search_result(self): |
||
189 | """ |
||
190 | Display search results |
||
191 | """ |
||
192 | conceptschemes = [ |
||
193 | {'id': x.get_metadata()['id'], |
||
194 | 'conceptscheme': x.concept_scheme} |
||
195 | for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject'] |
||
196 | for not_shown in ['external', 'hidden']]) |
||
197 | ] |
||
198 | |||
199 | scheme_id = self.request.matchdict['scheme_id'] |
||
200 | label = self._read_request_param('label') |
||
201 | requested_type = self._read_request_param('type') |
||
202 | provider = self.skos_registry.get_provider(scheme_id) |
||
203 | if provider: |
||
204 | if 'atramhasis.force_display_label_language' in provider.metadata: |
||
205 | locale = provider.metadata['atramhasis.force_display_label_language'] |
||
206 | else: |
||
207 | locale = self.request.locale_name |
||
208 | if label is not None: |
||
209 | concepts = provider.find( |
||
210 | {'label': label, 'type': requested_type}, |
||
211 | language=locale, |
||
212 | sort='label' |
||
213 | ) |
||
214 | elif (label is None) and (requested_type is not None): |
||
215 | concepts = provider.find( |
||
216 | {'type': requested_type}, language=locale, sort='label' |
||
217 | ) |
||
218 | else: |
||
219 | concepts = provider.get_all(language=locale, sort='label') |
||
220 | return { |
||
221 | 'concepts': concepts, |
||
222 | 'scheme_id': scheme_id, |
||
223 | 'conceptschemes': conceptschemes, |
||
224 | 'type': requested_type, |
||
225 | 'label': label |
||
226 | } |
||
227 | return Response(content_type='text/plain', status_int=404) |
||
228 | |||
229 | @view_config(route_name='locale') |
||
230 | def set_locale_cookie(self): |
||
231 | """ |
||
232 | Set a language cookie to remember what language a user requested. |
||
233 | """ |
||
234 | settings = get_current_registry().settings |
||
235 | default_lang = settings.get('pyramid.default_locale_name') |
||
236 | available_languages = settings.get('available_languages', default_lang).split() |
||
237 | [x.lower() for x in available_languages] |
||
238 | language = self.request.GET.get('language', default_lang).lower() |
||
239 | if language not in available_languages: |
||
240 | language = default_lang |
||
241 | |||
242 | referer = self.request.referer |
||
243 | if referer is not None: |
||
244 | response = HTTPFound(location=referer) |
||
245 | else: |
||
246 | response = HTTPFound(location=self.request.route_url('home')) |
||
247 | |||
248 | response.set_cookie('_LOCALE_', |
||
249 | value=language, |
||
250 | max_age=31536000) # max_age = year |
||
251 | return response |
||
252 | |||
253 | @audit |
||
254 | @view_config(route_name='search_result_export', renderer='csv') |
||
255 | def results_csv(self): |
||
256 | """ |
||
257 | Download search results in CSV format, allowing further processing. |
||
258 | """ |
||
259 | header = ['conceptscheme', 'id', 'uri', 'type', 'label', 'prefLabels', 'altLabels', 'definition', 'broader', |
||
260 | 'narrower', 'related'] |
||
261 | rows = [] |
||
262 | scheme_id = self.request.matchdict['scheme_id'] |
||
263 | label = self._read_request_param('label') |
||
264 | requested_type = self._read_request_param('type') |
||
265 | provider = self.skos_registry.get_provider(scheme_id) |
||
266 | if provider: |
||
267 | if label is not None: |
||
268 | concepts = self.conceptscheme_manager.find( |
||
269 | provider.conceptscheme_id, {'label': label, 'type': requested_type} |
||
270 | ) |
||
271 | elif (label is None) and (requested_type is not None): |
||
272 | concepts = self.conceptscheme_manager.find( |
||
273 | provider.conceptscheme_id, {'type': requested_type} |
||
274 | ) |
||
275 | else: |
||
276 | concepts = self.conceptscheme_manager.get_all(provider.conceptscheme_id) |
||
277 | for concept in concepts: |
||
278 | if concept.type == 'concept': |
||
279 | rows.append(( |
||
280 | scheme_id, concept.concept_id, concept.uri, concept.type, |
||
281 | concept.label(self.request.locale_name).label, |
||
282 | labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'), |
||
283 | get_definition(concept.notes), [c.concept_id for c in concept.broader_concepts], |
||
284 | [c.concept_id for c in concept.narrower_concepts], |
||
285 | [c.concept_id for c in concept.related_concepts])) |
||
286 | else: |
||
287 | rows.append(( |
||
288 | scheme_id, concept.concept_id, concept.uri, concept.type, |
||
289 | concept.label(self.request.locale_name).label, |
||
290 | labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'), |
||
291 | get_definition(concept.notes), '', [c.concept_id for c in concept.members], '')) |
||
292 | return { |
||
293 | 'header': header, |
||
294 | 'rows': rows, |
||
295 | 'filename': 'atramhasis_export' |
||
296 | } |
||
297 | |||
298 | @view_config(route_name='scheme_tree_html', renderer='scheme_tree.jinja2') |
||
299 | @view_config(route_name='scheme_tree', renderer='json', accept='application/json') |
||
300 | def results_tree_json(self): |
||
301 | scheme_id = self.request.matchdict['scheme_id'] |
||
302 | language = self.request.params.get('language') or self.request.locale_name |
||
303 | dicts = self.get_results_tree(scheme_id, language) |
||
304 | if dicts: |
||
305 | if 'text/html' not in self.request.accept: |
||
306 | return dicts |
||
307 | else: |
||
308 | return {'tree': dicts} |
||
309 | else: |
||
310 | return Response(status_int=404) |
||
311 | |||
312 | @tree_region.cache_on_arguments() |
||
313 | def get_results_tree(self, scheme_id, locale): |
||
314 | skostree = self.get_scheme(scheme_id, locale) |
||
315 | return [self.parse_thing(thing, None) for thing in skostree] |
||
316 | |||
317 | def get_scheme(self, scheme, locale): |
||
318 | scheme_tree = [] |
||
319 | provider = self.skos_registry.get_provider(scheme) |
||
320 | if provider: |
||
321 | conceptscheme_id = provider.conceptscheme_id |
||
322 | |||
323 | tco = self.conceptscheme_manager.get_concepts_for_scheme_tree(conceptscheme_id) |
||
324 | tcl = self.conceptscheme_manager.get_collections_for_scheme_tree(conceptscheme_id) |
||
325 | |||
326 | scheme_tree = sort_by_labels(tco, locale) + sort_by_labels(tcl, locale) |
||
327 | |||
328 | return scheme_tree |
||
329 | |||
330 | def parse_thing(self, thing, parent_tree_id): |
||
331 | tree_id = self.create_treeid(parent_tree_id, thing.concept_id) |
||
332 | locale = self.request.params.get('language', self.request.locale_name) |
||
333 | |||
334 | if thing.type and thing.type == 'collection': |
||
335 | cs = [member for member in thing.members] if hasattr(thing, 'members') else [] |
||
336 | else: |
||
337 | cs = [c for c in thing.narrower_concepts] |
||
338 | cs = cs + [c for c in thing.narrower_collections] |
||
339 | |||
340 | sortedcs = sort_by_labels(cs, locale) |
||
341 | children = [self.parse_thing(c, tree_id) for index, c in enumerate(sortedcs, 1)] |
||
342 | dict_thing = { |
||
343 | 'id': tree_id, |
||
344 | 'concept_id': thing.concept_id, |
||
345 | 'type': thing.type, |
||
346 | 'label': thing.label(locale).label if thing.label(locale) else None, |
||
347 | 'children': children |
||
348 | } |
||
349 | |||
350 | return dict_thing |
||
351 | |||
352 | @staticmethod |
||
353 | def create_treeid(parent_tree_id, concept_id): |
||
354 | if parent_tree_id is None: |
||
355 | return str(concept_id) |
||
356 | else: |
||
357 | return parent_tree_id + "." + str(concept_id) |
||
358 | |||
359 | @view_config(route_name='scheme_root', renderer='atramhasis:templates/concept.jinja2') |
||
360 | def results_tree_html(self): |
||
361 | scheme_id = self.request.matchdict['scheme_id'] |
||
362 | return {'concept': None, 'conceptType': None, 'scheme_id': scheme_id} |
||
363 | |||
364 | |||
365 | @view_defaults(accept='application/json', renderer='json') |
||
366 | class AtramhasisListView: |
||
367 | """ |
||
368 | This object groups list views part for the user interface. |
||
369 | """ |
||
370 | |||
371 | def __init__(self, request): |
||
372 | self.request = request |
||
373 | self.skos_manager = self.request.data_managers['skos_manager'] |
||
374 | self.localizer = request.localizer |
||
375 | self._ = TranslationStringFactory('atramhasis') |
||
376 | |||
377 | @view_config(route_name='labeltypes') |
||
378 | def labeltype_list_view(self): |
||
379 | return self.get_list(LabelType) |
||
380 | |||
381 | @view_config(route_name='notetypes') |
||
382 | def notetype_list_view(self): |
||
383 | return self.get_list(NoteType) |
||
384 | |||
385 | @list_region.cache_on_arguments() |
||
386 | def get_list(self, listtype): |
||
387 | return [{"key": ltype.name, "label": self.localizer.translate(self._(ltype.name))} |
||
388 | for ltype in self.skos_manager.get_by_list_type(listtype)] |
||
389 | |||
390 | |||
391 | @view_defaults(accept='text/html') |
||
392 | class AtramhasisAdminView: |
||
393 | """ |
||
394 | This object groups HTML views part of the admin user interface. |
||
395 | """ |
||
396 | |||
397 | def __init__(self, request): |
||
398 | self.request = request |
||
399 | self.logged_in = request.authenticated_userid |
||
400 | if hasattr(request, 'skos_registry') and request.skos_registry is not None: |
||
401 | self.skos_registry = self.request.skos_registry |
||
402 | else: |
||
403 | raise SkosRegistryNotFoundException() |
||
404 | |||
405 | @view_config(route_name='admin', renderer='atramhasis:templates/admin.jinja2', permission='edit') |
||
406 | def admin_view(self): |
||
407 | return {'admin': None} |
||
408 | |||
409 | @view_config(route_name='scheme_tree_invalidate', renderer='json', accept='application/json', permission='edit') |
||
410 | def invalidate_scheme_tree(self): |
||
411 | scheme_id = self.request.matchdict['scheme_id'] |
||
412 | invalidate_scheme_cache(scheme_id) |
||
413 | return Response(status_int=200) |
||
414 | |||
415 | @view_config(route_name='tree_invalidate', renderer='json', accept='application/json', permission='edit') |
||
416 | def invalidate_tree(self): |
||
417 | invalidate_cache() |
||
418 | return Response(status_int=200) |
||
419 |