Passed
Push — develop ( 710d68...f19906 )
by Koen
01:14
created

AtramhasisView._read_request_param()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 3
nop 2
1
import os
2
3
from pyramid.httpexceptions import HTTPFound
4
from pyramid.i18n import TranslationStringFactory
5
from pyramid.response import FileResponse
6
from pyramid.response import Response
7
from pyramid.threadlocal import get_current_registry
8
from pyramid.view import view_config
9
from pyramid.view import view_defaults
10
from skosprovider_sqlalchemy.models import Collection
11
from skosprovider_sqlalchemy.models import Concept
12
from skosprovider_sqlalchemy.models import LabelType
13
from skosprovider_sqlalchemy.models import NoteType
14
from sqlalchemy.exc import NoResultFound
15
16
from atramhasis.audit import audit
17
from atramhasis.cache import invalidate_cache
18
from atramhasis.cache import invalidate_scheme_cache
19
from atramhasis.cache import list_region
20
from atramhasis.cache import tree_region
21
from atramhasis.errors import ConceptNotFoundException
22
from atramhasis.errors import ConceptSchemeNotFoundException
23
from atramhasis.errors import SkosRegistryNotFoundException
24
from atramhasis.utils import update_last_visited_concepts
25
26
27
def labels_to_string(labels, ltype):
28
    labelstring = ''
29
    for label in (label for label in labels if label.labeltype_id == ltype):
30
        labelstring += label.label + ' (' + label.language_id + '), '
31
    return labelstring[:-2]
32
33
34
def get_definition(notes):
35
    for note in notes:
36
        if note.notetype_id == 'definition':
37
            return note.note
38
39
40
def sort_by_labels(concepts, locale, reverse=False):
41
    return sorted((x for x in concepts if x.label(locale)),
42
                  reverse=reverse,
43
                  key=lambda child: child.label(locale).label.lower()
44
                  ) + [x for x in concepts if not x.label(locale)]
45
46
def get_public_conceptschemes(skos_registry):
47
    """
48
    Get all conceptschemes that are visible through the public UI.
49
    """
50
    conceptschemes = [
51
        {'id': x.get_metadata()['id'],
52
         'conceptscheme': x.concept_scheme}
53
        for x in skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
54
                                                           for not_shown in ['external', 'hidden']])
55
        ]
56
57
    return conceptschemes
58
59
@view_defaults(accept='text/html')
60
class AtramhasisView:
61
    """
62
    This object groups HTML views part of the public user interface.
63
    """
64
65
    def __init__(self, request):
66
        self.request = request
67
        self.skos_manager = self.request.data_managers['skos_manager']
68
        self.conceptscheme_manager = self.request.data_managers['conceptscheme_manager']
69
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
70
            self.skos_registry = self.request.skos_registry
71
        else:
72
            raise SkosRegistryNotFoundException()
73
74
    def _read_request_param(self, param):
75
        value = None
76
        if param in self.request.params:
77
            value = self.request.params.getone(param).strip()
78
            if not value:
79
                value = None  # pragma: no cover
80
        return value
81
82
    @view_config(name='favicon.ico')
83
    def favicon_view(self):
84
        """
85
        Return the favicon when requested from the web root.
86
        """
87
        here = os.path.dirname(__file__)
88
        icon = os.path.join(os.path.dirname(here), 'static', 'img', 'favicon.ico')
89
        response = FileResponse(
90
            icon,
91
            request=self.request,
92
            content_type='image/x-icon'
93
        )
94
        return response
95
96
97
    @view_config(route_name='home', renderer='atramhasis:templates/home.jinja2')
98
    def home_view(self):
99
        """
100
        Display the homepage.
101
        """
102
        return {'conceptschemes': get_public_conceptschemes(self.skos_registry)}
103
104
    @view_config(route_name='conceptschemes', renderer='atramhasis:templates/conceptschemes.jinja2')
105
    def conceptschemes_view(self):
106
        """
107
        Display a list of available conceptschemes.
108
        """
109
        return {'conceptschemes': get_public_conceptschemes(self.skos_registry)}
110
111
    @audit
