1
|
|
|
""" |
2
|
|
|
Module that validates incoming JSON. |
3
|
|
|
""" |
4
|
|
|
|
5
|
|
|
import copy |
6
|
|
|
|
7
|
|
|
import bleach |
8
|
|
|
import colander |
9
|
|
|
from language_tags import tags |
10
|
|
|
from skosprovider_sqlalchemy.models import ( |
11
|
|
|
Language |
12
|
|
|
) |
13
|
|
|
from sqlalchemy.exc import NoResultFound |
14
|
|
|
|
15
|
|
|
from atramhasis.errors import ValidationError |
16
|
|
|
from atramhasis.data.models import IDGenerationStrategy |
17
|
|
|
|
18
|
|
|
|
19
|
|
|
class ForcedString(colander.String):
    """
    Acts just like colander.String but non-string input is turned into a string.

    eg. Passing a number 1 will treat it as "1". ``colander.null`` and ``None``
    are handed to ``colander.String`` untouched.
    """

    def deserialize(self, node, cstruct):
        """
        Stringify ``cstruct`` when needed, then delegate to ``colander.String``.

        :param colander.SchemaNode node: The node being deserialized.
        :param cstruct: The raw incoming value; any type is accepted.
        :return: The result of ``colander.String.deserialize`` for the
            (possibly stringified) value.
        """
        # None must not go through str(): that would yield the literal text
        # "None" instead of leaving the value for the parent class to handle.
        is_empty = cstruct is None or cstruct is colander.null
        if not is_empty and not isinstance(cstruct, (str, bytes)):
            cstruct = str(cstruct)
        return super().deserialize(node, cstruct)
34
|
|
|
|
35
|
|
|
|
36
|
|
|
class Label(colander.MappingSchema):
    """
    Schema for a single label.

    All three nodes are plain strings declared without a ``missing`` value.
    """

    label = colander.SchemaNode(colander.String())
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
46
|
|
|
|
47
|
|
|
|
48
|
|
|
def html_preparer(value):
    """
    Clean the value with bleach, keeping only a few whitelisted html tags.

    :param value: The value to be cleaned.
    :return: The cleaned value, or ``value`` itself when bleach raises a
        ``TypeError``.
    :rtype: str
    """
    allowed_tags = ['strong', 'em', 'a']
    try:
        cleaned = bleach.clean(value, tags=allowed_tags, strip=True)
    except TypeError:
        # A non-string cannot be cleaned. Hand it back untouched so that it
        # can be caught later on.
        return value
    return cleaned
61
|
|
|
|
62
|
|
|
|
63
|
|
|
class Note(colander.MappingSchema):
    """
    Schema for a single note.

    The note text is passed through ``html_preparer``; type and language are
    plain strings.
    """

    note = colander.SchemaNode(colander.String(), preparer=html_preparer)
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
74
|
|
|
|
75
|
|
|
|
76
|
|
|
class Source(colander.MappingSchema):
    """Schema for a single source; the citation is passed through ``html_preparer``."""

    citation = colander.SchemaNode(colander.String(), preparer=html_preparer)
81
|
|
|
|
82
|
|
|
|
83
|
|
|
class Labels(colander.SequenceSchema):
    """Sequence schema whose items are validated with :class:`Label`."""

    label = Label()
85
|
|
|
|
86
|
|
|
|
87
|
|
|
class Notes(colander.SequenceSchema):
    """Sequence schema whose items are validated with :class:`Note`."""

    note = Note()
89
|
|
|
|
90
|
|
|
|
91
|
|
|
class Sources(colander.SequenceSchema):
    """Sequence schema whose items are validated with :class:`Source`."""

    source = Source()
93
|
|
|
|
94
|
|
|
|
95
|
|
|
class RelatedConcept(colander.MappingSchema):
    """Schema for a reference to another concept or collection by its id."""

    # ForcedString lets numeric ids through as strings.
    id = colander.SchemaNode(ForcedString())
