Test Failed
Push — master ( 9ea4cd...0ffab9 )
by Koen
03:34 queued 13s
created

superordinates_only_in_concept_rule()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 3
nop 4
1
"""
2
Module that validates incoming JSON.
3
"""
4
5
import copy
6
7
import bleach
8
import colander
9
from language_tags import tags
10
from skosprovider_sqlalchemy.models import (
11
    Language
12
)
13
from sqlalchemy.orm.exc import NoResultFound
14
15
from atramhasis.errors import ValidationError
16
17
18
class Label(colander.MappingSchema):
    """
    Schema for a single label.

    All three fields are required strings.
    """

    # The text of the label.
    label = colander.SchemaNode(colander.String())
    # The label type, e.g. ``prefLabel``.
    type = colander.SchemaNode(colander.String())
    # The language tag of the label.
    language = colander.SchemaNode(colander.String())
28
29
30
def html_preparer(value):
    """
    Prepare the value by stripping all html except a few allowed tags.

    :param value: The value to be cleaned.
    :returns: The cleaned value, or the value itself, untouched, when
        cleaning it raised a :class:`TypeError`.
    """
    allowed_tags = ['strong', 'em', 'a']
    try:
        cleaned = bleach.clean(value, tags=allowed_tags, strip=True)
    except TypeError:
        # Trying to clean a non-string: hand it back unchanged so the
        # problem can be caught later on.
        return value
    return cleaned
43
44
45
class Note(colander.MappingSchema):
    """
    Schema for a single note.

    The note text is passed through :func:`html_preparer` before validation.
    """

    note = colander.SchemaNode(colander.String(), preparer=html_preparer)
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
56
57
58
class Source(colander.MappingSchema):
    """
    Schema for a single source.

    The citation is passed through :func:`html_preparer` before validation.
    """

    citation = colander.SchemaNode(colander.String(), preparer=html_preparer)
63
64
65
class Labels(colander.SequenceSchema):
    """A sequence of :class:`Label` mappings."""

    label = Label()
67
68
69
class Notes(colander.SequenceSchema):
    """A sequence of :class:`Note` mappings."""

    note = Note()
71
72
73
class Sources(colander.SequenceSchema):
    """A sequence of :class:`Source` mappings."""

    source = Source()
75
76
77
class RelatedConcept(colander.MappingSchema):
    """Schema for a reference to another concept or collection, by integer id."""

    id = colander.SchemaNode(colander.Int())
81
82
83
class Concepts(colander.SequenceSchema):
    """A sequence of :class:`RelatedConcept` references."""

    concept = RelatedConcept()
85
86
87
class MatchList(colander.SequenceSchema):
    """A sequence of matches, each one a string that defaults to ``None``."""

    match = colander.SchemaNode(colander.String(), missing=None)
92
93
94
class Matches(colander.MappingSchema):
    """
    Schema for the matches of a concept, grouped per match type.

    Every match type is optional and defaults to an empty list.
    """

    broad = MatchList(missing=[])
    close = MatchList(missing=[])
    exact = MatchList(missing=[])
    narrow = MatchList(missing=[])
    related = MatchList(missing=[])
100
101
102
class Concept(colander.MappingSchema):
    """
    Schema for an incoming concept or collection.

    Every field is optional: ``id`` defaults to ``None``, ``type`` to
    ``'concept'``, the relation lists to an empty list and ``matches`` to an
    empty mapping. ``infer_concept_relations`` is dropped when absent.
    """

    id = colander.SchemaNode(colander.Int(), missing=None)
    type = colander.SchemaNode(colander.String(), missing='concept')

    # Descriptive content.
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])

    # References to other concepts or collections.
    broader = Concepts(missing=[])
    narrower = Concepts(missing=[])
    related = Concepts(missing=[])
    members = Concepts(missing=[])
    member_of = Concepts(missing=[])
    subordinate_arrays = Concepts(missing=[])
    superordinates = Concepts(missing=[])

    matches = Matches(missing={})
    infer_concept_relations = colander.SchemaNode(colander.Boolean(), missing=colander.drop)
