AtramhasisView.concept_view()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 40
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 40
rs 8.1786
c 0
b 0
f 0
cc 6
nop 1
1
import os
2
import urllib
3
4
from pyramid.httpexceptions import HTTPFound
5
from pyramid.i18n import TranslationStringFactory
6
from pyramid.response import FileResponse
7
from pyramid.response import Response
8
from pyramid.threadlocal import get_current_registry
9
from pyramid.view import view_config
10
from pyramid.view import view_defaults
11
from skosprovider_sqlalchemy.models import Collection
12
from skosprovider_sqlalchemy.models import Concept
13
from skosprovider_sqlalchemy.models import LabelType
14
from skosprovider_sqlalchemy.models import NoteType
15
from sqlalchemy.exc import NoResultFound
16
17
from atramhasis.audit import audit
18
from atramhasis.cache import invalidate_cache
19
from atramhasis.cache import invalidate_scheme_cache
20
from atramhasis.cache import list_region
21
from atramhasis.cache import tree_region
22
from atramhasis.errors import ConceptNotFoundException
23
from atramhasis.errors import ConceptSchemeNotFoundException
24
from atramhasis.errors import SkosRegistryNotFoundException
25
from atramhasis.utils import update_last_visited_concepts
26
27
28
def labels_to_string(labels, ltype):
29
    labelstring = ''
30
    for label in (label for label in labels if label.labeltype_id == ltype):
31
        labelstring += label.label + ' (' + label.language_id + '), '
32
    return labelstring[:-2]
33
34
35
def get_definition(notes):
36
    for note in notes:
37
        if note.notetype_id == 'definition':
38
            return note.note
39
40
41
def sort_by_labels(concepts, locale, reverse=False):
42
    return sorted((x for x in concepts if x.label(locale)),
43
                  reverse=reverse,
44
                  key=lambda child: child.label(locale).label.lower()
45
                  ) + [x for x in concepts if not x.label(locale)]
46
47
def get_public_conceptschemes(skos_registry):
48
    """
49
    Get all conceptschemes that are visible through the public UI.
50
    """
51
    conceptschemes = [
52
        {'id': x.get_metadata()['id'],
53
         'conceptscheme': x.concept_scheme}
54
        for x in skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
55
                                                           for not_shown in ['external', 'hidden']])
56
        ]
57
58
    return conceptschemes
59
60
61
@view_defaults(accept='text/html', request_method='GET')
62
class AtramhasisView:
63
    """
64
    This object groups HTML views part of the public user interface.
65
    """
66
67
    def __init__(self, request):
68
        self.request = request
69
        self.skos_manager = self.request.data_managers['skos_manager']
70
        self.conceptscheme_manager = self.request.data_managers['conceptscheme_manager']
71
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
72
            self.skos_registry = self.request.skos_registry
73
        else:
74
            raise SkosRegistryNotFoundException()
75
76
    def _read_request_param(self, param):
77
        value = None
78
        if param in self.request.params:
79
            value = self.request.params.getone(param).strip()
80
            if not value:
81
                value = None  # pragma: no cover
82
        return value
83
84
    @view_config(name='favicon.ico')
85
    def favicon_view(self):
86
        """
87
        Return the favicon when requested from the web root.
88
        """
89
        here = os.path.dirname(__file__)
90
        icon = os.path.join(os.path.dirname(here), 'static', 'img', 'favicon.ico')
91
        response = FileResponse(
92
            icon,
93
            request=self.request,
94
            content_type='image/x-icon'
95
        )
96
        return response
97
98
99
    @view_config(route_name='home', renderer='atramhasis:templates/home.jinja2')
100
    def home_view(self):
101
        """
102
        Display the homepage.
103
        """
104
        return {'conceptschemes': get_public_conceptschemes(self.skos_registry)}
105
106
    @view_config(route_name='skosprovider.conceptschemes', renderer='atramhasis:templates/conceptschemes.jinja2')
107
    def conceptschemes_view(self):
108
        """
109
        Display a list of available conceptschemes.
110
        """
111
        return {'conceptschemes': get_public_conceptschemes(self.skos_registry)}
112
113
    @audit
