Completed
Pull Request — master (#268)
by Bart
01:11
created

memberof_hierarchy_rule()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 4
rs 10
1
# -*- coding: utf-8 -*-
2
"""
3
Module that validates incoming JSON.
4
"""
5
6
import copy
7
8
import colander
9
from language_tags import tags
10
from skosprovider_sqlalchemy.models import (
11
    Language
12
)
13
from sqlalchemy.orm.exc import NoResultFound
14
15
from atramhasis.errors import ValidationError
16
17
18
class Label(colander.MappingSchema):
    """
    Schema for a single label.

    A label is a mapping with three string fields: the label text, the label
    type and the language. None of them declares a ``missing`` default.
    """
    label = colander.SchemaNode(colander.String())
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
28
29
30
class Note(colander.MappingSchema):
    """
    Schema for a single note.

    A note is a mapping with three string fields: the note text, the note
    type and the language. None of them declares a ``missing`` default.
    """
    note = colander.SchemaNode(colander.String())
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
40
41
42
class Source(colander.MappingSchema):
    """
    Schema for a single source: a mapping holding one ``citation`` string.
    """
    citation = colander.SchemaNode(colander.String())
46
47
48
class Labels(colander.SequenceSchema):
    """
    Schema for a sequence in which every item is a :class:`Label`.
    """
    label = Label()
50
51
52
class Notes(colander.SequenceSchema):
    """
    Schema for a sequence in which every item is a :class:`Note`.
    """
    note = Note()
54
55
56
class Sources(colander.SequenceSchema):
    """
    Schema for a sequence in which every item is a :class:`Source`.
    """
    source = Source()
58
59
60
class RelatedConcept(colander.MappingSchema):
    """
    Schema for a reference to another concept or collection.

    Only the integer ``id`` of the referenced item is part of the schema.
    """
    id = colander.SchemaNode(colander.Int())
64
65
66
class Concepts(colander.SequenceSchema):
    """
    Schema for a sequence in which every item is a :class:`RelatedConcept`.
    """
    concept = RelatedConcept()
68
69
70
class MatchList(colander.SequenceSchema):
    """
    Schema for a sequence of match strings.

    Each item is a string node whose ``missing`` default is ``None``.
    """
    match = colander.SchemaNode(colander.String(), missing=None)
75
76
77
class Matches(colander.MappingSchema):
    """
    Schema for the matches of a concept, grouped per match type.

    Every match type is a :class:`MatchList` that falls back to an empty list
    when it is absent from the input.
    """
    # Hierarchical and near-equivalence match types.
    broad = MatchList(missing=[])
    close = MatchList(missing=[])
    exact = MatchList(missing=[])
    narrow = MatchList(missing=[])
    # Associative match type.
    related = MatchList(missing=[])
83
84
85
class Concept(colander.MappingSchema):
    """
    Schema for an incoming concept or collection.

    ``id`` falls back to ``None`` and ``type`` to ``'concept'`` when absent.
    Every label/note/source/relation sequence falls back to an empty list and
    ``matches`` falls back to an empty dict.
    """
    id = colander.SchemaNode(colander.Int(), missing=None)
    type = colander.SchemaNode(colander.String(), missing='concept')

    # Descriptive content.
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])

    # Relations between concepts.
    broader = Concepts(missing=[])
    narrower = Concepts(missing=[])
    related = Concepts(missing=[])

    # Collection membership.
    members = Concepts(missing=[])
    member_of = Concepts(missing=[])

    # Links between concepts and the collections ordered under them.
    subordinate_arrays = Concepts(missing=[])
    superordinates = Concepts(missing=[])

    # Matches, grouped per match type.
    matches = Matches(missing={})
105
106
107
class ConceptScheme(colander.MappingSchema):
    """
    Schema for an incoming conceptscheme.

    Labels, notes and sources each fall back to an empty list when absent.
    """
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
111
112
113
class LanguageTag(colander.MappingSchema):
    """
    Schema for a language: a mapping with a string ``id`` (the language tag)
    and a string ``name``.
    """
    id = colander.SchemaNode(colander.String())
    name = colander.SchemaNode(colander.String())