126
127
128
class ConceptScheme(colander.MappingSchema):
    """
    Schema for an incoming conceptscheme.

    Labels, notes and sources are optional and default to an empty list.
    """

    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
132
133
134
class LanguageTag(colander.MappingSchema):
    """Schema for a language: a tag (``id``) and a ``name``, both required strings."""

    id = colander.SchemaNode(colander.String())
    name = colander.SchemaNode(colander.String())
141
142
143
def concept_schema_validator(node, cstruct):
    """
    This validator validates an incoming concept or collection

    This validator will run a list of rules against the concept or collection
    to see that there are no validation rules being broken. All broken rules
    are collected first and reported together.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The concept or collection being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule is broken.
    """
    request = node.bindings['request']
    skos_manager = request.data_managers['skos_manager']
    languages_manager = request.data_managers['languages_manager']
    conceptscheme_id = node.bindings['conceptscheme_id']
    concept_type = cstruct['type']
    concept_id = cstruct['id']
    errors = []

    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    # Relation name -> whether the relation is reserved for concepts. The
    # order is significant: it determines the order of the reported errors.
    relations = (
        ('related', True),
        ('narrower', True),
        ('broader', True),
        ('members', False),
        ('member_of', False),
    )
    relation_ids = {}
    validated = dict.fromkeys([name for name, _ in relations], False)
    for name, concepts_only in relations:
        if name not in cstruct:
            continue
        relation_ids[name] = [item['id'] for item in cstruct[name]]
        validated[name] = semantic_relations_rule(errors, node[name], skos_manager,
                                                  conceptscheme_id, relation_ids[name], concept_id)
        if concepts_only:
            concept_relations_rule(errors, node[name], relation_ids[name], concept_type)

    # The rules below look up the referenced concepts again, so they only run
    # once every involved relation was present and passed the existence check.
    if validated['related'] and validated['narrower'] and validated['broader']:
        concept_type_rule(errors, node['narrower'], skos_manager, conceptscheme_id, relation_ids['narrower'])
        narrower_hierarchy_rule(errors, node['narrower'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['broader'], skos_manager, conceptscheme_id, relation_ids['broader'])
        broader_hierarchy_rule(errors, node['broader'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['related'], skos_manager, conceptscheme_id, relation_ids['related'])

    if validated['members'] and validated['member_of']:
        members = relation_ids['members']
        member_of = relation_ids['member_of']
        members_only_in_collection_rule(errors, node['members'], concept_type, members)
        collection_members_unique_rule(errors, node['members'], members)
        collection_type_rule(errors, node['member_of'], skos_manager, conceptscheme_id, member_of)
        memberof_hierarchy_rule(errors, node['member_of'], skos_manager, conceptscheme_id, cstruct)
        members_hierarchy_rule(errors, node['members'], skos_manager, conceptscheme_id, cstruct)

    if 'matches' in cstruct:
        matches = copy.deepcopy(cstruct['matches'])
        concept_matches_rule(errors, node['matches'], matches, concept_type)
        concept_matches_unique_rule(errors, node['matches'], matches)

    if 'subordinate_arrays' in cstruct:
        subordinate_arrays = [item['id'] for item in cstruct['subordinate_arrays']]
        subordinate_arrays_only_in_concept_rule(errors, node['subordinate_arrays'], concept_type, subordinate_arrays)
        subordinate_arrays_type_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id,
                                     subordinate_arrays)
        subordinate_arrays_hierarchy_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id, cstruct)

    if 'superordinates' in cstruct:
        superordinates = [item['id'] for item in cstruct['superordinates']]
        superordinates_only_in_concept_rule(errors, node['superordinates'], concept_type, superordinates)
        superordinates_type_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, superordinates)
        superordinates_hierarchy_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, cstruct)

    if concept_type == 'concept' and 'infer_concept_relations' in cstruct:
        msg = "'infer_concept_relations' can only be set for collections."
        errors.append(colander.Invalid(node['infer_concept_relations'], msg=msg))

    if errors:
        raise ValidationError(
            'Concept could not be validated',
            [e.asdict() for e in errors]
        )
