Completed
Push — develop ( c2f10a...b3f727 )
by Bram
13s
created

atramhasis.utils   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 165
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 106
dl 0
loc 165
rs 10
c 0
b 0
f 0
wmc 18

7 Functions

Rating   Name   Duplication   Size   Complexity  
A db_provider_to_skosprovider() 0 15 1
A internal_providers_only() 0 19 3
B from_thing() 0 56 6
A update_last_visited_concepts() 0 12 2
A label_sort() 0 6 3
A provider_is_external() 0 4 1
A db_session() 0 13 2
1
"""
2
Module containing utility functions used by Atramhasis.
3
"""
4
5
import contextlib
6
import copy
7
from collections import deque
8
9
from pyramid.httpexceptions import HTTPMethodNotAllowed
10
from skosprovider.skos import Collection
11
from skosprovider.skos import Concept
12
from skosprovider.skos import ConceptScheme
13
from skosprovider.skos import Label
14
from skosprovider.skos import Note
15
from skosprovider.skos import Source
16
from skosprovider.uri import UriPatternGenerator
17
from skosprovider_sqlalchemy.providers import SQLAlchemyProvider
18
from sqlalchemy import engine_from_config
19
from sqlalchemy import orm
20
from sqlalchemy.orm import sessionmaker
21
22
from atramhasis.data.models import Provider
23
24
25
def from_thing(thing):
26
    """
27
    Map a :class:`skosprovider_sqlalchemy.models.Thing` to a
28
    :class:`skosprovider.skos.Concept` or
29
    a :class:`skosprovider.skos.Collection`, depending on the type.
30
31
    :param skosprovider_sqlalchemy.models.Thing thing: Thing to map.
32
    :rtype: :class:`~skosprovider.skos.Concept` or
33
        :class:`~skosprovider.skos.Collection`.
34
    """
35
    if thing.type and thing.type == 'collection':
36
        return Collection(
37
            id=thing.concept_id,
38
            uri=thing.uri,
39
            concept_scheme=ConceptScheme(thing.conceptscheme.uri),
40
            labels=[
41
                Label(label.label, label.labeltype_id, label.language_id)
42
                for label in thing.labels
43
            ],
44
            notes=[Note(n.note, n.notetype_id, n.language_id) for n in thing.notes],
45
            sources=[Source(s.citation) for s in thing.sources],
46
            members=[member.concept_id for member in thing.members]
47
            if hasattr(thing, 'members')
48
            else [],
49
            member_of=[c.concept_id for c in thing.member_of],
50
            superordinates=[
51
                broader_concept.concept_id for broader_concept in thing.broader_concepts
52
            ],
53
            infer_concept_relations=thing.infer_concept_relations,
54
        )
55
    else:
56
        matches = {}
57
        for m in thing.matches:
58
            key = m.matchtype.name[: m.matchtype.name.find('Match')]
59
            if key not in matches:
60
                matches[key] = []
61
            matches[key].append(m.uri)
62
        return Concept(
63
            id=thing.concept_id,
64
            uri=thing.uri,
65
            concept_scheme=ConceptScheme(thing.conceptscheme.uri),
66
            labels=[
67
                Label(label.label, label.labeltype_id, label.language_id)
68
                for label in thing.labels
69
            ],
70
            notes=[Note(n.note, n.notetype_id, n.language_id) for n in thing.notes],
71
            sources=[Source(s.citation) for s in thing.sources],
72
            broader=[c.concept_id for c in thing.broader_concepts],
73
            narrower=[c.concept_id for c in thing.narrower_concepts],
74
            related=[c.concept_id for c in thing.related_concepts],
75
            member_of=[c.concept_id for c in thing.member_of],
76
            subordinate_arrays=[
77
                narrower_collection.concept_id
78
                for narrower_collection in thing.narrower_collections
79
            ],
80
            matches=matches,
81
        )
82
83
84
def internal_providers_only(fn):
85
    """
86
    aspect oriented way to check if provider is internal when calling the decorated function
87
88
    :param fn: the decorated function
89
    :return: around advice
90
    :raises pyramid.httpexceptions.HTTPMethodNotAllowed: when provider is not internal
91
    """
92
93
    def advice(parent_object, *args, **kw):
94
        if (
95
            isinstance(parent_object.provider, SQLAlchemyProvider)
96
            and 'external' not in parent_object.provider.get_metadata()['subject']
97
        ):
98
            return fn(parent_object, *args, **kw)
99
        else:
100
            raise HTTPMethodNotAllowed()
101
102
    return advice
103
104
105
def update_last_visited_concepts(request, concept_data):
106
    deque_last_visited = deque(maxlen=4)
107
    deque_last_visited.extend(request.session.get('last_visited', []))
108
    try:
109
        # Try to remove concept from the queue to prevent double entries
110
        deque_last_visited.remove(concept_data)
111
    except ValueError:
112
        # Concept is not in the queue
113
        pass
114
    # Add concept to the queue
115
    deque_last_visited.append(concept_data)
116
    request.session['last_visited'] = list(deque_last_visited)
117
118
119
def label_sort(concepts, language='any'):
120
    if not concepts:
121
        return []
122
    return sorted(
123
        concepts,
124
        key=lambda concept: concept._sortkey(key='sortlabel', language=language),
125
    )
126
127
128
def db_provider_to_skosprovider(db_provider: Provider) -> SQLAlchemyProvider:
129
    """Create a SQLAlchemyProvider from a atramhasis.data.models.Provider.
130
131
    :param db_provider: The Provider to use as basis for the SQLAlchemyProvider.
132
    :return: An SQLAlchemyProvider with the data from the `db_provider`
133
    """
134
    metadata = copy.deepcopy(db_provider.meta)
135
    metadata['conceptscheme_id'] = db_provider.conceptscheme_id
136
    metadata['atramhasis.id_generation_strategy'] = db_provider.id_generation_strategy
137
    metadata['id'] = db_provider.id
138
    return SQLAlchemyProvider(
139
        metadata=metadata,
140
        session=orm.object_session(db_provider),
141
        expand_strategy=db_provider.expand_strategy.value,
142
        uri_generator=UriPatternGenerator(db_provider.uri_pattern),
143
    )
144
145
146
def provider_is_external(provider):
147
    """Check if a provider is marked as external via its metadata."""
148
    subjects = provider.get_metadata().get('subject') or []
149
    return any(str(subject).lower() == 'external' for subject in subjects)
150
151
152
@contextlib.contextmanager
153
def db_session(settings):  # pragma: no cover
154
    engine = engine_from_config(settings, 'sqlalchemy.')
155
    session_maker = sessionmaker(bind=engine)
156
    session = session_maker()
157
    try:
158
        yield session
159
        session.commit()
160
    except Exception:
161
        session.rollback()
162
        raise
163
    finally:
164
        session.close()
165