120
121
122
def concept_schema_validator(node, cstruct):
    """
    This validator validates an incoming concept or collection.

    It runs the label, relation, membership, match, subordinate array and
    superordinate rules against the concept or collection and collects every
    error they report. The rules only read the data they are given, so the
    values from ``cstruct`` are passed on without copying them first.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must provide ``request`` and
        ``conceptscheme_id``.
    :param cstruct: The concept or collection being validated. It must
        contain the keys ``type`` and ``id``.
    :raises atramhasis.errors.ValidationError: If at least one rule reported
        an error.
    """
    request = node.bindings['request']
    skos_manager = request.data_managers['skos_manager']
    languages_manager = request.data_managers['languages_manager']
    conceptscheme_id = node.bindings['conceptscheme_id']
    concept_type = cstruct['type']
    concept_id = cstruct['id']
    errors = []

    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = cstruct['labels']
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    # The order of these names fixes the order in which errors are reported.
    semantic_relations = ('related', 'narrower', 'broader')
    membership_relations = ('members', 'member_of')
    ids = dict.fromkeys(semantic_relations + membership_relations)
    validated = dict.fromkeys(semantic_relations + membership_relations, False)
    for name in semantic_relations + membership_relations:
        if name not in cstruct:
            continue
        ids[name] = [relation['id'] for relation in cstruct[name]]
        validated[name] = semantic_relations_rule(errors, node[name], skos_manager,
                                                  conceptscheme_id, ids[name], concept_id)
        if name in semantic_relations:
            concept_relations_rule(errors, node[name], ids[name], concept_type)

    # The rules below look up every referenced item, so they only run once
    # semantic_relations_rule has accepted all the relations involved.
    if all(validated[name] for name in semantic_relations):
        concept_type_rule(errors, node['narrower'], skos_manager, conceptscheme_id, ids['narrower'])
        narrower_hierarchy_rule(errors, node['narrower'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['broader'], skos_manager, conceptscheme_id, ids['broader'])
        broader_hierarchy_rule(errors, node['broader'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['related'], skos_manager, conceptscheme_id, ids['related'])

    if all(validated[name] for name in membership_relations):
        members_only_in_collection_rule(errors, node['members'], concept_type, ids['members'])
        collection_members_unique_rule(errors, node['members'], ids['members'])
        collection_type_rule(errors, node['member_of'], skos_manager, conceptscheme_id, ids['member_of'])
        memberof_hierarchy_rule(errors, node['member_of'], skos_manager, conceptscheme_id, cstruct)
        members_hierarchy_rule(errors, node['members'], skos_manager, conceptscheme_id, cstruct)

    if 'matches' in cstruct:
        matches = cstruct['matches']
        concept_matches_rule(errors, node['matches'], matches, concept_type)
        concept_matches_unique_rule(errors, node['matches'], matches)

    if 'subordinate_arrays' in cstruct:
        subordinate_arrays = [relation['id'] for relation in cstruct['subordinate_arrays']]
        subordinate_arrays_only_in_concept_rule(errors, node['subordinate_arrays'], concept_type, subordinate_arrays)
        subordinate_arrays_type_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id,
                                     subordinate_arrays)
        subordinate_arrays_hierarchy_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id, cstruct)

    if 'superordinates' in cstruct:
        superordinates = [relation['id'] for relation in cstruct['superordinates']]
        superordinates_only_in_concept_rule(errors, node['superordinates'], concept_type, superordinates)
        superordinates_type_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, superordinates)
        superordinates_hierarchy_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, cstruct)

    if errors:
        raise ValidationError(
            'Concept could not be validated',
            [e.asdict() for e in errors]
        )
222
223
224
def conceptscheme_schema_validator(node, cstruct):
    """
    This validator validates the labels of an incoming conceptscheme.

    It runs the minimum-labels rule and, when labels are present, the label
    type, label language and maximum-prefLabel rules.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must provide ``request``.
    :param cstruct: The conceptscheme being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule reported
        an error.
    """
    request = node.bindings['request']
    skos_manager = request.data_managers['skos_manager']
    languages_manager = request.data_managers['languages_manager']

    errors = []
    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    if len(errors) == 0:
        return
    raise ValidationError(
        'ConceptScheme could not be validated',
        [error.asdict() for error in errors]
    )