247
248
249
def conceptscheme_schema_validator(node, cstruct):
    """
    This validator validates the labels of an incoming conceptscheme.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The conceptscheme being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule is broken.
    """
    data_managers = node.bindings['request'].data_managers
    skos_manager = data_managers['skos_manager']
    languages_manager = data_managers['languages_manager']

    errors = []
    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    if len(errors) == 0:
        return
    raise ValidationError(
        'ConceptScheme could not be validated',
        [e.asdict() for e in errors]
    )
271
272
273
def concept_relations_rule(errors, node_location, relations, concept_type):
    """
    Checks that only concepts have narrower, broader and related relations.

    An error is appended to ``errors`` when ``relations`` is non-empty while
    ``concept_type`` is not ``'concept'``.
    """
    if relations is None or len(relations) == 0:
        return
    if concept_type == 'concept':
        return
    problem = colander.Invalid(
        node_location,
        'Only concepts can have narrower/broader/related relations'
    )
    errors.append(problem)
282
283
284
def max_preflabels_rule(errors, node, labels):
    """
    Checks that there's only one prefLabel for a certain language.

    Every additional prefLabel in an already used language adds one error.
    """
    languages_seen = []
    for label in labels:
        if label['type'] != 'prefLabel':
            continue
        language = label['language']
        if language not in languages_seen:
            languages_seen.append(language)
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Only one prefLabel per language allowed.'
        ))
298
299
300
def min_labels_rule(errors, node, cstruct):
    """
    Checks that a concept or collection always has at least one label.

    Nothing is checked when ``cstruct`` has no ``labels`` key at all.
    """
    if 'labels' not in cstruct:
        return
    labels = copy.deepcopy(cstruct['labels'])
    if len(labels) != 0:
        return
    errors.append(colander.Invalid(
        node['labels'],
        'At least one label is necessary'
    ))
311
312
313
def label_type_rule(errors, node, skos_manager, labels):
    """
    Checks that a label has the correct type.

    The type of every label has to be the name of one of the label types
    returned by ``skos_manager.get_all_label_types()``.
    """
    known_types = [known.name for known in skos_manager.get_all_label_types()]
    for label in labels:
        if label['type'] in known_types:
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid labeltype.'
        ))
325
326
327
def label_lang_rule(errors, node, languages_manager, labels):
    """
    Checks that languages of a label are valid.

    Checks that they are valid IANA language tags. A valid tag that is not
    yet known to the languages manager is saved through it.
    """
    for label in labels:
        language_tag = label['language']
        if tags.check(language_tag):
            # Valid tag: register it when the manager does not know it yet.
            if not languages_manager.count_languages(language_tag):
                language_name = ', '.join(tags.description(language_tag))
                languages_manager.save(Language(id=language_tag, name=language_name))
            continue
        location = node['labels']
        problems = [err.message for err in tags.tag(language_tag).errors]
        errors.append(colander.Invalid(
            location,
            'Invalid language tag: %s' % ", ".join(problems)
        ))
347
348
349
def concept_type_rule(errors, node_location, skos_manager, conceptscheme_id, items):
    """
    Checks that the targets of narrower, broader and related are concepts and
    not collections.

    Every id in ``items`` is looked up in the conceptscheme; each target whose
    type is not ``'concept'`` adds one error.
    """
    for target_id in items:
        target = skos_manager.get_thing(target_id, conceptscheme_id)
        if target.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A narrower, broader or related concept should always be a concept, not a collection'
        ))
361
362
363
def collection_type_rule(errors, node_location, skos_manager, conceptscheme_id, members):
    """
    Checks that the targets of member_of are collections and not concepts.

    Every id in ``members`` is looked up in the conceptscheme; each target
    whose type is not ``'collection'`` adds one error.
    """
    for parent_id in members:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A member_of parent should always be a collection'
        ))