99
|
|
|
|
100
|
|
|
|
101
|
|
|
class Concepts(colander.SequenceSchema):
    """Sequence schema whose items are validated with :class:`RelatedConcept`."""

    concept = RelatedConcept()
103
|
|
|
|
104
|
|
|
|
105
|
|
|
class MatchList(colander.SequenceSchema):
    """Sequence schema of match strings; a missing item becomes ``None``."""

    match = colander.SchemaNode(colander.String(), missing=None)
110
|
|
|
|
111
|
|
|
|
112
|
|
|
class Matches(colander.MappingSchema):
    """
    Schema for the matches of a concept, grouped per match type.

    Every match type is a :class:`MatchList` that falls back to an empty list.
    """

    broad = MatchList(missing=[])
    close = MatchList(missing=[])
    exact = MatchList(missing=[])
    narrow = MatchList(missing=[])
    related = MatchList(missing=[])
118
|
|
|
|
119
|
|
|
|
120
|
|
|
class Concept(colander.MappingSchema):
    """
    Schema for an incoming concept or collection.

    Every node declares a ``missing`` value. ``infer_concept_relations`` is
    the only one that is dropped from the result when it is absent.
    """

    id = colander.SchemaNode(ForcedString(), missing=None)
    type = colander.SchemaNode(colander.String(), missing='concept')
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
    broader = Concepts(missing=[])
    narrower = Concepts(missing=[])
    related = Concepts(missing=[])
    members = Concepts(missing=[])
    member_of = Concepts(missing=[])
    subordinate_arrays = Concepts(missing=[])
    superordinates = Concepts(missing=[])
    matches = Matches(missing={})
    infer_concept_relations = colander.SchemaNode(colander.Boolean(), missing=colander.drop)

    def preparer(self, concept):
        """Return the concept unchanged."""
        return concept
147
|
|
|
|
148
|
|
|
|
149
|
|
|
class ConceptScheme(colander.MappingSchema):
    """Schema for a conceptscheme: labels, notes and sources, each defaulting to an empty list."""

    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
153
|
|
|
|
154
|
|
|
|
155
|
|
|
class LanguageTag(colander.MappingSchema):
    """Schema for a language: its id and its name, both plain strings."""

    id = colander.SchemaNode(colander.String())
    name = colander.SchemaNode(colander.String())
