Passed
Push — develop ( b975ec...41ab58 )
by Koen
01:00
created

pyramid_skosprovider.views   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 387
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 301
dl 0
loc 387
rs 6
c 0
b 0
f 0
wmc 55

How to fix   Complexity   

Complexity

Complex classes like pyramid_skosprovider.views often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf8 -*-
2
'''
3
This module contains the pyramid views that expose services.
4
'''
5
6
import itertools
7
8
from pyramid.view import view_config, view_defaults
9
10
from pyramid.httpexceptions import (
11
    HTTPNotFound,
12
    HTTPBadRequest
13
)
14
15
from skosprovider.exceptions import ProviderUnavailableException
16
17
from pyramid_skosprovider.utils import (
18
    parse_range_header,
19
    QueryBuilder
20
)
21
22
from skosprovider.jsonld import (
23
    MINI_CONTEXT,
24
    CONTEXT
25
)
26
27
import logging
28
log = logging.getLogger(__name__)
29
30
31
class RestView:
    '''
    Base class for the views in this module.

    Keeps a reference to the current request and to the SKOS registry
    that is exposed on that request.
    '''

    def __init__(self, request):
        '''
        :param request: The request being handled; it must expose a
            ``skos_registry`` attribute.
        '''
        registry = request.skos_registry
        self.request = request
        self.skos_registry = registry
36
37
38
@view_defaults(renderer='json')
class StaticView(RestView):
    '''
    Views that serve static, provider independent information.
    '''

    @view_config(
        route_name='skosprovider.context',
        request_method='GET',
        accept='application/json',
        http_cache=(3600, {'public': True})
    )
    @view_config(
        route_name='skosprovider.context',
        request_method='GET',
        accept='application/ld+json',
        http_cache=(3600, {'public': True})
    )
    def get_context(self):
        '''
        Return the JSON-LD context wrapped in an ``@context`` key.

        The response content type is switched to ``application/ld+json``
        when the request accepts that media type.

        :rtype: :class:`dict`
        '''
        request = self.request
        if 'application/ld+json' in request.accept:
            request.response.content_type = 'application/ld+json'
        return {'@context': CONTEXT}
59
60
61
class ProviderView(RestView):
62
    '''
63
    A set of views that expose information from a certain provider.
64
    '''