374
375
376
def semantic_relations_rule(errors, node_location, skos_manager, conceptscheme_id, members, collection_id):
    """
    Checks that the elements in a group of concepts or collections are not
    the group itself, that they actually exist and are within the same
    conceptscheme.

    Checking stops at the first offending element.

    :returns: ``True`` when every element passed, ``False`` after an error
        was appended to ``errors``.
    """
    failure = None
    for member_id in members:
        if member_id == collection_id:
            failure = 'A concept or collection cannot be related to itself'
            break
        try:
            skos_manager.get_thing(member_id, conceptscheme_id)
        except NoResultFound:
            failure = 'Concept not found, check concept_id. Please be aware members should be within one scheme'
            break
    if failure is None:
        return True
    errors.append(colander.Invalid(node_location, failure))
    return False
398
399
400
def hierarchy_build(skos_manager, conceptscheme_id, property_list, property_hierarchy, property_concept_type,
                    property_list_name):
    """
    Collect everything reachable from a list of concepts or collections.

    Starting from the ids in ``property_list``, the relation named
    ``property_list_name`` is followed transitively and every id found along
    the way is appended to ``property_hierarchy``. Every id is looked up and
    appended at most once, so shared descendants and cycles in the stored
    data neither blow up the amount of work nor recurse endlessly.

    :param skos_manager: Manager used to look up things with ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the lookups happen in.
    :param property_list: Ids to start from.
    :param list property_hierarchy: List the ids that were found are appended
        to. Ids already present are not appended again. This may be the same
        list object as ``property_list``.
    :param property_concept_type: Only things of this type are expanded.
        ``None`` expands things of any type.
    :param str property_list_name: Name of the attribute holding the related
        things; each of those needs a ``concept_id`` attribute.
    """
    # Work on a snapshot: the caller may pass one and the same list as both
    # the list to start from and the list to fill.
    pending = list(property_list)
    known_ids = set(property_hierarchy)
    expanded_ids = set()
    while pending:
        property_concept_id = pending.pop()
        if property_concept_id in expanded_ids:
            continue
        expanded_ids.add(property_concept_id)
        try:
            property_concept = skos_manager.get_thing(property_concept_id, conceptscheme_id)
        except NoResultFound:
            # Unknown ids are reported by other rules, they just end the walk.
            continue
        if property_concept is None:
            continue
        if property_concept_type is not None and property_concept.type != property_concept_type:
            continue
        for related in getattr(property_concept, property_list_name):
            related_id = related.concept_id
            if related_id not in known_ids:
                known_ids.add(related_id)
                property_hierarchy.append(related_id)
            pending.append(related_id)
414
415
416
def hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct, property1, property2,
                   property2_list_name, concept_type, error_message):
    """
    Checks that the property1 of a concept are not already in property2 hierarchy

    The hierarchy consists of the ids listed under ``property2`` plus
    everything ``hierarchy_build`` reaches from them through the
    ``property2_list_name`` relation. Every id listed under ``property1`` that
    is part of this hierarchy adds one error.

    :param list errors: List the errors are appended to.
    :param node_location: Schema node the errors are attached to.
    :param skos_manager: Manager used to look up concepts and collections.
    :param conceptscheme_id: Id of the conceptscheme being validated against.
    :param cstruct: The concept or collection being validated.
    :param str property1: Key in ``cstruct`` holding the relations to check.
    :param str property2: Key in ``cstruct`` holding the relations the
        hierarchy starts from.
    :param str property2_list_name: Name of the attribute followed while
        building the hierarchy.
    :param concept_type: Type of thing that gets expanded while building the
        hierarchy.
    :param str error_message: Message of the errors that get added.
    """
    property1_list = []
    if property1 in cstruct:
        property1_list = [item['id'] for item in cstruct[property1]]

    property2_hierarchy = []
    if property2 in cstruct:
        property2_list = [item['id'] for item in cstruct[property2]]
        # Seed the hierarchy with a copy of the direct relations, so that the
        # list being walked is never the list being appended to.
        property2_hierarchy = list(property2_list)
        hierarchy_build(skos_manager, conceptscheme_id, property2_list, property2_hierarchy, concept_type,
                        property2_list_name)

    for concept_id in property1_list:
        if concept_id in property2_hierarchy:
            errors.append(colander.Invalid(
                node_location,
                error_message
            ))