162
|
|
|
|
163
|
|
|
|
164
|
|
|
def concept_schema_validator(node, cstruct):
    """
    Validate an incoming concept or collection.

    Runs every validation rule against the concept or collection, collects
    all broken rules and reports them together.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must hold ``request``,
        ``validate_id_generation`` and ``provider``.
    :param cstruct: The concept or collection being validated.
    :raises atramhasis.errors.ValidationError: When at least one rule is broken.
    """
    bindings = node.bindings
    request = bindings['request']
    validate_id_generation = bindings['validate_id_generation']
    data_managers = request.data_managers
    skos_manager = data_managers['skos_manager']
    languages_manager = data_managers['languages_manager']
    provider = bindings['provider']
    conceptscheme_id = provider.conceptscheme_id
    concept_type = cstruct['type']
    own_id = cstruct['id']
    errors = []

    def ids_of(key):
        # Work on a copy of the incoming relation list and keep only the ids.
        return [item['id'] for item in copy.deepcopy(cstruct[key])]

    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    # First pass: every referenced item must exist, live in this scheme and
    # differ from the item itself. The outcome decides which of the more
    # expensive rules below are allowed to run.
    concept_relation_keys = ('related', 'narrower', 'broader')
    relation_ids = {}
    relation_ok = {}
    for key in concept_relation_keys + ('members', 'member_of'):
        relation_ids[key] = None
        relation_ok[key] = False
        if key not in cstruct:
            continue
        relation_ids[key] = ids_of(key)
        relation_ok[key] = semantic_relations_rule(
            errors, node[key], skos_manager, conceptscheme_id, relation_ids[key], own_id
        )
        if key in concept_relation_keys:
            concept_relations_rule(errors, node[key], relation_ids[key], concept_type)

    if relation_ok['related'] and relation_ok['narrower'] and relation_ok['broader']:
        concept_type_rule(errors, node['narrower'], skos_manager, conceptscheme_id, relation_ids['narrower'])
        narrower_hierarchy_rule(errors, node['narrower'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['broader'], skos_manager, conceptscheme_id, relation_ids['broader'])
        broader_hierarchy_rule(errors, node['broader'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['related'], skos_manager, conceptscheme_id, relation_ids['related'])

    if relation_ok['members'] and relation_ok['member_of']:
        members_only_in_collection_rule(errors, node['members'], concept_type, relation_ids['members'])
        collection_members_unique_rule(errors, node['members'], relation_ids['members'])
        collection_type_rule(errors, node['member_of'], skos_manager, conceptscheme_id, relation_ids['member_of'])
        memberof_hierarchy_rule(errors, node['member_of'], skos_manager, conceptscheme_id, cstruct)
        members_hierarchy_rule(errors, node['members'], skos_manager, conceptscheme_id, cstruct)

    if 'matches' in cstruct:
        matches = copy.deepcopy(cstruct['matches'])
        concept_matches_rule(errors, node['matches'], matches, concept_type)
        concept_matches_unique_rule(errors, node['matches'], matches)

    if 'subordinate_arrays' in cstruct:
        subordinate_node = node['subordinate_arrays']
        subordinate_arrays = ids_of('subordinate_arrays')
        subordinate_arrays_only_in_concept_rule(errors, subordinate_node, concept_type, subordinate_arrays)
        subordinate_arrays_type_rule(errors, subordinate_node, skos_manager, conceptscheme_id, subordinate_arrays)
        subordinate_arrays_hierarchy_rule(errors, subordinate_node, skos_manager, conceptscheme_id, cstruct)

    if 'superordinates' in cstruct:
        superordinates_node = node['superordinates']
        superordinates = ids_of('superordinates')
        superordinates_only_in_concept_rule(errors, superordinates_node, concept_type, superordinates)
        superordinates_type_rule(errors, superordinates_node, skos_manager, conceptscheme_id, superordinates)
        superordinates_hierarchy_rule(errors, superordinates_node, skos_manager, conceptscheme_id, cstruct)

    if cstruct['type'] == 'concept' and 'infer_concept_relations' in cstruct:
        msg = "'infer_concept_relations' can only be set for collections."
        errors.append(colander.Invalid(node['infer_concept_relations'], msg=msg))

    if validate_id_generation:
        id_generation_strategy = provider.metadata.get(
            "atramhasis.id_generation_strategy", IDGenerationStrategy.NUMERIC
        )
        if id_generation_strategy == IDGenerationStrategy.MANUAL:
            if cstruct.get("id"):
                try:
                    skos_manager.get_thing(cstruct["id"], conceptscheme_id)
                    id_taken = True
                except NoResultFound:
                    # Not finding anything is the desired outcome: the id is free.
                    id_taken = False
                if id_taken:
                    msg = f"{cstruct['id']} already exists."
                    errors.append(colander.Invalid(node["id"], msg=msg))
            else:
                msg = "Required for this provider."
                errors.append(colander.Invalid(node["id"], msg=msg))

    if errors:
        raise ValidationError(
            'Concept could not be validated',
            [e.asdict() for e in errors]
        )
288
|
|
|
|
289
|
|
|
|
290
|
|
|
def conceptscheme_schema_validator(node, cstruct):
    """
    Validate the labels of an incoming conceptscheme.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must hold ``request``.
    :param cstruct: The conceptscheme being validated.
    :raises atramhasis.errors.ValidationError: When at least one rule is broken.
    """
    data_managers = node.bindings['request'].data_managers
    skos_manager = data_managers['skos_manager']
    languages_manager = data_managers['languages_manager']

    problems = []
    min_labels_rule(problems, node, cstruct)
    if 'labels' in cstruct:
        # The label rules get their own copy of the incoming labels.
        label_copies = copy.deepcopy(cstruct['labels'])
        label_type_rule(problems, node, skos_manager, label_copies)
        label_lang_rule(problems, node, languages_manager, label_copies)
        max_preflabels_rule(problems, node, label_copies)

    if not problems:
        return
    raise ValidationError(
        'ConceptScheme could not be validated',
        [problem.asdict() for problem in problems]
    )
312
|
|
|
|
313
|
|
|
|
314
|
|
|
def concept_relations_rule(errors, node_location, relations, concept_type):
    """
    Ensure that narrower, broader and related relations only occur on concepts.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when the rule is broken.
    :param node_location: The schema node the error is attached to.
    :param relations: The ids of the related items, may be ``None``.
    :param concept_type: The type of the item being validated.
    """
    if relations is None or len(relations) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have narrower/broader/related relations'
    ))
