AtramhasisView.concept_view()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 40
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 40
rs 8.1786
c 0
b 0
f 0
cc 6
nop 1
1
import os
2
import urllib
3
4
from pyramid.httpexceptions import HTTPFound
5
from pyramid.i18n import TranslationStringFactory
6
from pyramid.response import FileResponse
7
from pyramid.response import Response
8
from pyramid.threadlocal import get_current_registry
9
from pyramid.view import view_config
10
from pyramid.view import view_defaults
11
from skosprovider_sqlalchemy.models import Collection
12
from skosprovider_sqlalchemy.models import Concept
13
from skosprovider_sqlalchemy.models import LabelType
14
from skosprovider_sqlalchemy.models import NoteType
15
from sqlalchemy.exc import NoResultFound
16
17
from atramhasis.audit import audit
18
from atramhasis.cache import invalidate_cache
19
from atramhasis.cache import invalidate_scheme_cache
20
from atramhasis.cache import list_region
21
from atramhasis.cache import tree_region
22
from atramhasis.errors import ConceptNotFoundException
23
from atramhasis.errors import ConceptSchemeNotFoundException
24
from atramhasis.errors import SkosRegistryNotFoundException
25
from atramhasis.utils import update_last_visited_concepts
26
27
28
def labels_to_string(labels, ltype):
29
    labelstring = ''
30
    for label in (label for label in labels if label.labeltype_id == ltype):
31
        labelstring += label.label + ' (' + label.language_id + '), '
32
    return labelstring[:-2]
33
34
35
def get_definition(notes):
36
    for note in notes:
37
        if note.notetype_id == 'definition':
38
            return note.note
39
40
41
def sort_by_labels(concepts, locale, reverse=False):
42
    return sorted((x for x in concepts if x.label(locale)),
43
                  reverse=reverse,
44
                  key=lambda child: child.label(locale).label.lower()
45
                  ) + [x for x in concepts if not x.label(locale)]
46
47
def get_public_conceptschemes(skos_registry):
48
    """
49
    Get all conceptschemes that are visible through the public UI.
50
    """
51
    conceptschemes = [
52
        {'id': x.get_metadata()['id'],
53
         'conceptscheme': x.concept_scheme}
54
        for x in skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
55
                                                           for not_shown in ['external', 'hidden']])
56
        ]
57
58
    return conceptschemes
59
60
61
@view_defaults(accept='text/html', request_method='GET')
62
class AtramhasisView:
63
    """
64
    This object groups HTML views part of the public user interface.
65
    """
66
67
    def __init__(self, request):
68
        self.request = request
69
        self.skos_manager = self.request.data_managers['skos_manager']
70
        self.conceptscheme_manager = self.request.data_managers['conceptscheme_manager']
71
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
72
            self.skos_registry = self.request.skos_registry
73
        else:
74
            raise SkosRegistryNotFoundException()
75
76
    def _read_request_param(self, param):
77
        value = None
78
        if param in self.request.params:
79
            value = self.request.params.getone(param).strip()
80
            if not value:
81
                value = None  # pragma: no cover
82
        return value
83
84
    @view_config(name='favicon.ico')
85
    def favicon_view(self):
86
        """
87
        Return the favicon when requested from the web root.
88
        """
89
        here = os.path.dirname(__file__)
90
        icon = os.path.join(os.path.dirname(here), 'static', 'img', 'favicon.ico')
91
        response = FileResponse(
92
            icon,
93
            request=self.request,
94
            content_type='image/x-icon'
95
        )
96
        return response
97
98
99
    @view_config(route_name='home', renderer='atramhasis:templates/home.jinja2')
100
    def home_view(self):
101
        """
102
        Display the homepage.
103
        """
104
        return {'conceptschemes': get_public_conceptschemes(self.skos_registry)}
105
106
    @view_config(route_name='skosprovider.conceptschemes', renderer='atramhasis:templates/conceptschemes.jinja2')
107
    def conceptschemes_view(self):
108
        """
109
        Display a list of available conceptschemes.
110
        """
111
        return {'conceptschemes': get_public_conceptschemes(self.skos_registry)}