439
440
441
def broader_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the broader concepts of a concept are not already narrower
    concepts of that concept.

    The narrower hierarchy is built by following ``narrower_concepts``.
    """
    message = 'The broader concept of a concept must not itself be a narrower concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='broader',
        property2='narrower',
        property2_list_name='narrower_concepts',
        concept_type='concept',
        error_message=message
    )
450
451
452
def narrower_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the narrower concepts of a concept are not already broader
    concepts of that concept.

    The broader hierarchy is built by following ``broader_concepts``.
    """
    message = 'The narrower concept of a concept must not itself be a broader concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='narrower',
        property2='broader',
        property2_list_name='broader_concepts',
        concept_type='concept',
        error_message=message
    )
461
462
463
def collection_members_unique_rule(errors, node_location, members):
    """
    Checks that a collection has no duplicate members.

    A single error is added, no matter how many duplicates there are.
    """
    member_count = len(members)
    distinct_count = len(set(members))
    if member_count <= distinct_count:
        return
    errors.append(colander.Invalid(
        node_location,
        'All members of a collection should be unique.'
    ))
472
473
474
def members_only_in_collection_rule(errors, node, concept_type, members):
    """
    Checks that only collections have members.

    An error is added when ``members`` is non-empty while ``concept_type`` is
    not ``'collection'``.
    """
    if concept_type == 'collection':
        return
    if len(members) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only collections can have members.'
    ))
483
484
485
def memberof_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the collections something is a member of are not themselves
    found among its own members.

    The members hierarchy is built by following ``members`` on collections.
    """
    message = 'The parent member_of collection of a concept must not itself be a member of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='member_of',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
490
491
492
def members_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that a collection does not have members that are in themselves
    already "parents" of that collection.

    The parent hierarchy is built by following ``member_of`` on collections.
    """
    message = 'The item of a members collection must not itself be a parent of the concept/collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='members',
        property2='member_of',
        property2_list_name='member_of',
        concept_type='collection',
        error_message=message
    )
501
502
503
def concept_matches_rule(errors, node_location, matches, concept_type):
    """
    Checks that only concepts have matches.

    An error is added when ``matches`` is non-empty while ``concept_type`` is
    not ``'concept'``.
    """
    if matches is None or len(matches) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have matches'
    ))
512
513
514
def concept_matches_unique_rule(errors, node_location, matches):
    """
    Checks that a concept has no duplicate matches.

    This means that a concept can only have one match (no matter what the type)
    with another concept. We don't allow eg. a concept that has both a broadMatch
    and a relatedMatch with the same concept.
    """
    if matches is None:
        return
    # Flatten the uris of all match types into a single list.
    all_uris = [uri for matchtype in matches for uri in matches[matchtype]]
    if len(all_uris) <= len(set(all_uris)):
        return
    errors.append(colander.Invalid(
        node_location,
        'All matches of a concept should be unique.'
    ))
531
532
533
def languagetag_validator(node, cstruct):
    """
    This validator validates a languagetag.

    The validator will check if a tag is a valid IANA language tag. If the
    validator is informed that this should be a new language tag, it will
    first check that the tag doesn't already exist.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The value being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule is broken.
    """
    languages_manager = node.bindings['request'].data_managers['languages_manager']
    is_new = node.bindings['new']
    errors = []
    language_tag = cstruct['id']

    if is_new:
        languagetag_checkduplicate(node['id'], language_tag, languages_manager, errors)
    languagetag_isvalid_rule(node['id'], language_tag, errors)

    if len(errors) == 0:
        return
    raise ValidationError(
        'Language could not be validated',
        [e.asdict() for e in errors]
    )
559
560
561
def languagetag_isvalid_rule(node, language_tag, errors):
    """
    Check that a languagetag is a valid IANA language tag.

    The error that gets added lists the messages of all problems found in
    the tag.
    """
    if tags.check(language_tag):
        return
    problems = [err.message for err in tags.tag(language_tag).errors]
    errors.append(colander.Invalid(
        node,
        'Invalid language tag: %s' % ", ".join(problems)
    ))