323
|
|
|
|
324
|
|
|
|
325
|
|
|
def max_preflabels_rule(errors, node, labels):
    """
    Ensure that every language has at most one prefLabel.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        for every surplus prefLabel.
    :param node: The schema; the error is attached to its ``labels`` child.
    :param labels: The labels to check, each with a ``type`` and ``language``.
    """
    seen_languages = []
    for label in labels:
        if label['type'] != 'prefLabel':
            continue
        language = label['language']
        if language not in seen_languages:
            seen_languages.append(language)
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Only one prefLabel per language allowed.'
        ))
339
|
|
|
|
340
|
|
|
|
341
|
|
|
def min_labels_rule(errors, node, cstruct):
    """
    Checks that a concept or collection always has at least one label.

    Nothing is checked when ``cstruct`` has no ``labels`` key at all.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when the label list is empty.
    :param node: The schema; the error is attached to its ``labels`` child.
    :param cstruct: The item being validated.
    """
    # Only the length is needed, so the labels are inspected in place instead
    # of being deep-copied first.
    if 'labels' in cstruct and len(cstruct['labels']) == 0:
        errors.append(colander.Invalid(
            node['labels'],
            'At least one label is necessary'
        ))
352
|
|
|
|
353
|
|
|
|
354
|
|
|
def label_type_rule(errors, node, skos_manager, labels):
    """
    Ensure that every label has a type known to the skos manager.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per label with an unknown type.
    :param node: The schema; the error is attached to its ``labels`` child.
    :param skos_manager: Provides ``get_all_label_types()``.
    :param labels: The labels to check.
    """
    known_types = [known.name for known in skos_manager.get_all_label_types()]
    for label in labels:
        if label['type'] in known_types:
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid labeltype.'
        ))
366
|
|
|
|
367
|
|
|
|
368
|
|
|
def label_lang_rule(errors, node, languages_manager, labels):
    """
    Ensure that the language of every label is valid.

    A language passes when ``tags.check`` accepts it. A valid tag that the
    languages manager does not count yet is saved as a new ``Language``.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per label with an invalid language tag.
    :param node: The schema; the error is attached to its ``labels`` child.
    :param languages_manager: Provides ``count_languages()`` and ``save()``.
    :param labels: The labels to check.
    """
    for label in labels:
        language_tag = label['language']
        if tags.check(language_tag):
            if languages_manager.count_languages(language_tag):
                continue
            # Valid but unknown so far: store it together with its descriptions.
            descriptions = ', '.join(tags.description(language_tag))
            languages_manager.save(Language(id=language_tag, name=descriptions))
            continue
        tag_problems = [err.message for err in tags.tag(language_tag).errors]
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid language tag: %s' % ", ".join(tag_problems)
        ))
388
|
|
|
|
389
|
|
|
|
390
|
|
|
def concept_type_rule(errors, node_location, skos_manager, conceptscheme_id, items):
    """
    Ensure that the targets of narrower, broader and related are concepts.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per target that is not a concept.
    :param node_location: The schema node the error is attached to.
    :param skos_manager: Provides ``get_thing()`` to look up a target.
    :param conceptscheme_id: The conceptscheme the targets are looked up in.
    :param items: The ids of the targets.
    """
    for target_id in items:
        target = skos_manager.get_thing(target_id, conceptscheme_id)
        if target.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A narrower, broader or related concept should always be a concept, not a collection'
        ))