114
    @view_config(route_name='skosprovider.conceptscheme', renderer='atramhasis:templates/conceptscheme.jinja2')
115
    def conceptscheme_view(self):
116
        """
117
        Display a single conceptscheme.
118
        """
119
        conceptschemes = get_public_conceptschemes(self.skos_registry)
120
121
        scheme_id = self.request.matchdict['scheme_id']
122
        provider = self.request.skos_registry.get_provider(scheme_id)
123
        conceptscheme = provider.concept_scheme
124
        if 'atramhasis.force_display_label_language' in provider.metadata:
125
            locale = provider.metadata['atramhasis.force_display_label_language']
126
        else:
127
            locale = self.request.locale_name
128
        title = (conceptscheme.label(locale).label if (conceptscheme.label())
129
                 else scheme_id)
130
131
        scheme = {
132
            'scheme_id': scheme_id,
133
            'title': title,
134
            'uri': conceptscheme.uri,
135
            'labels': conceptscheme.labels,
136
            'notes': conceptscheme.notes,
137
            'sources': conceptscheme.sources,
138
            'top_concepts': provider.get_top_concepts(),
139
        }
140
141
        return {'conceptscheme': scheme, 'conceptschemes': conceptschemes,
142
                'locale': locale}
143
144
    @audit
145
    @view_config(route_name='skosprovider.c', renderer='atramhasis:templates/concept.jinja2')
146
    def concept_view(self):
147
        """
148
        Display all about a single concept or collection.
149
        """
150
        conceptschemes = [
151
            {'id': x.get_metadata()['id'],
152
             'conceptscheme': x.concept_scheme}
153
            for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
154
                                                                    for not_shown in ['external', 'hidden']])
155
            ]
156
157
        scheme_id = self.request.matchdict['scheme_id']
158
        c_id = self.request.matchdict['c_id']
159
        provider = self.request.skos_registry.get_provider(scheme_id)
160
        label = self._read_request_param('label')
161
        requested_type = self._read_request_param('type')
162
163
        if not provider:
164
            raise ConceptSchemeNotFoundException(scheme_id)
165
        if 'atramhasis.force_display_label_language' in provider.metadata:
166
            locale = provider.metadata['atramhasis.force_display_label_language']
167
        else:
168
            locale = self.request.locale_name
169
        try:
170
            c = self.skos_manager.get_thing(c_id, provider.conceptscheme_id)
171
            if isinstance(c, Concept):
172
                concept_type = "Concept"
173
            elif isinstance(c, Collection):
174
                concept_type = "Collection"
175
            else:
176
                return Response('Thing without type: ' + str(c_id), status_int=500)
177
            url = self.request.route_url('skosprovider.c', scheme_id=scheme_id, c_id=c_id)
178
            update_last_visited_concepts(self.request, {'label': c.label(locale).label, 'url': url})
179
            return {'concept': c, 'conceptType': concept_type, 'scheme_id': scheme_id,
180
                    'conceptschemes': conceptschemes, 'provider': provider,
181
                    'locale': locale, 'type': requested_type, 'label': label}
182
        except NoResultFound:
183
            raise ConceptNotFoundException(c_id)
184
185
    @view_config(route_name='skosprovider.conceptscheme.cs', renderer='atramhasis:templates/search_result.jinja2')
186
    def search_result(self):
187
        """
188
        Display search results
189
        """
190
        conceptschemes = [
191
            {'id': x.get_metadata()['id'],
192
             'conceptscheme': x.concept_scheme}
193
            for x in self.skos_registry.get_providers() if not any([not_shown in x.get_metadata()['subject']
194
                                                                    for not_shown in ['external', 'hidden']])
195
            ]
196
197
        scheme_id = self.request.matchdict['scheme_id']
198
        label = self._read_request_param('label')
199
        requested_type = self._read_request_param('type')
200
        provider = self.skos_registry.get_provider(scheme_id)
201
        if provider:
202
            if 'atramhasis.force_display_label_language' in provider.metadata:
203
                locale = provider.metadata['atramhasis.force_display_label_language']
204
            else:
205
                locale = self.request.locale_name
206
            if label is not None:
207
                concepts = provider.find(
208
                    {'label': label, 'type': requested_type},
209
                    language=locale,
210
                    sort='label'
211
                )
212
            elif (label is None) and (requested_type is not None):
213
                concepts = provider.find(
214
                    {'type': requested_type}, language=locale, sort='label'
215
                )
216
            else:
217
                concepts = provider.get_all(language=locale, sort='label')
218
            return {
219
                'concepts': concepts,
220
                'scheme_id': scheme_id,
221
                'conceptschemes': conceptschemes,
222
                'type': requested_type,
223
                'label': label
224
            }
225
        return Response(content_type='text/plain', status_int=404)
226
227
    @view_config(route_name='locale')
228
    def set_locale_cookie(self):
229
        """
230
        Set a language cookie to remember what language a user requested.
231
        """
232
        settings = get_current_registry().settings
233
        default_lang = settings.get('pyramid.default_locale_name')
234
        available_languages = settings.get('available_languages', default_lang).split()
235
        [x.lower() for x in available_languages]
236
        language = self.request.GET.get('language', default_lang).lower()
237
        if language not in available_languages:
238
            language = default_lang
239
240
        referer = self.request.referer
241
        if referer is not None:
242
            response = HTTPFound(location=referer)
243
        else:
244
            response = HTTPFound(location=self.request.route_url('home'))
245
246
        response.set_cookie('_LOCALE_',
247
                            value=language,
248
                            max_age=31536000)  # max_age = year
249
        return response
250
251
    @audit
252
    @view_config(route_name='skosprovider.conceptscheme.csv', renderer='csv')
253
    def results_csv(self):
254
        """
255
        Download search results in CSV format, allowing further processing.
256
        """
257
        header = ['conceptscheme', 'id', 'uri', 'type', 'label', 'prefLabels', 'altLabels', 'definition', 'broader',
258
                  'narrower', 'related']
259
        rows = []
260
        scheme_id = self.request.matchdict['scheme_id']
261
        label = self._read_request_param('label')
262
        requested_type = self._read_request_param('type')
263
        provider = self.skos_registry.get_provider(scheme_id)
264
        if provider:
265
            if label is not None:
266
                concepts = self.conceptscheme_manager.find(
267
                    provider.conceptscheme_id, {'label': label, 'type': requested_type}
268
                )
269
            elif (label is None) and (requested_type is not None):
270
                concepts = self.conceptscheme_manager.find(
271
                    provider.conceptscheme_id, {'type': requested_type}
272
                )
273
            else:
274
                concepts = self.conceptscheme_manager.get_all(provider.conceptscheme_id)
275
            for concept in concepts:
276
                if concept.type == 'concept':
277
                    rows.append((
278
                        scheme_id, concept.concept_id, concept.uri, concept.type,
279
                        concept.label(self.request.locale_name).label,
280
                        labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'),
281
                        get_definition(concept.notes), [c.concept_id for c in concept.broader_concepts],
282
                        [c.concept_id for c in concept.narrower_concepts],
283
                        [c.concept_id for c in concept.related_concepts]))
284
                else:
285
                    rows.append((
286
                        scheme_id, concept.concept_id, concept.uri, concept.type,
287
                        concept.label(self.request.locale_name).label,
288
                        labels_to_string(concept.labels, 'prefLabel'), labels_to_string(concept.labels, 'altLabel'),
289
                        get_definition(concept.notes), '', [c.concept_id for c in concept.members], ''))
290
        return {
291
            'header': header,
292
            'rows': rows,
293
            'filename': 'atramhasis_export'
294
        }
295
296
    @view_config(route_name='scheme_tree_html', renderer='scheme_tree.jinja2')
297
    @view_config(route_name='scheme_tree', renderer='json', accept='application/json')
298
    def results_tree_json(self):
299
        scheme_id = self.request.matchdict['scheme_id']
300
        language = self.request.params.get('language') or self.request.locale_name
301
        dicts = self.get_results_tree(scheme_id, language)
302
        if dicts:
303
            if 'text/html' not in self.request.accept:
304
                return dicts
305
            else:
306
                return {'tree': dicts}
307
        else:
308
            return Response(status_int=404)