570
571
572
def languagetag_checkduplicate(node, language_tag, languages_manager, errors):
    """
    Check that a languagetag isn't duplicated.

    The tag counts as a duplicate when ``languages_manager.count_languages``
    returns a truthy value for it.
    """
    if not languages_manager.count_languages(language_tag):
        return
    message = 'Duplicate language tag: %s' % language_tag
    errors.append(colander.Invalid(node, message))
582
583
584
def subordinate_arrays_only_in_concept_rule(errors, node, concept_type, subordinate_arrays):
    """
    Checks that only a concept has subordinate arrays.

    An error is added when ``subordinate_arrays`` is non-empty while
    ``concept_type`` is not ``'concept'``.
    """
    if concept_type == 'concept':
        return
    if len(subordinate_arrays) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only concept can have subordinate arrays.'
    ))
593
594
595
def subordinate_arrays_type_rule(errors, node_location, skos_manager, conceptscheme_id, subordinate_arrays):
    """
    Checks that subordinate arrays are always collections.

    Every id in ``subordinate_arrays`` is looked up in the conceptscheme. An
    id that cannot be found is reported as a validation error instead of
    letting the lookup's ``NoResultFound`` escape.

    :param list errors: List the errors are appended to.
    :param node_location: Schema node the errors are attached to.
    :param skos_manager: Manager used to look up the subordinate arrays.
    :param conceptscheme_id: Id of the conceptscheme being validated against.
    :param subordinate_arrays: Ids of the subordinate arrays to check.
    """
    for subordinate_id in subordinate_arrays:
        try:
            subordinate = skos_manager.get_thing(subordinate_id, conceptscheme_id)
        except NoResultFound:
            errors.append(colander.Invalid(
                node_location,
                'Concept not found, check concept_id. Please be aware members should be within one scheme'
            ))
            continue
        if subordinate.type != 'collection':
            errors.append(colander.Invalid(
                node_location,
                'A subordinate array should always be a collection'
            ))
606
607
608
def subordinate_arrays_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the subordinate arrays of a concept are not themselves
    parents of that concept.

    The hierarchy starts from the ``member_of`` collections and is built by
    following ``members`` on collections.
    """
    # NOTE(review): the hierarchy is walked through 'members', starting from
    # the member_of collections - confirm that this is the intended direction
    # for finding the parents of the concept.
    message = 'The subordinate_array collection of a concept must not itself be a parent of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='subordinate_arrays',
        property2='member_of',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
617
618
619
def superordinates_only_in_concept_rule(errors, node, concept_type, superordinates):
    """
    Checks that only collections have superordinates.

    Despite the name of the rule, an error is added when ``superordinates``
    is non-empty while ``concept_type`` is not ``'collection'``.
    """
    if concept_type == 'collection':
        return
    if len(superordinates) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only collection can have superordinates.'
    ))
628
629
630
def superordinates_type_rule(errors, node_location, skos_manager, conceptscheme_id, superordinates):
    """
    Checks that superordinates are always concepts.

    Every id in ``superordinates`` is looked up in the conceptscheme. An id
    that cannot be found is reported as a validation error instead of letting
    the lookup's ``NoResultFound`` escape.

    :param list errors: List the errors are appended to.
    :param node_location: Schema node the errors are attached to.
    :param skos_manager: Manager used to look up the superordinates.
    :param conceptscheme_id: Id of the conceptscheme being validated against.
    :param superordinates: Ids of the superordinates to check.
    """
    for superordinate_id in superordinates:
        try:
            superordinate = skos_manager.get_thing(superordinate_id, conceptscheme_id)
        except NoResultFound:
            errors.append(colander.Invalid(
                node_location,
                'Concept not found, check concept_id. Please be aware members should be within one scheme'
            ))
            continue
        if superordinate.type != 'concept':
            errors.append(colander.Invalid(
                node_location,
                'A superordinate should always be a concept'
            ))
641
642
643
def superordinates_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the superordinate concepts of a collection are not themselves
    members of that collection.

    The members hierarchy is built by following ``members`` on collections.
    """
    message = 'The superordinates of a collection must not itself be a member of the collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='superordinates',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
652