402
|
|
|
|
403
|
|
|
|
404
|
|
|
def collection_type_rule(errors, node_location, skos_manager, conceptscheme_id, members):
    """
    Ensure that the targets of member_of are collections.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per target that is not a collection.
    :param node_location: The schema node the error is attached to.
    :param skos_manager: Provides ``get_thing()`` to look up a target.
    :param conceptscheme_id: The conceptscheme the targets are looked up in.
    :param members: The ids of the member_of targets.
    """
    for parent_id in members:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A member_of parent should always be a collection'
        ))
415
|
|
|
|
416
|
|
|
|
417
|
|
|
def semantic_relations_rule(errors, node_location, skos_manager, conceptscheme_id, members, collection_id):
    """
    Ensure that related items exist, are in the same scheme and are not the item itself.

    Checking stops at the first offending id.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        for the first offending id.
    :param node_location: The schema node the error is attached to.
    :param skos_manager: Provides ``get_thing()`` to look up an item.
    :param conceptscheme_id: The conceptscheme the items are looked up in.
    :param members: The ids of the related items.
    :param collection_id: The id of the item being validated.
    :return: ``True`` when every id passed, ``False`` otherwise.
    :rtype: bool
    """
    for member_id in members:
        problem = None
        if member_id == collection_id:
            problem = 'A concept or collection cannot be related to itself'
        else:
            try:
                skos_manager.get_thing(member_id, conceptscheme_id)
            except NoResultFound:
                problem = 'Concept not found, check concept_id. Please be aware members should be within one scheme'
        if problem is not None:
            errors.append(colander.Invalid(node_location, problem))
            return False
    return True
439
|
|
|
|
440
|
|
|
|
441
|
|
|
def hierarchy_build(skos_manager, conceptscheme_id, property_list, property_hierarchy, property_concept_type,
                    property_list_name):
    """
    Recursively collect the ids reachable through one relation.

    For every id in ``property_list`` the item is looked up. If it has the
    requested type, the ids found under its ``property_list_name`` attribute
    are appended to ``property_hierarchy`` and then expanded in turn.

    :param skos_manager: Provides ``get_thing()`` to look up an item.
    :param conceptscheme_id: The conceptscheme the items are looked up in.
    :param property_list: The ids to start from.
    :param list property_hierarchy: Output list, extended in place.
    :param property_concept_type: Only items of this type are expanded;
        ``None`` expands every item.
    :param str property_list_name: Name of the attribute holding the related
        items of a looked up item.
    """
    for property_concept_id in property_list:
        try:
            property_concept = skos_manager.get_thing(property_concept_id, conceptscheme_id)
        except NoResultFound:
            # Unknown ids are skipped here; reporting them is up to other rules.
            continue
        if property_concept is None:
            continue
        if not (property_concept.type == property_concept_type or property_concept_type is None):
            continue
        property_concepts = [n.concept_id for n in getattr(property_concept, property_list_name)]
        property_hierarchy.extend(property_concepts)
        hierarchy_build(skos_manager, conceptscheme_id, property_concepts, property_hierarchy,
                        property_concept_type, property_list_name)


def hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct, property1, property2,
                   property2_list_name, concept_type, error_message):
    """
    Checks that the items of property1 are not already in the property2 hierarchy.

    The property2 hierarchy consists of the ids listed under ``property2`` in
    ``cstruct`` plus everything ``hierarchy_build`` reaches from them.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per property1 id that is found in the property2 hierarchy.
    :param node_location: The schema node the error is attached to.
    :param skos_manager: Provides ``get_thing()`` to look up an item.
    :param conceptscheme_id: The conceptscheme the items are looked up in.
    :param cstruct: The item being validated.
    :param str property1: Key in ``cstruct`` of the relation being checked.
    :param str property2: Key in ``cstruct`` of the opposing relation.
    :param str property2_list_name: Attribute name used to walk the opposing
        relation on looked up items.
    :param concept_type: Only items of this type are walked.
    :param str error_message: The message of the reported errors.
    """
    property1_list = []
    if property1 in cstruct:
        property1_list = [m['id'] for m in cstruct[property1]]

    property2_hierarchy = []
    if property2 in cstruct:
        property2_list = [m['id'] for m in cstruct[property2]]
        # Seed the hierarchy with a *copy* of the start ids. Sharing one list
        # would make hierarchy_build append to the list it is iterating over,
        # looking up every descendant again and again.
        property2_hierarchy = list(property2_list)
        hierarchy_build(skos_manager, conceptscheme_id, property2_list, property2_hierarchy, concept_type,
                        property2_list_name)

    for property1_id in property1_list:
        if property1_id in property2_hierarchy:
            errors.append(colander.Invalid(
                node_location,
                error_message
            ))
480
|
|
|
|
481
|
|
|
|
482
|
|
|
def broader_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Ensure that the broader concepts of a concept are not already among its
    narrower concepts.

    Delegates to ``hierarchy_rule`` with 'broader' checked against the
    'narrower' hierarchy.
    """
    message = 'The broader concept of a concept must not itself be a narrower concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='broader',
        property2='narrower',
        property2_list_name='narrower_concepts',
        concept_type='concept',
        error_message=message,
    )
491
|
|
|
|
492
|
|
|
|
493
|
|
|
def narrower_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Ensure that the narrower concepts of a concept are not already among its
    broader concepts.

    Delegates to ``hierarchy_rule`` with 'narrower' checked against the
    'broader' hierarchy.
    """
    message = 'The narrower concept of a concept must not itself be a broader concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='narrower',
        property2='broader',
        property2_list_name='broader_concepts',
        concept_type='concept',
        error_message=message,
    )
502
|
|
|
|
503
|
|
|
|
504
|
|
|
def collection_members_unique_rule(errors, node_location, members):
    """
    Ensure that a collection does not list the same member twice.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when duplicates are present.
    :param node_location: The schema node the error is attached to.
    :param members: The ids of the members.
    """
    distinct_members = set(members)
    if len(distinct_members) == len(members):
        return
    errors.append(colander.Invalid(
        node_location,
        'All members of a collection should be unique.'
    ))
513
|
|
|
|
514
|
|
|
|
515
|
|
|
def members_only_in_collection_rule(errors, node, concept_type, members):
    """
    Ensure that only collections have members.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when a non-collection has members.
    :param node: The schema node the error is attached to.
    :param concept_type: The type of the item being validated.
    :param members: The ids of the members.
    """
    if concept_type == 'collection':
        return
    if len(members) > 0:
        errors.append(colander.Invalid(
            node,
            'Only collections can have members.'
        ))
524
|
|
|
|
525
|
|
|
|
526
|
|
|
def memberof_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Ensure that the member_of parents of an item are not already among its
    members.

    Delegates to ``hierarchy_rule`` with 'member_of' checked against the
    'members' hierarchy.
    """
    message = 'The parent member_of collection of a concept must not itself be a member of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='member_of',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message,
    )
531
|
|
|
|
532
|
|
|
|
533
|
|
|
def members_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Ensure that a collection has no members that are themselves already
    "parents" of that collection.

    Delegates to ``hierarchy_rule`` with 'members' checked against the
    'member_of' hierarchy.
    """
    message = 'The item of a members collection must not itself be a parent of the concept/collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='members',
        property2='member_of',
        property2_list_name='member_of',
        concept_type='collection',
        error_message=message,
    )
542
|
|
|
|
543
|
|
|
|
544
|
|
|
def concept_matches_rule(errors, node_location, matches, concept_type):
    """
    Ensure that only concepts have matches.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when a non-concept has matches.
    :param node_location: The schema node the error is attached to.
    :param matches: The matches of the item, may be ``None``.
    :param concept_type: The type of the item being validated.
    """
    if matches is None or len(matches) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have matches'
    ))