246
247
248
def concept_relations_rule(errors, node_location, relations, concept_type):
    """
    Checks that only concepts have narrower, broader and related relations.

    An error is appended to ``errors`` when ``relations`` is a non-empty
    sequence and ``concept_type`` is not ``'concept'``.
    """
    if relations is None or len(relations) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have narrower/broader/related relations'
    ))
257
258
259
def max_preflabels_rule(errors, node, labels):
    """
    Checks that there's only one prefLabel for a certain language.

    One error is appended to ``errors`` for every prefLabel whose language
    was already used by an earlier prefLabel in ``labels``.
    """
    seen_languages = []
    for label in labels:
        if label['type'] != 'prefLabel':
            continue
        language = label['language']
        if language not in seen_languages:
            seen_languages.append(language)
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Only one prefLabel per language allowed.'
        ))
273
274
275
def min_labels_rule(errors, node, cstruct):
    """
    Checks that a concept or collection always has at least one label.

    Nothing is checked when ``cstruct`` has no ``labels`` key at all; an
    error is appended to ``errors`` only for a present but empty sequence.
    """
    if 'labels' not in cstruct:
        return
    if len(cstruct['labels']) != 0:
        return
    errors.append(colander.Invalid(
        node['labels'],
        'At least one label is necessary'
    ))
286
287
288
def label_type_rule(errors, node, skos_manager, labels):
    """
    Checks that a label has the correct type.

    The allowed types are the ``name`` attributes of everything returned by
    ``skos_manager.get_all_label_types()``. One error is appended to
    ``errors`` for every label whose type is not among them.
    """
    allowed_types = [label_type.name for label_type in skos_manager.get_all_label_types()]
    for label in labels:
        if label['type'] in allowed_types:
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid labeltype.'
        ))
300
301
302
def label_lang_rule(errors, node, languages_manager, labels):
    """
    Checks that languages of a label are valid.

    Every label language has to pass ``tags.check``. For a tag that passes
    but for which ``languages_manager.count_languages`` reports nothing, a
    new :class:`Language` is created and handed to ``languages_manager.save``,
    so running this rule can store new languages as a side effect.
    """
    for label in labels:
        language_tag = label['language']
        if tags.check(language_tag):
            if languages_manager.count_languages(language_tag):
                continue
            # Unknown but valid tag: register it, named after its descriptions.
            language_name = ', '.join(tags.description(language_tag))
            languages_manager.save(Language(id=language_tag, name=language_name))
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid language tag: %s' % ", ".join(err.message for err in tags.tag(language_tag).errors)
        ))
322
323
324
def concept_type_rule(errors, node_location, skos_manager, conceptscheme_id, items):
    """
    Checks that the targets of narrower, broader and related are concepts and
    not collections.

    Every id in ``items`` is looked up with ``skos_manager.get_thing``; one
    error is appended to ``errors`` for every item whose ``type`` is not
    ``'concept'``.
    """
    for item_concept_id in items:
        if skos_manager.get_thing(item_concept_id, conceptscheme_id).type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A narrower, broader or related concept should always be a concept, not a collection'
        ))
336
337
338
def collection_type_rule(errors, node_location, skos_manager, conceptscheme_id, members):
    """
    Checks that the targets of member_of are collections and not concepts.

    Every id in ``members`` is looked up with ``skos_manager.get_thing``; one
    error is appended to ``errors`` for every item whose ``type`` is not
    ``'collection'``.
    """
    for member_collection_id in members:
        if skos_manager.get_thing(member_collection_id, conceptscheme_id).type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A member_of parent should always be a collection'
        ))
349
350
351
def semantic_relations_rule(errors, node_location, skos_manager, conceptscheme_id, members, collection_id):
    """
    Checks that the elements in a group of concepts or collections are not
    the group itself, that they actually exist and are within the same
    conceptscheme.

    Checking stops at the first element that fails: a single error is
    appended to ``errors`` and ``False`` is returned. ``True`` is returned
    when every element passed, which includes an empty ``members``.
    """
    for member_concept_id in members:
        if member_concept_id == collection_id:
            message = 'A concept or collection cannot be related to itself'
        else:
            try:
                skos_manager.get_thing(member_concept_id, conceptscheme_id)
            except NoResultFound:
                message = 'Concept not found, check concept_id. Please be aware members should be within one scheme'
            else:
                # This element is fine, move on to the next one.
                continue
        errors.append(colander.Invalid(node_location, message))
        return False
    return True