112
113
    @audit
114
    @view_config(route_name='skosprovider.conceptscheme', renderer='atramhasis:templates/conceptscheme.jinja2')
115
    def conceptscheme_view(self):
116
        """
117
        Display a single conceptscheme.
118
        """
119
        conceptschemes = get_public_conceptschemes(self.skos_registry)
120
121
        scheme_id = self.request.matchdict['scheme_id']
122
123
        provider = self.request.skos_registry.get_provider(scheme_id)
124
        if provider is False:
125
            raise ConceptSchemeNotFoundException(scheme_id)
126
127
        conceptscheme = provider.concept_scheme
128
        if 'atramhasis.force_display_label_language' in provider.metadata:
129
            locale = provider.metadata['atramhasis.force_display_label_language']
130
        else:
131
            locale = self.request.locale_name
132
        title = (conceptscheme.label(locale).label if (conceptscheme.label())
133
                 else scheme_id)
134
135
        scheme = {
136
            'scheme_id': scheme_id,
137
            'title': title,
138
            'uri': conceptscheme.uri,
139
            'labels': conceptscheme.labels,
140
            'notes': conceptscheme.notes,
141
            'sources': conceptscheme.sources,
142
            'top_concepts': provider.get_top_concepts(),
143
        }
144
145
        return {'conceptscheme': scheme, 'conceptschemes': conceptschemes,
146
                'locale': locale}
147
148
    @audit
149
    @view_config(route_name='skosprovider.c', renderer='atramhasis:templates/concept.jinja2')
150
    def concept_view(self):
151
        """
152
        Display all about a single concept or collection.
153
        """
154
        conceptschemes = [
155
            {'id': x.get_metadata()['id'],
156
             'conceptscheme': x.concept_scheme}
157
            for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
158
                                                                    for not_shown in ['external', 'hidden']])
159
            ]
160
161
        scheme_id = self.request.matchdict['scheme_id']
162
        c_id = self.request.matchdict['c_id']
163
        provider = self.request.skos_registry.get_provider(scheme_id)
164
        label = self._read_request_param('label')
165
        requested_type = self._read_request_param('type')
166
167
        if not provider:
168
            raise ConceptSchemeNotFoundException(scheme_id)
169
        if 'atramhasis.force_display_label_language' in provider.metadata:
170
            locale = provider.metadata['atramhasis.force_display_label_language']
171
        else:
172
            locale = self.request.locale_name
173
        try:
174
            c = self.skos_manager.get_thing(c_id, provider.conceptscheme_id)
175
            if isinstance(c, Concept):
176
                concept_type = "Concept"
177
            elif isinstance(c, Collection):
178
                concept_type = "Collection"
179
            else:
180
                return Response('Thing without type: ' + str(c_id), status_int=500)
181
            url = self.request.route_url('skosprovider.c', scheme_id=scheme_id, c_id=c_id)
182
            update_last_visited_concepts(self.request, {'label': c.label(locale).label, 'url': url})
183
            return {'concept': c, 'conceptType': concept_type, 'scheme_id': scheme_id,
184
                    'conceptschemes': conceptschemes, 'provider': provider,
185
                    'locale': locale, 'type': requested_type, 'label': label}
186
        except NoResultFound:
187
            raise ConceptNotFoundException(c_id)
188
189
    @view_config(route_name='skosprovider.conceptscheme.cs', renderer='atramhasis:templates/search_result.jinja2')
190
    def search_result(self):
191
        """
192
        Display search results
193
        """
194
        conceptschemes = [
195
            {'id': x.get_metadata()['id'],
196
             'conceptscheme': x.concept_scheme}
197
            for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
198
                                                                    for not_shown in ['external', 'hidden']])
199
            ]
200
201
        scheme_id = self.request.matchdict['scheme_id']
202
        label = self._read_request_param('label')
203
        requested_type = self._read_request_param('type')
204
        provider = self.skos_registry.get_provider(scheme_id)
205
        if provider:
206
            if 'atramhasis.force_display_label_language' in provider.metadata:
207
                locale = provider.metadata['atramhasis.force_display_label_language']
208
            else:
209
                locale = self.request.locale_name
210
            if label is not None:
211
                concepts = provider.find(
212
                    {'label': label, 'type': requested_type},
213
                    language=locale,
214
                    sort='label'
215
                )
216
            elif (label is None) and (requested_type is not None):
217
                concepts = provider.find(
218
                    {'type': requested_type}, language=locale, sort='label'
219
                )
220
            else:
221
                concepts = provider.get_all(language=locale, sort='label')
222
            return {
223
                'concepts': concepts,
224
                'scheme_id': scheme_id,
225
                'conceptschemes': conceptschemes,
226
                'type': requested_type,
227
                'label': label
228
            }
229
        return Response(content_type='text/plain', status_int=404)
230
231
    @view_config(route_name='locale')
232
    def set_locale_cookie(self):
233
        """
234
        Set a language cookie to remember what language a user requested.
235
        """
236
        settings = get_current_registry().settings
237
        default_lang = settings.get('pyramid.default_locale_name')
238
        available_languages = settings.get('available_languages', default_lang).split()
239
        [x.lower() for x in available_languages]
240
        language = self.request.GET.get('language', default_lang).lower()
241
        if language not in available_languages:
242
            language = default_lang
243
244
        referer = self.request.referer
245
        if referer is not None:
246
            response = HTTPFound(location=referer)
247
        else:
248
            response = HTTPFound(location=self.request.route_url('home'))
249
250
        response.set_cookie('_LOCALE_',
251
                            value=language,
252
                            max_age=31536000)  # max_age = year
253
        return response
254
255
    @audit
256
    @view_config(route_name='skosprovider.conceptscheme.csv', renderer='csv')
257
    def results_csv(self):
258
        """
259
        Download search results in CSV format, allowing further processing.
260
        """
261
        header = ['conceptscheme', 'id', 'uri', 'type', 'label', 'prefLabels', 'altLabels', 'definition', 'broader',
262
                  'narrower', 'related']
263
        rows = []
264
        scheme_id = self.request.matchdict['scheme_id']
265
        label = self._read_request_param('label')
266
        requested_type = self._read_request_param('type')
267
        provider = self.skos_registry.get_provider(scheme_id)
268
        if provider:
269
            if label is not None:
270
                concepts = self.conceptscheme_manager.find(
271
                    provider.conceptscheme_id, {'label': label, 'type': requested_type}
272
                )
273
            elif (label is None) and (requested_type is not None):
274
                concepts = self.conceptscheme_manager.find(
275
                    provider.conceptscheme_id, {'type': requested_type}
276
                )
277
            else:
278
                concepts = self.conceptscheme_manager.get_all(provider.conceptscheme_id)
279
            for concept in concepts:
280
                if concept.type == 'concept':
281
                    rows.append((
282
                        scheme_id, concept.concept_id, concept.uri, concept.type,
283
                        concept.label(self.request.locale_name).label,
284
                        labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'),
285
                        get_definition(concept.notes), [c.concept_id for c in concept.broader_concepts],
286
                        [c.concept_id for c in concept.narrower_concepts],
287
                        [c.concept_id for c in concept.related_concepts]))
288
                else:
289
                    rows.append((
290
                        scheme_id, concept.concept_id, concept.uri, concept.type,
291
                        concept.label(self.request.locale_name).label,
292
                        labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'),
293
                        get_definition(concept.notes), '', [c.concept_id for c in concept.members], ''))
294
        return {
295
            'header': header,
296
            'rows': rows,
297
            'filename': 'atramhasis_export'
298
        }
299
300
    @view_config(route_name='scheme_tree_html', renderer='scheme_tree.jinja2')
301
    @view_config(route_name='scheme_tree', renderer='json', accept='application/json')
302
    def results_tree_json(self):
303
        scheme_id = self.request.matchdict['scheme_id']
304
        language = self.request.params.get('language') or self.request.locale_name
305
        dicts = self.get_results_tree(scheme_id, language)
306
        if dicts:
307
            if 'text/html' not in self.request.accept:
308
                return dicts
309
            else:
310
                return {'tree': dicts}
311
        else:
312
            return Response(status_int=404)
313
314
    @tree_region.cache_on_arguments()
315
    def get_results_tree(self, scheme_id, locale):
316
        skostree = self.get_scheme(scheme_id, locale)
317
        return [self.parse_thing(thing, None) for thing in skostree]
318
319
    def get_scheme(self, scheme, locale):
320
        scheme_tree = []
321
        provider = self.skos_registry.get_provider(scheme)
322
        if provider:
323
            conceptscheme_id = provider.conceptscheme_id
324
325
            tco = self.conceptscheme_manager.get_concepts_for_scheme_tree(conceptscheme_id)
326
            tcl = self.conceptscheme_manager.get_collections_for_scheme_tree(conceptscheme_id)
327
328
            scheme_tree = sort_by_labels(tco, locale) + sort_by_labels(tcl, locale)
329
330
        return scheme_tree
331
332
    def parse_thing(self, thing, parent_tree_id):
333
        tree_id = self.create_treeid(parent_tree_id, thing.concept_id)
334
        locale = self.request.params.get('language', self.request.locale_name)
335
336
        if thing.type and thing.type == 'collection':
337
            cs = [member for member in thing.members] if hasattr(thing, 'members') else []
338
        else:
339
            cs = [c for c in thing.narrower_concepts]
340
            cs = cs + [c for c in thing.narrower_collections]
341
342
        sortedcs = sort_by_labels(cs, locale)
343
        children = [self.parse_thing(c, tree_id) for index, c in enumerate(sortedcs, 1)]
344
        dict_thing = {
345
            'id': tree_id,
346
            'concept_id': thing.concept_id,
347
            'type': thing.type,
348
            'label': thing.label(locale).label if thing.label(locale) else None,
349
            'children': children
350
        }
351
352
        return dict_thing
353
354
    @staticmethod
355
    def create_treeid(parent_tree_id, concept_id):
356
        if parent_tree_id is None:
357
            return urllib.parse.quote(str(concept_id), safe="")
358
        else:
359
            return parent_tree_id + "|" + urllib.parse.quote(str(concept_id), safe="")
360
361
362
@view_defaults(accept='application/json', renderer='json')
363
class AtramhasisListView:
364
    """
365
    This object groups list views part for the user interface.
366
    """
367
368
    def __init__(self, request):
369
        self.request = request
370
        self.skos_manager = self.request.data_managers['skos_manager']
371
        self.localizer = request.localizer
372
        self._ = TranslationStringFactory('atramhasis')
373
374
    @view_config(route_name='labeltypes')
375
    def labeltype_list_view(self):
376
        return self.get_list(LabelType)
377
378
    @view_config(route_name='notetypes')
379
    def notetype_list_view(self):
380
        return self.get_list(NoteType)
381
382
    @list_region.cache_on_arguments()
383
    def get_list(self, listtype):
384
        return [{"key": ltype.name, "label": self.localizer.translate(self._(ltype.name))}
385
                for ltype in self.skos_manager.get_by_list_type(listtype)]
386
387
388
@view_defaults(accept='text/html')
389
class AtramhasisAdminView:
390
    """
391
    This object groups HTML views part of the admin user interface.
392
    """
393
394
    def __init__(self, request):
395
        self.request = request
396
        self.logged_in = request.authenticated_userid
397
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
398
            self.skos_registry = self.request.skos_registry
399
        else:
400
            raise SkosRegistryNotFoundException()
401
402
    @view_config(route_name='admin', renderer='atramhasis:templates/admin.jinja2', permission='edit')
403
    def admin_view(self):
404
        return {'admin': None}
405
406
    @view_config(route_name='scheme_tree_invalidate', renderer='json', accept='application/json', permission='edit')
407
    def invalidate_scheme_tree(self):
408
        scheme_id = self.request.matchdict['scheme_id']
409
        invalidate_scheme_cache(scheme_id)
410
        return Response(status_int=200)
411
412
    @view_config(route_name='tree_invalidate', renderer='json', accept='application/json', permission='edit')
413
    def invalidate_tree(self):
414
        invalidate_cache()
415
        return Response(status_int=200)
416