553
|
|
|
|
554
|
|
|
|
555
|
|
|
def concept_matches_unique_rule(errors, node_location, matches):
    """
    Ensure that a concept has no duplicate matches.

    Uniqueness is checked across all match types together: a concept can
    have only one match with another concept, whatever its type. Having both
    a broadMatch and a relatedMatch with the same concept is rejected.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when a uri occurs more than once.
    :param node_location: The schema node the error is attached to.
    :param matches: Mapping of match type to a list of uris, may be ``None``.
    """
    if matches is None:
        return
    all_uris = [uri for matchtype in matches for uri in matches[matchtype]]
    if len(all_uris) > len(set(all_uris)):
        errors.append(colander.Invalid(
            node_location,
            'All matches of a concept should be unique.'
        ))
572
|
|
|
|
573
|
|
|
|
574
|
|
|
def languagetag_validator(node, cstruct):
    """
    Validate a languagetag.

    The tag must be a valid IANA language tag. When the bindings state that
    this is a new language tag, it must also not exist yet.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must hold ``request`` and ``new``.
    :param cstruct: The value being validated.
    :raises atramhasis.errors.ValidationError: When at least one rule is broken.
    """
    bindings = node.bindings
    languages_manager = bindings['request'].data_managers['languages_manager']
    is_new = bindings['new']
    problems = []
    tag = cstruct['id']

    if is_new:
        languagetag_checkduplicate(node['id'], tag, languages_manager, problems)
    languagetag_isvalid_rule(node['id'], tag, problems)

    if not problems:
        return
    raise ValidationError(
        'Language could not be validated',
        [problem.asdict() for problem in problems]
    )
600
|
|
|
|
601
|
|
|
|
602
|
|
|
def languagetag_isvalid_rule(node, language_tag, errors):
    """
    Ensure that a languagetag is a valid IANA language tag.

    :param node: The schema node the error is attached to.
    :param language_tag: The tag to check.
    :param list errors: Collected errors; a ``colander.Invalid`` listing the
        tag's problems is appended when the tag is invalid.
    """
    if tags.check(language_tag):
        return
    tag_problems = [err.message for err in tags.tag(language_tag).errors]
    errors.append(colander.Invalid(
        node,
        'Invalid language tag: %s' % ", ".join(tag_problems)
    ))
611
|
|
|
|
612
|
|
|
|
613
|
|
|
def languagetag_checkduplicate(node, language_tag, languages_manager, errors):
    """
    Ensure that a languagetag does not exist yet.

    :param node: The schema node the error is attached to.
    :param language_tag: The tag to check.
    :param languages_manager: Provides ``count_languages()``.
    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when the tag is already counted by the manager.
    """
    if not languages_manager.count_languages(language_tag):
        return
    duplicate = colander.Invalid(node, 'Duplicate language tag: %s' % language_tag)
    errors.append(duplicate)
623
|
|
|
|
624
|
|
|
|
625
|
|
|
def subordinate_arrays_only_in_concept_rule(errors, node, concept_type, subordinate_arrays):
    """
    Ensure that only a concept has subordinate arrays.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when a non-concept has subordinate arrays.
    :param node: The schema node the error is attached to.
    :param concept_type: The type of the item being validated.
    :param subordinate_arrays: The ids of the subordinate arrays.
    """
    if concept_type == 'concept':
        return
    if len(subordinate_arrays) > 0:
        errors.append(colander.Invalid(
            node,
            'Only concept can have subordinate arrays.'
        ))
634
|
|
|
|
635
|
|
|
|
636
|
|
|
def subordinate_arrays_type_rule(errors, node_location, skos_manager, conceptscheme_id, subordinate_arrays):
    """
    Ensure that subordinate arrays are always collections.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per subordinate array that is not a collection.
    :param node_location: The schema node the error is attached to.
    :param skos_manager: Provides ``get_thing()`` to look up an item.
    :param conceptscheme_id: The conceptscheme the items are looked up in.
    :param subordinate_arrays: The ids of the subordinate arrays.
    """
    for array_id in subordinate_arrays:
        array = skos_manager.get_thing(array_id, conceptscheme_id)
        if array.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A subordinate array should always be a collection'
        ))