373
374
375
def hierarchy_build(skos_manager, conceptscheme_id, property_list, property_hierarchy, property_concept_type,
                    property_list_name):
    """
    Collects the ids of everything reachable from a list of starting items.

    Starting from the ids in ``property_list``, every item is looked up with
    ``skos_manager.get_thing``. When the item exists and has the requested
    type, the ``concept_id`` of every entry in its ``property_list_name``
    attribute is appended to ``property_hierarchy`` and is itself followed in
    the same way.

    Every id is looked up at most once, so shared descendants are not walked
    repeatedly and cyclic data cannot cause endless recursion. An id can
    still be appended more than once when it is reachable along several
    paths; the order of the appended ids is not significant.

    :param skos_manager: Object whose ``get_thing(id, conceptscheme_id)``
        returns the item or raises ``NoResultFound``.
    :param conceptscheme_id: Passed on to ``get_thing``.
    :param property_list: Ids to start from. May be the same list object as
        ``property_hierarchy``.
    :param property_hierarchy: List that receives the ids found. It is
        modified in place.
    :param property_concept_type: Only items of this type are followed;
        ``None`` follows items of any type.
    :param property_list_name: Name of the attribute that holds the related
        items of an item.
    """
    # Work on a snapshot: callers may pass one list as both the starting
    # points and the result, and that list grows while the walk is running.
    pending = list(property_list)
    expanded = set()
    while pending:
        property_concept_id = pending.pop()
        if property_concept_id in expanded:
            continue
        expanded.add(property_concept_id)
        try:
            property_concept = skos_manager.get_thing(property_concept_id, conceptscheme_id)
        except NoResultFound:
            # Ids that do not resolve contribute nothing to the hierarchy.
            continue
        if property_concept is None:
            continue
        if property_concept_type is not None and property_concept.type != property_concept_type:
            continue
        for related_item in getattr(property_concept, property_list_name):
            property_hierarchy.append(related_item.concept_id)
            pending.append(related_item.concept_id)
389
390
391
def hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct, property1, property2,
                   property2_list_name, concept_type, error_message):
    """
    Checks that the property1 of a concept are not already in property2 hierarchy

    The hierarchy consists of the ids listed under ``property2`` in
    ``cstruct`` plus everything ``hierarchy_build`` finds by following
    ``property2_list_name`` from those ids. One error carrying
    ``error_message`` is appended to ``errors`` for every id listed under
    ``property1`` that is part of that hierarchy.

    :param errors: List that receives a ``colander.Invalid`` per violation.
    :param node_location: Schema node the errors are attached to.
    :param skos_manager: Passed on to ``hierarchy_build``.
    :param conceptscheme_id: Passed on to ``hierarchy_build``.
    :param cstruct: The concept or collection being validated. It is only
        read, never modified.
    :param property1: Key in ``cstruct`` of the relation being checked.
    :param property2: Key in ``cstruct`` of the relation the hierarchy is
        built from.
    :param property2_list_name: Attribute name followed by
        ``hierarchy_build``.
    :param concept_type: Type filter passed on to ``hierarchy_build``.
    :param error_message: Message used for every reported error.
    """
    property1_list = []
    if property1 in cstruct:
        property1_list = [relation['id'] for relation in cstruct[property1]]

    property2_hierarchy = []
    if property2 in cstruct:
        property2_list = [relation['id'] for relation in cstruct[property2]]
        # Collect into a separate list so that hierarchy_build never appends
        # to the very list it is walking.
        property2_hierarchy = list(property2_list)
        hierarchy_build(skos_manager, conceptscheme_id, property2_list, property2_hierarchy, concept_type,
                        property2_list_name)

    forbidden_ids = set(property2_hierarchy)
    for candidate_id in property1_list:
        if candidate_id in forbidden_ids:
            errors.append(colander.Invalid(
                node_location,
                error_message
            ))