65
66
    @view_config(
        route_name='skosprovider.uri',
        request_method='GET',
        accept='application/ld+json',
        renderer='skosjson'
    )
    @view_config(
        route_name='skosprovider.uri',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    @view_config(
        route_name='skosprovider.uri.deprecated',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_uri(self):
        '''
        Look up what a URI identifies in the registry.

        The URI is read from the ``uri`` request parameter, falling back
        to the ``uri`` entry of the matchdict. A provider matching the URI
        is reported as a concept scheme; otherwise the registry is asked
        for an entry with that URI.

        :returns: A :class:`dict` describing the match,
            :class:`pyramid.httpexceptions.HTTPBadRequest` when no URI was
            supplied or :class:`pyramid.httpexceptions.HTTPNotFound` when
            nothing matches.
        '''
        uri = self.request.params.get('uri', self.request.matchdict.get('uri', None))
        if not uri:
            return HTTPBadRequest()
        if 'application/ld+json' in self.request.accept:
            self.request.response.content_type = 'application/ld+json'
        # Work on a copy: MINI_CONTEXT is a module level object shared by
        # every request, so it must never be modified in place.
        uri_context = dict(MINI_CONTEXT)
        provider = self.skos_registry.get_provider(uri)
        if provider:
            uri_context['concept_scheme'] = 'skos:ConceptScheme'
            return {
                '@context': uri_context,
                'type': 'concept_scheme',
                'uri': provider.concept_scheme.uri,
                'id': provider.get_vocabulary_id()
            }
        uri_context['concept_scheme'] = {
            '@id': 'skos:inScheme',
            '@type': '@id'
        }
        c = self.skos_registry.get_by_uri(uri)
        if not c:
            return HTTPNotFound()
        return {
            '@context': uri_context,
            'type': c.type,
            'uri': c.uri,
            'id': c.id,
            'concept_scheme': {
                'type': 'skos:ConceptScheme',
                'uri': c.concept_scheme.uri,
                'id': self.skos_registry.get_provider(c.concept_scheme.uri).get_vocabulary_id()
            }
        }
118
119
    @view_config(
        route_name='skosprovider.conceptschemes',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    @view_config(
        route_name='skosprovider.conceptschemes',
        request_method='GET',
        accept='application/ld+json',
        renderer='skosjson'
    )
    def get_conceptschemes(self):
        '''
        List every provider in the registry as a concept scheme.

        The label is looked up in the language given by the ``language``
        request parameter, defaulting to the request locale. When no label
        is found, or the provider is unavailable, the vocabulary URI is
        used as the label instead.

        :rtype: :class:`list` of :class:`dict`
        '''
        language = self.request.params.get('language', self.request.locale_name)
        if 'application/ld+json' in self.request.accept:
            self.request.response.content_type = 'application/ld+json'
        # Work on a copy: MINI_CONTEXT is a module level object shared by
        # every request, so it must never be modified in place.
        context = dict(MINI_CONTEXT)
        context['subject'] = {
            '@id': 'dct:subject',
            '@type': '@id'
        }
        conceptschemes = []
        for p in self.skos_registry.get_providers():
            vocabulary_uri = p.get_vocabulary_uri()
            try:
                label = p.concept_scheme.label(language)
            except ProviderUnavailableException as e:
                # Pass the URI as a logging argument so that a '%' inside
                # it can not corrupt the format string.
                log.error('Could not fetch label for %s: %s', vocabulary_uri, e)
                label = None
            conceptschemes.append({
                '@context': context,
                'type': 'skos:ConceptScheme',
                'id': p.get_vocabulary_id(),
                'uri': vocabulary_uri,
                'label': label.label if label else vocabulary_uri,
                'subject': p.metadata['subject'] or []
            })
        return conceptschemes
159
160
    @view_config(
        route_name='skosprovider.conceptscheme',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_conceptscheme(self):
        '''
        Describe the concept scheme of the provider named by the
        ``scheme_id`` route segment.

        The label is looked up in the language given by the ``language``
        request parameter, defaulting to the request locale, and is
        ``None`` when the concept scheme yields no label.

        :returns: A :class:`dict` describing the concept scheme or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        language = self.request.params.get('language', self.request.locale_name)
        concept_scheme = provider.concept_scheme
        # Resolve the label once instead of once for the test and once
        # more for the value.
        label = concept_scheme.label(language)
        return {
            'id': provider.get_vocabulary_id(),
            'uri': concept_scheme.uri,
            'label': label.label if label else None,
            'subject': provider.metadata['subject'] or [],
            'labels': concept_scheme.labels,
            'notes': concept_scheme.notes,
            'sources': concept_scheme.sources,
            'languages': concept_scheme.languages
        }
182
183
    @view_config(
        route_name='skosprovider.conceptscheme',
        request_method='GET',
        renderer='skosjsonld',
        accept='application/ld+json'
    )
    @view_config(
        route_name='skosprovider.conceptscheme.jsonld',
        request_method='GET',
        renderer='skosjsonld'
    )
    def get_conceptscheme_jsonld(self):
        '''
        Return the concept scheme of the provider named by the
        ``scheme_id`` route segment, to be rendered by ``skosjsonld``.

        :returns: The provider's concept scheme or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown.
        '''
        provider = self.skos_registry.get_provider(
            self.request.matchdict['scheme_id']
        )
        if provider:
            return provider.concept_scheme
        return HTTPNotFound()
200
201
    @view_config(
        route_name='skosprovider.conceptscheme.tc',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_conceptscheme_top_concepts(self):
        '''
        Return the top concepts of the provider named by the ``scheme_id``
        route segment.

        The language comes from the ``language`` request parameter and
        defaults to the request locale.

        :returns: The result of the provider's ``get_top_concepts`` or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown.
        '''
        provider = self.skos_registry.get_provider(
            self.request.matchdict['scheme_id']
        )
        if not provider:
            return HTTPNotFound()
        lang = self.request.params.get('language', self.request.locale_name)
        return provider.get_top_concepts(language=lang)
214
215
    @view_config(
        route_name='skosprovider.conceptscheme.display_top',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_conceptscheme_display_top(self):
        '''
        Return the top of the display hierarchy of the provider named by
        the ``scheme_id`` route segment.

        The language comes from the ``language`` request parameter and
        defaults to the request locale.

        :returns: The result of the provider's ``get_top_display`` or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown.
        '''
        provider = self.skos_registry.get_provider(
            self.request.matchdict['scheme_id']
        )
        if not provider:
            return HTTPNotFound()
        lang = self.request.params.get('language', self.request.locale_name)
        return provider.get_top_display(language=lang)
228
229
    def _build_providers(self, request):
230
        '''
231
        :param pyramid.request.Request request:
232
        :rtype: :class:`dict`
233
        '''
234
        # determine targets
235
        providers = {}
236
        ids = request.params.get('providers.ids', None)
237
        if ids:
238
            ids = ids.split(',')
239
            providers['ids'] = ids
240
        subject = self.request.params.get('providers.subject', None)
241
        if subject:
242
            providers['subject'] = subject
243
        return providers
244
245
    @view_config(
        route_name='skosprovider.cs',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_concepts(self):
        '''
        Search the whole registry and return one page of results.

        The query is produced by a ``QueryBuilder`` for the current
        request. The per provider results of the registry are flattened
        into a single list, optionally refined for wildcard labels and
        finally paged.

        :rtype: :class:`list`
        '''
        builder = QueryBuilder(self.request)
        query = builder()
        find_args = {
            'language': builder.language,
            'providers': self._build_providers(self.request),
            **self._get_sort_params()
        }
        concepts = []
        if not builder.no_result:
            per_provider = self.skos_registry.find(query, **find_args)
            # Every registry result carries its own list of concepts;
            # chain them into one flat list.
            concepts = list(itertools.chain.from_iterable(
                result['concepts'] for result in per_provider
            ))

        if builder.postprocess:
            concepts = self._postprocess_wildcards(concepts, builder.label)

        return self._page_results(concepts)
267
268
    @view_config(
        route_name='skosprovider.conceptscheme.cs',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_conceptscheme_concepts(self):
        '''
        Search the provider named by the ``scheme_id`` route segment and
        return one page of results.

        The query is produced by a ``QueryBuilder`` for the current
        request. Results are optionally refined for wildcard labels and
        then paged.

        :returns: A :class:`list` of results or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown.
        '''
        provider = self.skos_registry.get_provider(
            self.request.matchdict['scheme_id']
        )
        if not provider:
            return HTTPNotFound()
        builder = QueryBuilder(self.request)
        query = builder()
        find_args = {'language': builder.language, **self._get_sort_params()}
        concepts = [] if builder.no_result else provider.find(query, **find_args)

        if builder.postprocess:
            concepts = self._postprocess_wildcards(concepts, builder.label)

        return self._page_results(concepts)
292
293
    @staticmethod
294
    def _postprocess_wildcards(concepts, label):
295
        # We need to refine results further
296
        if label.startswith('*') and not label.endswith('*'):
297
            concepts = [
298
                c for c in concepts if c['label'].lower().endswith(label[1:].lower())
299
            ]
300
        elif label.endswith('*') and not label.startswith('*'):
301
            concepts = [
302
                c for c in concepts if c['label'].lower().startswith(label[:-1].lower())
303
            ]
304
        return concepts
305
306
    def _get_sort_params(self):
307
        sort = self.request.params.get('sort', None)
308
        # Result sorting
309
        if sort:
310
            sort_order = 'desc' if sort[0:1] == '-' else 'asc'
311
            sort = sort[1:] if sort[0:1] in ['-', '+'] else sort
312
            sort = sort.strip().lower()  # dojo store does not encode '+'
313
            return {"sort": sort, "sort_order": sort_order}
314
        return {}
315
316
    def _page_results(self, concepts):
317
        # Result paging
318
        paging_data = False
319
        if 'Range' in self.request.headers:
320
            paging_data = parse_range_header(self.request.headers['Range'])
321
        count = len(concepts)
322
        if not paging_data:
323
            paging_data = {
324
                'start': 0,
325
                'finish': count - 1 if count > 0 else 0,
326
                'number': count
327
            }
328
        cslice = concepts[paging_data['start']:paging_data['finish']+1]
329
        if len(cslice):
330
            cslice[0]['@context'] = self.request.route_url('skosprovider.context')
331
        self.request.response.headers['Content-Range'] = \
332
            'items %d-%d/%d' % (
333
                paging_data['start'], paging_data['finish'], count
334
            )
335
        return cslice
336
337
    @view_config(
        route_name='skosprovider.c',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    @view_config(
        route_name='skosprovider.c',
        request_method='GET',
        renderer='skosjsonld',
        accept='application/ld+json'
    )
    @view_config(
        route_name='skosprovider.c.jsonld',
        request_method='GET',
        renderer='skosjsonld'
    )
    def get_concept(self):
        '''
        Return the entry with id ``c_id`` from the provider named by the
        ``scheme_id`` route segment.

        :returns: The result of the provider's ``get_by_id`` or
            :class:`pyramid.httpexceptions.HTTPNotFound` when either the
            provider or the entry is unknown.
        '''
        route_args = self.request.matchdict
        scheme_id, concept_id = route_args['scheme_id'], route_args['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        concept = provider.get_by_id(concept_id)
        return concept if concept else HTTPNotFound()
364
365
    @view_config(
        route_name='skosprovider.c.display_children',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_concept_display_children(self):
        '''
        Return the display children of the entry with id ``c_id`` in the
        provider named by the ``scheme_id`` route segment.

        The language comes from the ``language`` request parameter and
        defaults to the request locale.

        :returns: The result of the provider's ``get_children_display`` or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown or the provider answers ``False``.
        '''
        route_args = self.request.matchdict
        scheme_id, concept_id = route_args['scheme_id'], route_args['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        lang = self.request.params.get('language', self.request.locale_name)
        children = provider.get_children_display(concept_id, language=lang)
        # Only an explicit False means "not found"; an empty result is a
        # valid answer and is returned as is.
        return HTTPNotFound() if children is False else children
382
383
    @view_config(
        route_name='skosprovider.c.expand',
        request_method='GET',
        accept='application/json',
        renderer='skosjson'
    )
    def get_expand(self):
        '''
        Return the expansion of the entry with id ``c_id`` in the provider
        named by the ``scheme_id`` route segment.

        :returns: The result of the provider's ``expand`` or
            :class:`pyramid.httpexceptions.HTTPNotFound` when the provider
            is unknown or the expansion is falsy.
        '''
        route_args = self.request.matchdict
        scheme_id, concept_id = route_args['scheme_id'], route_args['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        expanded = provider.expand(concept_id)
        return expanded if expanded else HTTPNotFound()
399