112
    @view_config(route_name='conceptscheme', renderer='atramhasis:templates/conceptscheme.jinja2')
113
    def conceptscheme_view(self):
114
        """
115
        Display a single conceptscheme.
116
        """
117
        conceptschemes = get_public_conceptschemes(self.skos_registry)
118
119
        scheme_id = self.request.matchdict['scheme_id']
120
        provider = self.request.skos_registry.get_provider(scheme_id)
121
        conceptscheme = provider.concept_scheme
122
        if 'atramhasis.force_display_label_language' in provider.metadata:
123
            locale = provider.metadata['atramhasis.force_display_label_language']
124
        else:
125
            locale = self.request.locale_name
126
        title = (conceptscheme.label(locale).label if (conceptscheme.label())
127
                 else scheme_id)
128
129
        scheme = {
130
            'scheme_id': scheme_id,
131
            'title': title,
132
            'uri': conceptscheme.uri,
133
            'labels': conceptscheme.labels,
134
            'notes': conceptscheme.notes,
135
            'sources': conceptscheme.sources,
136
            'top_concepts': provider.get_top_concepts(),
137
        }
138
139
        return {'conceptscheme': scheme, 'conceptschemes': conceptschemes,
140
                'locale': locale}
141
142
    @audit
143
    @view_config(route_name='concept', renderer='atramhasis:templates/concept.jinja2')
144
    def concept_view(self):
145
        """
146
        Display all about a single concept or collection.
147
        """
148
        conceptschemes = [
149
            {'id': x.get_metadata()['id'],
150
             'conceptscheme': x.concept_scheme}
151
            for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
152
                                                                    for not_shown in ['external', 'hidden']])
153
            ]
154
155
        scheme_id = self.request.matchdict['scheme_id']
156
        c_id = self.request.matchdict['c_id']
157
        provider = self.request.skos_registry.get_provider(scheme_id)
158
        label = self._read_request_param('label')
159
        requested_type = self._read_request_param('type')
160
161
        if not provider:
162
            raise ConceptSchemeNotFoundException(scheme_id)
163
        if 'atramhasis.force_display_label_language' in provider.metadata:
164
            locale = provider.metadata['atramhasis.force_display_label_language']
165
        else:
166
            locale = self.request.locale_name
167
        try:
168
            c = self.skos_manager.get_thing(c_id, provider.conceptscheme_id)
169
            if isinstance(c, Concept):
170
                concept_type = "Concept"
171
            elif isinstance(c, Collection):
172
                concept_type = "Collection"
173
            else:
174
                return Response('Thing without type: ' + str(c_id), status_int=500)
175
            url = self.request.route_url('concept', scheme_id=scheme_id, c_id=c_id)
176
            update_last_visited_concepts(self.request, {'label': c.label(locale).label, 'url': url})
177
            return {'concept': c, 'conceptType': concept_type, 'scheme_id': scheme_id,
178
                    'conceptschemes': conceptschemes, 'provider': provider,
179
                    'locale': locale, 'type': requested_type, 'label': label}
180
        except NoResultFound:
181
            raise ConceptNotFoundException(c_id)
182
183
    @view_config(route_name='search_result', renderer='atramhasis:templates/search_result.jinja2')
184
    def search_result(self):
185
        """
186
        Display search results
187
        """
188
        conceptschemes = [
189
            {'id': x.get_metadata()['id'],
190
             'conceptscheme': x.concept_scheme}
191
            for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
192
                                                                    for not_shown in ['external', 'hidden']])
193
            ]
194
195
        scheme_id = self.request.matchdict['scheme_id']
196
        label = self._read_request_param('label')
197
        requested_type = self._read_request_param('type')
198
        provider = self.skos_registry.get_provider(scheme_id)
199
        if provider:
200
            if 'atramhasis.force_display_label_language' in provider.metadata:
201
                locale = provider.metadata['atramhasis.force_display_label_language']
202
            else:
203
                locale = self.request.locale_name
204
            if label is not None:
205
                concepts = provider.find(
206
                    {'label': label, 'type': requested_type},
207
                    language=locale,
208
                    sort='label'
209
                )