414
415
416
def broader_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the broader concepts of a concept are not already narrower
    concepts of that concept.

    Delegates to ``hierarchy_rule``: the ``broader`` ids are compared with the
    hierarchy built from the ``narrower`` ids by following the
    ``narrower_concepts`` attribute of items of type ``concept``.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='broader',
        property2='narrower',
        property2_list_name='narrower_concepts',
        concept_type='concept',
        error_message='The broader concept of a concept must not itself be a narrower concept of the concept being edited.'
    )
425
426
427
def narrower_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the narrower concepts of a concept are not already broader
    concepts of that concept.

    Delegates to ``hierarchy_rule``: the ``narrower`` ids are compared with
    the hierarchy built from the ``broader`` ids by following the
    ``broader_concepts`` attribute of items of type ``concept``.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='narrower',
        property2='broader',
        property2_list_name='broader_concepts',
        concept_type='concept',
        error_message='The narrower concept of a concept must not itself be a broader concept of the concept being edited.'
    )
436
437
438
def collection_members_unique_rule(errors, node_location, members):
    """
    Checks that a collection has no duplicate members.

    A single error is appended to ``errors`` when ``members`` contains at
    least one value more than once.
    """
    distinct_members = set(members)
    if len(distinct_members) >= len(members):
        return
    errors.append(colander.Invalid(
        node_location,
        'All members of a collection should be unique.'
    ))
447
448
449
def members_only_in_collection_rule(errors, node, concept_type, members):
    """
    Checks that only collections have members.

    An error is appended to ``errors`` when ``concept_type`` is not
    ``'collection'`` while ``members`` is not empty. ``members`` is not
    inspected at all for a collection.
    """
    if concept_type == 'collection':
        return
    if len(members) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only collections can have members.'
    ))
458
459
460
def memberof_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the collections an item is a member of are not themselves
    members of that item.

    Delegates to ``hierarchy_rule``: the ``member_of`` ids are compared with
    the hierarchy built from the ``members`` ids by following the ``members``
    attribute of items of type ``collection``.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='member_of',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message='The parent member_of collection of a concept must not itself be a member of the concept being edited.'
    )
465
466
467
def members_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that a collection does not have members that are in themselves
    already "parents" of that collection.

    Delegates to ``hierarchy_rule``: the ``members`` ids are compared with the
    hierarchy built from the ``member_of`` ids by following the ``member_of``
    attribute of items of type ``collection``.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='members',
        property2='member_of',
        property2_list_name='member_of',
        concept_type='collection',
        error_message='The item of a members collection must not itself be a parent of the concept/collection being edited.'
    )
476
477
478
def concept_matches_rule(errors, node_location, matches, concept_type):
    """
    Checks that only concepts have matches.

    An error is appended to ``errors`` when ``matches`` is non-empty and
    ``concept_type`` is not ``'concept'``.
    """
    if matches is None or len(matches) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have matches'
    ))
487
488
489
def concept_matches_unique_rule(errors, node_location, matches):
    """
    Checks that a concept has no duplicate matches.

    This means that a concept can only have one match (no matter what the
    type) with another concept. We don't allow eg. a concept that has both a
    broadMatch and a relatedMatch with the same concept.

    The uris of all match types are pooled together; a single error is
    appended to ``errors`` when any uri occurs more than once in the pool.
    """
    if matches is None:
        return
    all_uris = [uri for matchtype in matches for uri in matches[matchtype]]
    if len(set(all_uris)) >= len(all_uris):
        return
    errors.append(colander.Invalid(
        node_location,
        'All matches of a concept should be unique.'
    ))
506
507
508
def languagetag_validator(node, cstruct):
    """
    This validator validates a languagetag.

    The validator will check if a tag is a valid IANA language tag. If the
    validator is informed (through the ``new`` binding) that this should be a
    new language tag, it will first check that the tag doesn't already exist.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must provide ``request`` and ``new``.
    :param cstruct: The value being validated. It must contain the key ``id``.
    :raises atramhasis.errors.ValidationError: If at least one rule reported
        an error.
    """
    request = node.bindings['request']
    languages_manager = request.data_managers['languages_manager']
    is_new = node.bindings['new']
    language_tag = cstruct['id']

    errors = []
    if is_new:
        languagetag_checkduplicate(node['id'], language_tag, languages_manager, errors)
    languagetag_isvalid_rule(node['id'], language_tag, errors)

    if len(errors) == 0:
        return
    raise ValidationError(
        'Language could not be validated',
        [error.asdict() for error in errors]
    )