309
310
    @tree_region.cache_on_arguments()
311
    def get_results_tree(self, scheme_id, locale):
312
        skostree = self.get_scheme(scheme_id, locale)
313
        return [self.parse_thing(thing, None) for thing in skostree]
314
315
    def get_scheme(self, scheme, locale):
316
        scheme_tree = []
317
        provider = self.skos_registry.get_provider(scheme)
318
        if provider:
319
            conceptscheme_id = provider.conceptscheme_id
320
321
            tco = self.conceptscheme_manager.get_concepts_for_scheme_tree(conceptscheme_id)
322
            tcl = self.conceptscheme_manager.get_collections_for_scheme_tree(conceptscheme_id)
323
324
            scheme_tree = sort_by_labels(tco, locale) + sort_by_labels(tcl, locale)
325
326
        return scheme_tree
327
328
    def parse_thing(self, thing, parent_tree_id):
329
        tree_id = self.create_treeid(parent_tree_id, thing.concept_id)
330
        locale = self.request.params.get('language', self.request.locale_name)
331
332
        if thing.type and thing.type == 'collection':
333
            cs = [member for member in thing.members] if hasattr(thing, 'members') else []
334
        else:
335
            cs = [c for c in thing.narrower_concepts]
336
            cs = cs + [c for c in thing.narrower_collections]
337
338
        sortedcs = sort_by_labels(cs, locale)
339
        children = [self.parse_thing(c, tree_id) for index, c in enumerate(sortedcs, 1)]
340
        dict_thing = {
341
            'id': tree_id,
342
            'concept_id': thing.concept_id,
343
            'type': thing.type,
344
            'label': thing.label(locale).label if thing.label(locale) else None,
345
            'children': children
346
        }
347
348
        return dict_thing
349
350
    @staticmethod
351
    def create_treeid(parent_tree_id, concept_id):
352
        if parent_tree_id is None:
353
            return urllib.parse.quote(str(concept_id), safe="")
354
        else:
355
            return parent_tree_id + "|" + urllib.parse.quote(str(concept_id), safe="")
356
357
358
@view_defaults(accept='application/json', renderer='json')
359
class AtramhasisListView:
360
    """
361
    This object groups list views part for the user interface.
362
    """
363
364
    def __init__(self, request):
365
        self.request = request
366
        self.skos_manager = self.request.data_managers['skos_manager']
367
        self.localizer = request.localizer
368
        self._ = TranslationStringFactory('atramhasis')
369
370
    @view_config(route_name='labeltypes')
371
    def labeltype_list_view(self):
372
        return self.get_list(LabelType)
373
374
    @view_config(route_name='notetypes')
375
    def notetype_list_view(self):
376
        return self.get_list(NoteType)
377
378
    @list_region.cache_on_arguments()
379
    def get_list(self, listtype):
380
        return [{"key": ltype.name, "label": self.localizer.translate(self._(ltype.name))}
381
                for ltype in self.skos_manager.get_by_list_type(listtype)]
382
383
384
@view_defaults(accept='text/html')
385
class AtramhasisAdminView:
386
    """
387
    This object groups HTML views part of the admin user interface.
388
    """
389
390
    def __init__(self, request):
391
        self.request = request
392
        self.logged_in = request.authenticated_userid
393
        if hasattr(request, 'skos_registry') and request.skos_registry is not None:
394
            self.skos_registry = self.request.skos_registry
395
        else:
396
            raise SkosRegistryNotFoundException()
397
398
    @view_config(route_name='admin', renderer='atramhasis:templates/admin.jinja2', permission='edit')
399
    def admin_view(self):
400
        return {'admin': None}
401
402
    @view_config(route_name='scheme_tree_invalidate', renderer='json', accept='application/json', permission='edit')
403
    def invalidate_scheme_tree(self):
404
        scheme_id = self.request.matchdict['scheme_id']
405
        invalidate_scheme_cache(scheme_id)
406
        return Response(status_int=200)
407
408
    @view_config(route_name='tree_invalidate', renderer='json', accept='application/json', permission='edit')
409
    def invalidate_tree(self):
410
        invalidate_cache()
411
        return Response(status_int=200)
412