210
            elif (label is None) and (requested_type is not None):
211
                concepts = provider.find(
212
                    {'type': requested_type}, language=locale, sort='label'
213
                )
214
            else:
215
                concepts = provider.get_all(language=locale, sort='label')
216
            return {
217
                'concepts': concepts,
218
                'scheme_id': scheme_id,
219
                'conceptschemes': conceptschemes,
220
                'type': requested_type,
221
                'label': label
222
            }
223
        return Response(content_type='text/plain', status_int=404)
224
225
    @view_config(route_name='locale')
226
    def set_locale_cookie(self):
227
        """
228
        Set a language cookie to remember what language a user requested.
229
        """
230
        settings = get_current_registry().settings
231
        default_lang = settings.get('pyramid.default_locale_name')
232
        available_languages = settings.get('available_languages', default_lang).split()
233
        [x.lower() for x in available_languages]
234
        language = self.request.GET.get('language', default_lang).lower()
235
        if language not in available_languages:
236
            language = default_lang
237
238
        referer = self.request.referer
239
        if referer is not None:
240
            response = HTTPFound(location=referer)
241
        else:
242
            response = HTTPFound(location=self.request.route_url('home'))
243
244
        response.set_cookie('_LOCALE_',
245
                            value=language,
246
                            max_age=31536000)  # max_age = year
247
        return response
248
249
    @audit
250
    @view_config(route_name='search_result_export', renderer='csv')
251
    def results_csv(self):
252
        """
253
        Download search results in CSV format, allowing further processing.
254
        """
255
        header = ['conceptscheme', 'id', 'uri', 'type', 'label', 'prefLabels', 'altLabels', 'definition', 'broader',
256
                  'narrower', 'related']
257
        rows = []
258
        scheme_id = self.request.matchdict['scheme_id']
259
        label = self._read_request_param('label')
260
        requested_type = self._read_request_param('type')
261
        provider = self.skos_registry.get_provider(scheme_id)
262
        if provider:
263
            if label is not None:
264
                concepts = self.conceptscheme_manager.find(
265
                    provider.conceptscheme_id, {'label': label, 'type': requested_type}
266
                )
267
            elif (label is None) and (requested_type is not None):
268
                concepts = self.conceptscheme_manager.find(
269
                    provider.conceptscheme_id, {'type': requested_type}
270
                )
271
            else:
272
                concepts = self.conceptscheme_manager.get_all(provider.conceptscheme_id)
273
            for concept in concepts:
274
                if concept.type == 'concept':
275
                    rows.append((
276
                        scheme_id, concept.concept_id, concept.uri, concept.type,
277
                        concept.label(self.request.locale_name).label,
278
                        labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'),
279
                        get_definition(concept.notes), [c.concept_id for c in concept.broader_concepts],
280
                        [c.concept_id for c in concept.narrower_concepts],
281
                        [c.concept_id for c in concept.related_concepts]))
282
                else:
283
                    rows.append((
284
                        scheme_id, concept.concept_id, concept.uri, concept.type,
285
                        concept.label(self.request.locale_name).label,
286
                        labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'),
287
                        get_definition(concept.notes), '', [c.concept_id for c in concept.members], ''))
288
        return {
289
            'header': header,
290
            'rows': rows,
291
            'filename': 'atramhasis_export'
292
        }
293
294
    @view_config(route_name='scheme_tree_html', renderer='scheme_tree.jinja2')
295
    @view_config(route_name='scheme_tree', renderer='json', accept='application/json')
296
    def results_tree_json(self):
297
        scheme_id = self.request.matchdict['scheme_id']
298
        language = self.request.params.get('language') or self.request.locale_name
299
        dicts = self.get_results_tree(scheme_id, language)
300
        if dicts:
301
            if 'text/html' not in self.request.accept:
302
                return dicts
303
            else:
304
                return {'tree': dicts}
305
        else:
306
            return Response(status_int=404)
307
308
    @tree_region.cache_on_arguments()
309
    def get_results_tree(self, scheme_id, locale):
310
        skostree = self.get_scheme(scheme_id, locale)