534
535
536
def languagetag_isvalid_rule(node, language_tag, errors):
    """
    Check that a languagetag is a valid IANA language tag.

    When ``tags.check`` rejects the tag, one error is appended to ``errors``
    whose message lists the problems reported for that tag.
    """
    if tags.check(language_tag):
        return
    errors.append(colander.Invalid(
        node,
        'Invalid language tag: %s' % ", ".join(err.message for err in tags.tag(language_tag).errors)
    ))
545
546
547
def languagetag_checkduplicate(node, language_tag, languages_manager, errors):
    """
    Check that a languagetag isn't duplicated.

    An error is appended to ``errors`` when
    ``languages_manager.count_languages`` returns a truthy value for the tag.
    """
    if not languages_manager.count_languages(language_tag):
        return
    errors.append(colander.Invalid(
        node,
        'Duplicate language tag: %s' % language_tag
    ))
557
558
559
def subordinate_arrays_only_in_concept_rule(errors, node, concept_type, subordinate_arrays):
    """
    Checks that only a concept has subordinate arrays.

    An error is appended to ``errors`` when ``concept_type`` is not
    ``'concept'`` while ``subordinate_arrays`` is not empty.
    ``subordinate_arrays`` is not inspected at all for a concept.
    """
    if concept_type == 'concept':
        return
    if len(subordinate_arrays) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only concept can have subordinate arrays.'
    ))
568
569
570
def subordinate_arrays_type_rule(errors, node_location, skos_manager, conceptscheme_id, subordinate_arrays):
    """
    Checks that subordinate arrays are always collections.

    Every id in ``subordinate_arrays`` is looked up with
    ``skos_manager.get_thing``; one error is appended to ``errors`` for every
    item whose ``type`` is not ``'collection'``.
    """
    for subordinate_id in subordinate_arrays:
        if skos_manager.get_thing(subordinate_id, conceptscheme_id).type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A subordinate array should always be a collection'
        ))
581
582
583
def subordinate_arrays_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the subordinate arrays of a concept are not themselves
    parents of that concept.

    Delegates to ``hierarchy_rule``: the ``subordinate_arrays`` ids are
    compared with the hierarchy built from the ``member_of`` ids by following
    the ``members`` attribute of items of type ``collection``.
    """
    # NOTE(review): the hierarchy starts at the ``member_of`` ids but follows
    # the ``members`` attribute, whereas members_hierarchy_rule follows
    # ``member_of`` from the same starting point - confirm this is intended.
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='subordinate_arrays',
        property2='member_of',
        property2_list_name='members',
        concept_type='collection',
        error_message='The subordinate_array collection of a concept must not itself be a parent of the concept being edited.'
    )
592
593
594
def superordinates_only_in_concept_rule(errors, node, concept_type, superordinates):
    """
    Checks that only collections have superordinates.

    An error is appended to ``errors`` when ``concept_type`` is not
    ``'collection'`` while ``superordinates`` is not empty.
    ``superordinates`` is not inspected at all for a collection.
    """
    if concept_type == 'collection':
        return
    if len(superordinates) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only collection can have superordinates.'
    ))
603
604
605
def superordinates_type_rule(errors, node_location, skos_manager, conceptscheme_id, superordinates):
    """
    Checks that superordinates are always concepts.

    Every id in ``superordinates`` is looked up with
    ``skos_manager.get_thing``; one error is appended to ``errors`` for every
    item whose ``type`` is not ``'concept'``.
    """
    for superordinate_id in superordinates:
        if skos_manager.get_thing(superordinate_id, conceptscheme_id).type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A superordinate should always be a concept'
        ))
616
617
618
def superordinates_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the superordinate concepts of a collection are not themselves
    members of that collection.

    Delegates to ``hierarchy_rule``: the ``superordinates`` ids are compared
    with the hierarchy built from the ``members`` ids by following the
    ``members`` attribute of items of type ``collection``.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='superordinates',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message='The superordinates of a collection must not itself be a member of the collection being edited.'
    )
627