atramhasis.data.datamanagers   B
last analyzed

Complexity

Total Complexity 45

Size/Duplication

Total Lines 420
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 214
dl 0
loc 420
rs 8.8
c 0
b 0
f 0
wmc 45

32 Methods

Rating   Name   Duplication   Size   Complexity  
A CountsManager.save() 0 11 1
A ProviderDataManager.get_all_providers() 0 7 1
A SkosManager.get_by_list_type() 0 7 1
A SkosManager.change_type() 0 10 2
A ConceptSchemeManager.get_all() 0 19 1
A ConceptSchemeManager.get_concepts_for_scheme_tree() 0 16 1
A AuditManager.get_most_popular_concepts_for_conceptscheme() 0 32 2
A ConceptSchemeManager.get_collections_for_scheme_tree() 0 16 1
A CountsManager.get_most_recent_count_for_scheme() 0 7 1
A SkosManager.__init__() 0 2 1
A SkosManager.get_match() 0 8 1
A ConceptSchemeManager.get() 0 9 1
A LanguagesManager.delete() 0 6 1
A ConceptSchemeManager.__init__() 0 2 1
A LanguagesManager.get() 0 4 1
A LanguagesManager.get_all_sorted() 0 18 2
A LanguagesManager.__init__() 0 2 1
A AuditManager.save() 0 9 1
A SkosManager.save() 0 13 1
A DataManager.__init__() 0 2 1
A LanguagesManager.count_languages() 0 4 1
A LanguagesManager.get_all() 0 6 1
A ProviderDataManager.get_provider_by_id() 0 4 1
A SkosManager.delete_thing() 0 6 1
A SkosManager.get_match_type() 0 4 1
A ConceptSchemeManager.save() 0 9 1
A LanguagesManager.save() 0 9 1
A SkosManager.get_thing() 0 13 1
A AuditManager._get_first_day() 0 16 5
A SkosManager.get_all_label_types() 0 2 1
A SkosManager.get_next_cid() 0 12 4
A ConceptSchemeManager.find() 0 21 4

How to fix   Complexity   

Complexity

