Completed
Pull Request — master (#39)
by Koen
33s
created

RDFProvider   F

Complexity

Total Complexity 62

Size/Duplication

Total Lines 246
Duplicated Lines 0 %

Importance

Changes 7
Bugs 0 Features 2
Metric Value
c 7
b 0
f 2
dl 0
loc 246
rs 3.44
wmc 62

16 Methods

Rating   Name   Duplication   Size   Complexity  
A _scrub_language() 0 6 2
A _create_languages() 0 13 3
A _get_in_scheme() 0 13 3
A _create_from_subject_typelist() 0 6 2
A _get_id_for_subject() 0 9 4
A to_text() 0 7 1
A _create_sources() 0 17 4
A __init__() 0 9 2
B _fill_member_of() 0 7 6
A _create_label() 0 6 2
C _cs_from_graph() 0 39 10
A _get_language_from_literal() 0 6 3
A _create_from_subject_predicate() 0 12 4
A _create_note() 0 7 2
B _read_markupped_literal() 0 14 6
C _from_graph() 0 43 8

How to fix   Complexity   

Complex Class

Complex classes like RDFProvider often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
'''
4
This module contains an RDFProvider, an implementation of the 
5
:class:`skosprovider.providers.VocabularyProvider` interface that uses a 
6
:class:`rdflib.graph.Graph` as input.
7
'''
8
9
import logging
10
import rdflib
11
from rdflib.term import Literal, URIRef
12
from skosprovider_rdf.utils import text_, _df_writexml
13
14
log = logging.getLogger(__name__)
15
16
from skosprovider.providers import MemoryProvider
17
from skosprovider.uri import (
18
    DefaultConceptSchemeUrnGenerator
19
)
20
from skosprovider.skos import (
21
    Concept,
22
    Collection,
23
    ConceptScheme,
24
    Label,
25
    Note,
26
    Source
27
)
28
29
from rdflib.namespace import RDF, SKOS, DC, DCTERMS
30
SKOS_THES = rdflib.Namespace('http://purl.org/iso25964/skos-thes#')
31
32
from language_tags import tags
33
34
from xml.dom.minidom import DocumentFragment
35
DocumentFragment.writexml = _df_writexml
36
37
38
class RDFProvider(MemoryProvider):
    '''
    A simple vocabulary provider that uses a :class:`rdflib.graph.Graph`
    as input. The provider expects an RDF graph with elements that represent
    the SKOS concepts and collections.

    Please be aware that this provider needs to load the entire graph in memory.
    '''

    #: Should the provider only take concepts into account explicitly linked
    #: to the conceptscheme?
    check_in_scheme = False
53
54
    def __init__(self, metadata, graph, **kwargs):
        '''
        Create a provider on top of an RDF graph.

        :param metadata: Metadata of the provider. It is handed to the parent
            class and, when no ``concept_scheme`` keyword is given, to
            :meth:`_cs_from_graph`.
        :param graph: Graph the concepts and collections are read from.
        '''
        self.graph = graph
        has_scheme = 'concept_scheme' in kwargs
        # With an explicit scheme, subjects are filtered on scheme
        # membership. Without one the scheme is derived from the graph,
        # which may still switch the filtering on.
        self.check_in_scheme = has_scheme
        if not has_scheme:
            kwargs['concept_scheme'] = self._cs_from_graph(metadata, **kwargs)
        super(RDFProvider, self).__init__(metadata, [], **kwargs)
        self.list = self._from_graph()
63
64
    def _cs_from_graph(self, metadata, **kwargs):
        '''
        Determine the concept scheme this provider serves.

        :param metadata: Provider metadata. Its ``id`` is used to generate a
            URN when the graph declares no concept scheme.
        :param kwargs: May hold ``concept_scheme_uri`` to pick one scheme
            when the graph declares several of them.
        :returns: A :class:`skosprovider.skos.ConceptScheme`.
        :raises RuntimeError: If the graph declares more than one concept
            scheme and ``concept_scheme_uri`` is absent or matches none.
        '''
        cslist = [
            ConceptScheme(
                uri=self.to_text(sub),
                labels=self._create_from_subject_typelist(sub, Label.valid_types),
                notes=self._create_from_subject_typelist(sub, Note.valid_types),
                sources=self._create_sources(sub),
                languages=self._create_languages(sub)
            )
            for sub in self.graph.subjects(RDF.type, SKOS.ConceptScheme)
        ]
        if not cslist:
            # Nothing declared in the graph: fall back to a generated URN.
            return ConceptScheme(
                uri=DefaultConceptSchemeUrnGenerator().generate(
                    id=metadata.get('id')
                )
            )
        if len(cslist) == 1:
            return cslist[0]

        found = ", ".join([str(cs.uri) for cs in cslist])
        if 'concept_scheme_uri' not in kwargs:
            # Adjacent literals keep the message free of the indentation
            # that backslash continuations inside a string would embed.
            raise RuntimeError(
                'This RDF file contains more than one ConceptScheme. '
                'Please specify one. The following schemes were found: '
                '%s' % found
            )

        # Several schemes share the graph, so concepts and collections must
        # be filtered on the scheme that was asked for.
        self.check_in_scheme = True
        csuri = kwargs['concept_scheme_uri']
        for cs in cslist:
            if cs.uri == csuri:
                return cs
        raise RuntimeError(
            'This RDF file contains more than one ConceptScheme. '
            'You specified an unexisting one. The following schemes '
            'were found: %s' % found
        )
103
104
    def _from_graph(self):
        '''
        Build the concepts and collections found in the graph.

        When ``check_in_scheme`` is set, subjects whose scheme differs from
        the URI of the provider's concept scheme are left out.

        :returns: A :class:`list` with the concepts first, followed by the
            collections, with their ``member_of`` filled in.
        '''
        items = []

        for subject, _, _ in self.graph.triples((None, RDF.type, SKOS.Concept)):
            if self.check_in_scheme and self._get_in_scheme(subject) != self.concept_scheme.uri:
                continue
            subject_uri = self.to_text(subject)
            # One entry per match type, looked up as skos:<type>Match.
            matches = {
                matchtype: self._create_from_subject_predicate(
                    subject, URIRef(SKOS + matchtype + 'Match')
                )
                for matchtype in Concept.matchtypes
            }
            items.append(Concept(
                id=self._get_id_for_subject(subject, subject_uri),
                uri=subject_uri,
                concept_scheme=self.concept_scheme,
                labels=self._create_from_subject_typelist(subject, Label.valid_types),
                notes=self._create_from_subject_typelist(subject, Note.valid_types),
                sources=self._create_sources(subject),
                broader=self._create_from_subject_predicate(subject, SKOS.broader),
                narrower=self._create_from_subject_predicate(subject, SKOS.narrower),
                related=self._create_from_subject_predicate(subject, SKOS.related),
                member_of=[],
                subordinate_arrays=self._create_from_subject_predicate(
                    subject, SKOS_THES.subordinateArray
                ),
                matches=matches
            ))

        for subject, _, _ in self.graph.triples((None, RDF.type, SKOS.Collection)):
            if self.check_in_scheme and self._get_in_scheme(subject) != self.concept_scheme.uri:
                continue
            subject_uri = self.to_text(subject)
            items.append(Collection(
                id=self._get_id_for_subject(subject, subject_uri),
                uri=subject_uri,
                concept_scheme=self.concept_scheme,
                labels=self._create_from_subject_typelist(subject, Label.valid_types),
                notes=self._create_from_subject_typelist(subject, Note.valid_types),
                sources=self._create_sources(subject),
                members=self._create_from_subject_predicate(subject, SKOS.member),
                member_of=[],
                superordinates=self._create_from_subject_predicate(
                    subject, SKOS_THES.superOrdinate
                )
            ))

        # member_of starts out empty everywhere and is derived afterwards
        # from the members of the collections.
        self._fill_member_of(items)
        return items
147
148
    def _get_in_scheme(self, subject):
        '''
        Look up the scheme a subject belongs to.

        ``skos:inScheme`` is consulted first; ``skos:topConceptOf`` serves
        as a fallback.

        :param subject: Subject to find the scheme for.
        :returns: The URI of the scheme as text, or None if the subject is
            not part of a scheme.
        '''
        scheme = (
            self.graph.value(subject, SKOS.inScheme)
            or self.graph.value(subject, SKOS.topConceptOf)
        )
        if not scheme:
            return None
        return self.to_text(scheme)
161
162
    def _fill_member_of(self, clist):
        '''
        Record on every item the collections that list it as a member.

        The ``member_of`` list of each item in `clist` is extended in place
        with the id of every collection whose ``members`` contain the id of
        that item. Collections are handled in the order in which they appear
        in `clist`, so the resulting ``member_of`` lists are reproducible.

        :param clist: A :class:`list` of concepts and collections.
        '''
        for col in clist:
            if not isinstance(col, Collection):
                continue
            # One set per collection instead of a list scan per item.
            member_ids = frozenset(col.members)
            for c in clist:
                if c.id in member_ids:
                    c.member_of.append(col.id)
169
170
    def _create_from_subject_typelist(self, subject, typelist):
        '''
        Gather the values of several SKOS predicates for one subject.

        :param subject: Subject to read the values for.
        :param typelist: Names that are turned into predicates with
            ``SKOS.term``.
        :returns: A :class:`list` with the results of
            :meth:`_create_from_subject_predicate` for every name, in the
            order of `typelist`.
        '''
        collected = []
        for skos_type in typelist:
            predicate = SKOS.term(skos_type)
            collected += self._create_from_subject_predicate(subject, predicate)
        return collected
176
177
    def _get_id_for_subject(self, subject, uri):
        '''
        Determine the id of a subject.

        A ``dcterms:identifier`` takes precedence over a ``dc:identifier``;
        when the graph holds neither for the subject, `uri` is used.

        :param subject: Subject to get the id for.
        :param uri: Fallback that is returned when no identifier is present.
        :returns: The identifier as text, or `uri`.
        '''
        for predicate in (DCTERMS.identifier, DC.identifier):
            if (subject, predicate, None) in self.graph:
                # any=False: per the rdflib documentation an ambiguous
                # identifier is reported rather than one being picked.
                return self.to_text(
                    self.graph.value(subject=subject, predicate=predicate, any=False)
                )
        return uri
186
187
    def _create_from_subject_predicate(self, subject, predicate):
        '''
        Collect the objects of all triples for a subject and predicate.

        The part of the predicate after the last ``#`` decides how each
        object is converted: a valid label type yields labels, a valid note
        type yields notes, and anything else is reduced to an id through
        :meth:`_get_id_for_subject`.

        :param subject: Subject of the triples.
        :param predicate: Predicate of the triples.
        :returns: A :class:`list` of the converted objects.
        '''
        # The type only depends on the predicate, so derive it once.
        kind = predicate.split('#')[-1]
        results = []
        for _, _, obj in self.graph.triples((subject, predicate, None)):
            if Label.is_valid_type(kind):
                item = self._create_label(obj, kind)
            elif Note.is_valid_type(kind):
                item = self._create_note(obj, kind)
            else:
                item = self._get_id_for_subject(obj, self.to_text(obj))
            results.append(item)
        return results
199
200
    def _create_label(self, literal, type):
        '''
        Turn a literal into a label.

        :param literal: Literal holding the text of the label.
        :param type: Type of the label.
        :returns: A :class:`skosprovider.skos.Label`.
        :raises ValueError: If `type` is not a valid label type.
        '''
        if Label.is_valid_type(type):
            return Label(
                self.to_text(literal),
                type,
                self._get_language_from_literal(literal)
            )
        raise ValueError(
            'Type of Label is not valid.'
        )
206
207
    def _read_markupped_literal(self, literal):
        '''
        Split a literal into its content, language and markup.

        :param literal: Literal without datatype, or with datatype
            ``rdf:HTML``.
        :returns: A tuple ``(content, language, markup)``. For a literal
            without datatype, `content` is the literal itself and `markup`
            is None. For an HTML literal, `content` is the serialised
            markup without the ``xml:lang`` attribute of its first child
            and `markup` is ``'HTML'``.
        :raises ValueError: If the literal has any other datatype.
        '''
        if literal.datatype is None:
            return (literal, self._get_language_from_literal(literal), None)

        if literal.datatype == RDF.HTML:
            # Work on a copy so the value held by the literal stays intact.
            fragment = literal.value.cloneNode(True)
            first = fragment.firstChild
            attrs = first.attributes if first else None
            if attrs and 'xml:lang' in attrs.keys():
                lang = self._scrub_language(attrs.get('xml:lang').value)
                # The language is returned separately, so drop it from
                # the markup.
                del attrs['xml:lang']
            else:
                lang = 'und'
            return (fragment.toxml(), lang, 'HTML')

        raise ValueError(
            'Unable to process literal of type %s.' % literal.datatype
        )
222
223
    def _create_note(self, literal, type):
        '''
        Turn a literal into a note.

        :param literal: Literal holding the content of the note.
        :param type: Type of the note.
        :returns: A :class:`skosprovider.skos.Note`.
        :raises ValueError: If `type` is not a valid note type.
        '''
        if Note.is_valid_type(type):
            content, language, markup = self._read_markupped_literal(literal)
            return Note(self.to_text(content), type, language, markup)
        raise ValueError(
            'Type of Note is not valid.'
        )
230
231
    def _create_sources(self, subject):
        '''
        Create the sources for this subject.

        Every ``dcterms:bibliographicCitation`` of every ``dcterms:source``
        of the subject results in one source.

        :param subject: Subject to get the sources for.
        :returns: A :class:`list` of :class:`skosprovider.skos.Source` objects.
        '''
        return [
            Source(
                self.to_text(citation),
                'HTML' if citation.datatype == RDF.HTML else None
            )
            for _, _, source in self.graph.triples(
                (subject, DCTERMS.source, None)
            )
            for _, _, citation in self.graph.triples(
                (source, DCTERMS.bibliographicCitation, None)
            )
        ]
248
249
    def _create_languages(self, subject):
        '''
        Create the languages for this subject.

        Both ``dcterms:language`` and ``dc:language`` are read; every value
        passes through :meth:`_scrub_language`.

        :param subject: Subject to get the languages for.
        :returns: A :class:`set` of language tags as text.
        '''
        languages = set()
        for predicate in (DCTERMS.language, DC.language):
            for _, _, lang in self.graph.triples((subject, predicate, None)):
                languages.add(self.to_text(self._scrub_language(lang)))
        return languages
262
263
    def _scrub_language(self, language):
        '''
        Replace a language that fails validation by ``und``.

        :param language: Language tag to validate with ``tags.check``.
        :returns: `language` itself when it passes the check, else
            ``'und'``. A warning is logged for a rejected value.
        '''
        if tags.check(language):
            return language
        # Logger.warn is a deprecated alias of Logger.warning; the argument
        # is passed separately so formatting only happens when emitted.
        log.warning(
            'Encountered an invalid language %s. Falling back to "und".',
            language
        )
        return 'und'
269
270
    def _get_language_from_literal(self, data):
        '''
        Read the language of a literal.

        :param data: Value to inspect.
        :returns: The scrubbed language as text, or None when `data` is not
            a :class:`rdflib.term.Literal` or carries no language.
        '''
        if isinstance(data, Literal) and data.language is not None:
            return self.to_text(self._scrub_language(data.language))
        return None
276
277
    def to_text(self, data):
        """
        Convert data to text.

        :param data: Object offering an ``encode`` method; it is encoded as
            UTF-8 and the result is handed to ``text_``.
        :return: text representation of the data
        """
        encoded = data.encode('utf-8')
        return text_(encoded, 'utf-8')
284