311
        return [self.parse_thing(thing, None) for thing in skostree]
312
313
    def get_scheme(self, scheme, locale):
314
        scheme_tree = []
315
        provider = self.skos_registry.get_provider(scheme)
316
        if provider:
317
            conceptscheme_id = provider.conceptscheme_id
318
319
            tco = self.conceptscheme_manager.get_concepts_for_scheme_tree(conceptscheme_id)
320
            tcl = self.conceptscheme_manager.get_collections_for_scheme_tree(conceptscheme_id)
321
322
            scheme_tree = sort_by_labels(tco, locale) + sort_by_labels(tcl, locale)
323
324
        return scheme_tree
325
326
    def parse_thing(self, thing, parent_tree_id):
327
        tree_id = self.create_treeid(parent_tree_id, thing.concept_id)
328
        locale = self.request.params.get('language', self.request.locale_name)
329
330
        if thing.type and thing.type == 'collection':
331
            cs = [member for member in thing.members] if hasattr(thing, 'members') else []
332
        else:
333
            cs = [c for c in thing.narrower_concepts]
334
            cs = cs + [c for c in thing.narrower_collections]
335
336
        sortedcs = sort_by_labels(cs, locale)
337
        children = [self.parse_thing(c, tree_id) for index, c in enumerate(sortedcs, 1)]
338
        dict_thing = {
339
            'id': tree_id,
340
            'concept_id': thing.concept_id,
341
            'type': thing.type,
342
            'label': thing.label(locale).label if thing.label(locale) else None,
343
            'children': children
344
        }
345
346
        return dict_thing
347
348
    @staticmethod
349
    def create_treeid(parent_tree_id, concept_id):
350
        if parent_tree_id is None:
351
            return str(concept_id)
352
        else:
353
            return parent_tree_id + "." + str(concept_id)
354
355
    @view_config(route_name='scheme_root', renderer='atramhasis:templates/concept.jinja2')
356
    def results_tree_html(self):
357
        scheme_id = self.request.matchdict['scheme_id']
358
        return {'concept': None, 'conceptType': None, 'scheme_id': scheme_id}
359
360
361
@view_defaults(accept='application/json', renderer='json')
362
class AtramhasisListView:
363
    """
364
    This object groups list views part for the user interface.
365
    """
366
367
    def __init__(self, request):
368
        self.request = request
369
        self.skos_manager = self.request.data_managers['skos_manager']
370
        self.localizer = request.localizer
371
        self._ = TranslationStringFactory('atramhasis')
372
373
    @view_config(route_name='labeltypes')
374
    def labeltype_list_view(self):
375
        return self.get_list(LabelType)
376
377
    @view_config(route_name='notetypes')
378
    def notetype_list_view(self):
379
        return self.get_list(NoteType)
380
381
    @list_region.cache_on_arguments()
382
    def get_list(self, listtype):
383
        return [{"key": ltype.name, "label": self.localizer.translate(self._(ltype.name))}
384
                for ltype in self.skos_manager.get_by_list_type(listtype)]
385
386
387
@view_defaults(accept='text/html')
388
class AtramhasisAdminView:
389
    """
390
    This object groups HTML views part of the admin user interface.
391
    """
392
393
    def __init__(self, request):
394
        self.request = request
395
        self.logged_in = request.authenticated_userid
396
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
397
            self.skos_registry = self.request.skos_registry
398
        else:
399
            raise SkosRegistryNotFoundException()
400
401
    @view_config(route_name='admin', renderer='atramhasis:templates/admin.jinja2', permission='edit')
402
    def admin_view(self):
403
        return {'admin': None}
404
405
    @view_config(route_name='scheme_tree_invalidate', renderer='json', accept='application/json', permission='edit')
406
    def invalidate_scheme_tree(self):
407
        scheme_id = self.request.matchdict['scheme_id']
408
        invalidate_scheme_cache(scheme_id)
409
        return Response(status_int=200)
410
411
    @view_config(route_name='tree_invalidate', renderer='json', accept='application/json', permission='edit')
412
    def invalidate_tree(self):
413
        invalidate_cache()
414
        return Response(status_int=200)
415