Complex classes like atramhasis.data.datamanagers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
This module adds DataManagers for Atramhasis. These are service layer objects
3
that abstract all interactions with the database away from the views.
4
5
:versionadded: 0.4.1
6
"""
7
8
import uuid
9
from datetime import date
10
from datetime import datetime
11
from typing import List
12
13
import dateutil.relativedelta
14
import sqlalchemy as sa
15
from skosprovider_sqlalchemy.models import Collection
16
from skosprovider_sqlalchemy.models import Concept
17
from skosprovider_sqlalchemy.models import ConceptScheme
18
from skosprovider_sqlalchemy.models import Label
19
from skosprovider_sqlalchemy.models import LabelType
20
from skosprovider_sqlalchemy.models import Language
21
from skosprovider_sqlalchemy.models import Match
22
from skosprovider_sqlalchemy.models import MatchType
23
from skosprovider_sqlalchemy.models import Thing
24
from sqlalchemy import desc
25
from sqlalchemy import func
26
from sqlalchemy import select
27
from sqlalchemy.orm import Session
28
from sqlalchemy.orm import joinedload
29
30
from atramhasis.data import popular_concepts
31
from atramhasis.data.models import ConceptVisitLog
32
from atramhasis.data.models import ConceptschemeCounts
33
from atramhasis.data.models import IDGenerationStrategy
34
from atramhasis.data.models import Provider
35
from atramhasis.scripts import delete_scheme
36
37
38
class DataManager:
    """
    Base class for the service layer objects defined in this module.

    A DataManager abstracts all interactions with the database for a certain
    model. It only stores the session it is given; subclasses issue their
    queries through ``self.session``.
    """

    #: The session every query of this manager is executed on.
    session: Session

    def __init__(self, session: Session) -> None:
        """
        :param session: the session to run all database interactions on
        """
        self.session = session
45
46
47
class ConceptSchemeManager(DataManager):
    """
    A :class:`DataManager` for
    :class:`ConceptSchemes <skosprovider_sqlalchemy.models.ConceptScheme>.`
    """

    def __init__(self, session):
        """
        :param session: the session to run all database interactions on
        """
        super().__init__(session)

    def get(self, conceptscheme_id):
        """
        Fetch a single conceptscheme by its id.

        The result is read with ``scalar_one()``, so exactly one matching row
        is expected.

        :param conceptscheme_id: a conceptscheme id
        :return: the conceptscheme for the given id
        """
        stmt = select(ConceptScheme).filter(ConceptScheme.id == conceptscheme_id)
        return self.session.execute(stmt).scalar_one()

    def find(self, conceptscheme_id, query):
        """
        Find concepts and collections in this concept scheme.

        Two optional keys of ``query`` are honoured: ``type`` (only when it is
        ``'concept'`` or ``'collection'``) and ``label`` (case-insensitive
        substring match on any label of the thing).

        :param conceptscheme_id: a conceptscheme id
        :param query: A python dictionary containing query parameters.
        :returns: A :class:`list` of
            :class:`skosprovider_sqlalchemy.models.Thing` instances.
        """
        stmt = (
            select(Thing)
            .options(joinedload(Thing.labels))
            .filter(Thing.conceptscheme_id == conceptscheme_id)
        )
        if 'type' in query:
            thing_type = query['type']
            # Any other type value is ignored rather than rejected.
            if thing_type in ['concept', 'collection']:
                stmt = stmt.filter(Thing.type == thing_type)
        if 'label' in query:
            pattern = '%' + query['label'].lower() + '%'
            stmt = stmt.filter(Thing.labels.any(Label.label.ilike(pattern)))
        # unique() collapses the duplicate rows produced by the joined load.
        found = self.session.execute(stmt).unique()
        return found.scalars().all()

    def get_concepts_for_scheme_tree(self, conceptscheme_id):
        """
        Select the concepts that form the top level of the scheme tree:
        concepts without broader concepts that are not a member of anything.

        :param conceptscheme_id: a conceptscheme id
        :return: all concepts for the scheme_tree
        """
        # NOTE(review): ``member_of`` is reached through ``Collection`` although
        # the statement selects ``Concept``; presumably both resolve to the same
        # relationship - confirm against the skosprovider_sqlalchemy models.
        stmt = select(Concept).filter(
            Concept.conceptscheme_id == conceptscheme_id,
            ~Concept.broader_concepts.any(),
            ~Collection.member_of.any(),
        )
        return self.session.execute(stmt).scalars().all()

    def get_collections_for_scheme_tree(self, conceptscheme_id):
        """
        Select the collections that form the top level of the scheme tree:
        collections without broader concepts that are not a member of anything.

        :param conceptscheme_id: a conceptscheme id
        :return: all collections for the scheme_tree
        """
        stmt = select(Collection).filter(
            Collection.conceptscheme_id == conceptscheme_id,
            ~Collection.broader_concepts.any(),
            ~Collection.member_of.any(),
        )
        return self.session.execute(stmt).scalars().all()

    def get_all(self, conceptscheme_id):
        """
        Get all concepts and collections in this concept scheme.

        :param conceptscheme_id: a conceptscheme id
        :returns: A :class:`list` of
            :class:`skosprovider_sqlalchemy.models.Thing` instances.
        """
        stmt = (
            select(Thing)
            .options(joinedload(Thing.labels))
            .filter(Thing.conceptscheme_id == conceptscheme_id)
        )
        # unique() collapses the duplicate rows produced by the joined load.
        return self.session.execute(stmt).unique().scalars().all()

    def save(self, conceptscheme):
        """
        Merge the conceptscheme into the session and flush.

        Note that the object that was passed in is returned, not the instance
        produced by ``Session.merge``.

        :param conceptscheme: conceptscheme to save
        :return: saved conceptscheme
        """
        session = self.session
        session.merge(conceptscheme)
        session.flush()
        return conceptscheme
153
154
155
class SkosManager(DataManager):
    """
    A :class:`DataManager` for
    :class:`Concepts and Collections <skosprovider_sqlalchemy.models.Thing>.`
    """

    def __init__(self, session):
        """
        :param session: the session to run all database interactions on
        """
        super().__init__(session)

    def get_thing(self, concept_id, conceptscheme_id):
        """
        Fetch one concept or collection; exactly one match is expected.

        :param concept_id: a concept id
        :param conceptscheme_id: a conceptscheme id
        :return: the selected thing (Concept or Collection)
        """
        stmt = select(Thing).filter(
            Thing.concept_id == concept_id,
            Thing.conceptscheme_id == conceptscheme_id,
        )
        return self.session.execute(stmt).scalar_one()

    def save(self, thing):
        """
        Add the thing to the session, flush, and reload it from the database.

        :param thing: thing to save
        :return: saved thing
        """
        session = self.session
        session.add(thing)
        session.flush()
        # Refreshing guarantees the attribute types match what the database
        # stores. With numeric id generation, for example, concept_id may have
        # been set as an integer while it should be a string.
        session.refresh(thing)
        return thing

    def change_type(self, thing, concept_id, conceptscheme_id, new_type, uri):
        """
        Replace a thing by a fresh Concept or Collection of another type.

        The existing thing is deleted and flushed first; a new, otherwise
        empty object carrying the same identifiers is then saved.

        :param thing: the thing to replace
        :param concept_id: concept id to give the replacement
        :param conceptscheme_id: conceptscheme id to give the replacement
        :param new_type: ``'concept'`` creates a Concept, any other value a
            Collection
        :param uri: uri to give the replacement
        :return: the newly saved thing
        """
        self.delete_thing(thing)
        self.session.flush()
        if new_type == 'concept':
            replacement = Concept()
        else:
            replacement = Collection()
        replacement.type = new_type
        replacement.concept_id = concept_id
        replacement.conceptscheme_id = conceptscheme_id
        replacement.uri = uri
        self.save(replacement)
        return replacement

    def delete_thing(self, thing):
        """
        Mark the thing for deletion in the session (no flush is issued here).

        :param thing: the thing to delete
        """
        self.session.delete(thing)

    def get_by_list_type(self, list_type):
        """
        Select every row of the given mapped class.

        :param list_type: a specific list type
        :return: all results for the specific list type
        """
        stmt = select(list_type)
        return self.session.execute(stmt).scalars().all()

    def get_match_type(self, match_type):
        """
        Fetch a match type by name; exactly one match is expected.

        :param match_type: name of the match type
        :return: the matching :class:`MatchType`
        """
        stmt = select(MatchType).filter(MatchType.name == match_type)
        return self.session.execute(stmt).scalar_one()

    def get_match(self, uri, matchtype_id, concept_id):
        """
        Fetch a single match; exactly one matching row is expected.

        :param uri: uri of the match
        :param matchtype_id: id of the match type
        :param concept_id: id of the concept the match belongs to
        :return: the matching :class:`Match`
        """
        stmt = select(Match).filter(
            Match.uri == uri,
            Match.matchtype_id == matchtype_id,
            Match.concept_id == concept_id,
        )
        return self.session.execute(stmt).scalar_one()

    def get_all_label_types(self):
        """
        :return: all label types
        """
        stmt = select(LabelType)
        return self.session.execute(stmt).scalars().all()

    def get_next_cid(self, conceptscheme_id, id_generation_strategy):
        """
        Generate the next concept id for a conceptscheme.

        :param conceptscheme_id: a conceptscheme id
        :param id_generation_strategy: ``IDGenerationStrategy.NUMERIC`` yields
            the highest existing concept_id (cast to integer) plus one, or 1
            when there is none; ``IDGenerationStrategy.GUID`` yields a random
            UUID string
        :return: the new concept id
        :raises ValueError: for any other strategy
        """
        if id_generation_strategy == IDGenerationStrategy.NUMERIC:
            highest = self.session.execute(
                select(func.max(sa.cast(Thing.concept_id, sa.Integer))).filter_by(
                    conceptscheme_id=conceptscheme_id
                )
            ).scalar_one()
            if highest:
                return highest + 1
            return 1
        if id_generation_strategy == IDGenerationStrategy.GUID:
            return str(uuid.uuid4())
        raise ValueError('unsupported id_generation_strategy')
247
248
249
class LanguagesManager(DataManager):
    """
    A :class:`DataManager` for
    :class:`Languages <skosprovider_sqlalchemy.models.Language>.`
    """

    def __init__(self, session):
        """
        :param session: the session to run all database interactions on
        """
        super().__init__(session)

    def get(self, language_id):
        """
        Fetch a language by its id; exactly one match is expected.

        :param language_id: id of the language
        :return: the matching :class:`Language`
        """
        stmt = select(Language).filter(Language.id == language_id)
        return self.session.execute(stmt).scalar_one()

    def save(self, language):
        """
        Add the language to the session and flush.

        :param language: language to save
        :return: saved language
        """
        session = self.session
        session.add(language)
        session.flush()
        return language

    def delete(self, language):
        """
        Mark the language for deletion in the session (no flush is issued).

        :param language: the language to delete
        """
        self.session.delete(language)

    def get_all(self):
        """
        :return: list of all languages
        """
        stmt = select(Language)
        return self.session.execute(stmt).scalars().all()

    def get_all_sorted(self, sort_coll, sort_desc):
        """
        Select all languages ordered on one column.

        :param sort_coll: sort on this column
        :param sort_desc: descending or not
        :return: sorted list of languages
        """
        # Only the ordering differs between the two directions.
        ordering = desc(sort_coll) if sort_desc else sort_coll
        stmt = select(Language).order_by(ordering)
        return self.session.execute(stmt).scalars().all()

    def count_languages(self, language_tag):
        """
        Count the languages whose id equals the given tag.

        :param language_tag: the language id to look for
        :return: the number of matching languages
        """
        stmt = select(func.count(Language.id)).filter(Language.id == language_tag)
        return self.session.execute(stmt).scalar_one()
311
312
313
class AuditManager(DataManager):
    """
    A data manager for logging the visit.
    """

    def save(self, visit_log):
        """
        Save a certain visit.

        :param visit_log: log of visit to save
        :return: The saved visit log
        """
        self.session.add(visit_log)
        self.session.flush()
        return visit_log

    @popular_concepts.cache_on_arguments(expiration_time=86400)
    def get_most_popular_concepts_for_conceptscheme(
        self, conceptscheme_id, max_results=5, period='last_month'
    ):
        """
        Get the most popular concepts for a conceptscheme.

        Visits are counted per concept from the first day of the period up to
        now. The result goes through ``popular_concepts.cache_on_arguments``
        with ``expiration_time=86400`` (presumably seconds, i.e. one day).

        :param conceptscheme_id: id of the conceptscheme
        :param max_results: maximum number of results, default 5
        :param period: 'last_day' or 'last_week' or 'last_month' or
            'last_year', default 'last_month'
        :return: List of dicts with the keys ``concept_id`` and ``scheme_id``,
            most visited concept first
        """
        start_date = self._get_first_day(period)
        visit_count = func.count(ConceptVisitLog.concept_id).label('count')
        rows = self.session.execute(
            select(ConceptVisitLog.concept_id, visit_count)
            .filter(
                # The visit log is compared against the id as a string.
                ConceptVisitLog.conceptscheme_id == str(conceptscheme_id),
                ConceptVisitLog.visited_at >= start_date,
            )
            .group_by(ConceptVisitLog.concept_id)
            .order_by(desc('count'))
            .limit(max_results)
        ).all()
        # scheme_id echoes the argument as it was passed in, not the string.
        return [
            {'concept_id': row.concept_id, 'scheme_id': conceptscheme_id}
            for row in rows
        ]

    @staticmethod
    def _get_first_day(period):
        """
        Get the first day of a certain period until now.

        An unrecognised period subtracts nothing, so today's date is returned.

        :param period: 'last_day' or 'last_week' or 'last_month' or 'last_year'
        :return: (string) the first day of the period, formatted as YYYY-MM-DD
        """
        # Exactly one of the offsets is non-zero for a recognised period.
        offset = dateutil.relativedelta.relativedelta(
            days=1 if period == 'last_day' else 0,
            weeks=1 if period == 'last_week' else 0,
            months=1 if period == 'last_month' else 0,
            years=1 if period == 'last_year' else 0,
        )
        start_date = date.today() - offset
        return start_date.strftime('%Y-%m-%d')
377
378
379
class CountsManager(DataManager):
    """
    A data manager that deals with triple counts.
    """

    def save(self, counts):
        """
        Save a certain counts object.

        :param atramhasis.data.models.ConceptschemeCounts counts: Counts object
            to save

        :return: The saved count
        """
        self.session.add(counts)
        self.session.flush()
        return counts

    def get_most_recent_count_for_scheme(self, conceptscheme_id):
        """
        Get the most recently counted entry for a conceptscheme.

        :param conceptscheme_id: id of the conceptscheme
        :return: the :class:`ConceptschemeCounts` row with the latest
            ``counted_at`` for the scheme
        :raises sqlalchemy.exc.NoResultFound: when the scheme has no counts
        """
        stmt = (
            select(ConceptschemeCounts)
            .filter(ConceptschemeCounts.conceptscheme_id == conceptscheme_id)
            .order_by(desc('counted_at'))
            # scalar_one() raises MultipleResultsFound when more than one row
            # comes back; limit to the newest row so that a scheme that was
            # counted several times still yields its most recent count.
            .limit(1)
        )
        return self.session.execute(stmt).scalar_one()
403
404
405
class ProviderDataManager(DataManager):
    """A data manager for managing Providers."""

    def get_provider_by_id(self, provider_id) -> Provider:
        """
        Retrieve a single provider; exactly one match is expected.

        :param provider_id: id of the provider
        :return: The provider with the given id
        """
        stmt = select(Provider).filter(Provider.id == provider_id)
        return self.session.execute(stmt).scalar_one()

    def get_all_providers(self) -> List[Provider]:
        """
        Retrieve all providers from the database.

        :return: All providers
        """
        stmt = select(Provider)
        return self.session.execute(stmt).scalars().all()
420