647
|
|
|
|
648
|
|
|
|
649
|
|
|
def subordinate_arrays_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Ensure that the subordinate arrays of a concept are not themselves
    parents of that concept.

    Delegates to ``hierarchy_rule`` with 'subordinate_arrays' checked against
    the 'member_of' hierarchy.
    """
    message = 'The subordinate_array collection of a concept must not itself be a parent of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='subordinate_arrays',
        property2='member_of',
        property2_list_name='members',
        concept_type='collection',
        error_message=message,
    )
658
|
|
|
|
659
|
|
|
|
660
|
|
|
def superordinates_only_in_concept_rule(errors, node, concept_type, superordinates):
    """
    Ensure that only collections have superordinates.

    :param list errors: Collected errors; a ``colander.Invalid`` is appended
        when a non-collection has superordinates.
    :param node: The schema node the error is attached to.
    :param concept_type: The type of the item being validated.
    :param superordinates: The ids of the superordinates.
    """
    if concept_type == 'collection':
        return
    if len(superordinates) > 0:
        errors.append(colander.Invalid(
            node,
            'Only collection can have superordinates.'
        ))
669
|
|
|
|
670
|
|
|
|
671
|
|
|
def superordinates_type_rule(errors, node_location, skos_manager, conceptscheme_id, superordinates):
    """
    Ensure that superordinates are always concepts.

    :param list errors: Collected errors; one ``colander.Invalid`` is appended
        per superordinate that is not a concept.
    :param node_location: The schema node the error is attached to.
    :param skos_manager: Provides ``get_thing()`` to look up an item.
    :param conceptscheme_id: The conceptscheme the items are looked up in.
    :param superordinates: The ids of the superordinates.
    """
    for parent_id in superordinates:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A superordinate should always be a concept'
        ))
682
|
|
|
|
683
|
|
|
|
684
|
|
|
def superordinates_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Ensure that the superordinate concepts of a collection are not themselves
    members of that collection.

    Delegates to ``hierarchy_rule`` with 'superordinates' checked against the
    'members' hierarchy.
    """
    message = 'The superordinates of a collection must not itself be a member of the collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='superordinates',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message,
    )
693
|
|
|
|
694
|
|
|
|
695
|
|
|
def validate_provider_json(json_data):
    """
    Validate the json of a provider.

    Checks the metadata for keys that are not allowed there, the two language
    settings for valid language tags and the subject for its allowed value.

    :param dict json_data: The provider json being validated.
    :raises atramhasis.errors.ValidationError: When at least one check fails;
        the error holds one ``{field: message}`` dict per failed check.
    """
    problems = []

    # Do not allow keys in the metadata which exist in the root of the json.
    forbidden_metadata_keys = (
        'default_language',
        'subject',
        'force_display_language',  # while not the same, this would just be confusing
        'atramhasis.force_display_language',
        'id_generation_strategy',  # while not the same, this would just be confusing
        'atramhasis.id_generation_strategy',
    )
    metadata = json_data.get('metadata', {})
    wrong_keys = [key for key in forbidden_metadata_keys if key in metadata]
    if wrong_keys:
        problems.append({'metadata': f'Found disallowed key(s): {", ".join(wrong_keys)}.'})

    # Both language settings are optional but must be valid tags when given.
    for language_key in ('default_language', 'force_display_language'):
        if json_data.get(language_key) and not tags.check(json_data[language_key]):
            problems.append({language_key: 'Invalid language.'})

    if json_data.get('subject') and json_data['subject'] != ['hidden']:
        problems.append({'subject': 'Subject must be one of: "hidden"'})

    if problems:
        raise ValidationError('Provider could not be validated.', problems)
725
|
|
|
|