Completed
Push — master ( e370ca...f86a7d )
by Bart
11s
created

note_html_preparer()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 2
c 0
b 0
f 0
cc 1
rs 10
1
# -*- coding: utf-8 -*-
2
"""
3
Module that validates incoming JSON.
4
"""
5
6
import copy
7
8
import bleach
9
import colander
10
from language_tags import tags
11
from skosprovider_sqlalchemy.models import (
12
    Language
13
)
14
from sqlalchemy.orm.exc import NoResultFound
15
16
from atramhasis.errors import ValidationError
17
18
19
class Label(colander.MappingSchema):
    """
    Schema for a single label: its text, its type and its language.

    None of the fields declares a ``missing`` default.
    """
    # Declaration order is kept as in the original schema.
    label = colander.SchemaNode(colander.String())
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
29
30
31
def note_html_preparer(value):
    """
    Strip all HTML from a note except ``strong``, ``em`` and ``a`` tags.

    :param value: The note value handed over by colander.
    :returns: The cleaned text, or `value` unchanged when bleach refuses it
        because it is not text.
    """
    try:
        return bleach.clean(value, tags=['strong', 'em', 'a'], strip=True)
    except TypeError:
        # Preparers also receive non-text values (e.g. ``colander.null`` for a
        # missing note). Hand those back untouched so the schema can report
        # the problem itself instead of crashing in the preparer.
        return value
33
34
35
class Note(colander.MappingSchema):
    """
    Schema for a single note: its text, its type and its language.

    The note text is run through :func:`note_html_preparer`.
    """
    note = colander.SchemaNode(colander.String(), preparer=note_html_preparer)
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
46
47
48
class Source(colander.MappingSchema):
    """
    Schema for a single source, described by its citation string.
    """
    citation = colander.SchemaNode(colander.String())
52
53
54
class Labels(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Label` mappings.
    """
    label = Label()
56
57
58
class Notes(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Note` mappings.
    """
    note = Note()
60
61
62
class Sources(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Source` mappings.
    """
    source = Source()
64
65
66
class RelatedConcept(colander.MappingSchema):
    """
    Schema for a reference to another concept or collection by integer id.
    """
    id = colander.SchemaNode(colander.Int())
70
71
72
class Concepts(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`RelatedConcept` references.
    """
    concept = RelatedConcept()
74
75
76
class MatchList(colander.SequenceSchema):
    """
    Schema for a sequence of match values; each item is a string that
    defaults to ``None`` when missing.
    """
    match = colander.SchemaNode(colander.String(), missing=None)
81
82
83
class Matches(colander.MappingSchema):
    """
    Schema for the matches of a concept, grouped per match type.

    Every match type is a :class:`MatchList` that defaults to an empty list.
    """
    broad = MatchList(missing=[])
    close = MatchList(missing=[])
    exact = MatchList(missing=[])
    narrow = MatchList(missing=[])
    related = MatchList(missing=[])
89
90
91
class Concept(colander.MappingSchema):
    """
    Schema for an incoming concept or collection.

    ``id`` defaults to ``None`` and ``type`` to ``'concept'``; every relation
    list defaults to an empty list and ``matches`` to an empty mapping.
    """
    # Identity.
    id = colander.SchemaNode(colander.Int(), missing=None)
    type = colander.SchemaNode(colander.String(), missing='concept')

    # Descriptive content.
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])

    # Relations to other concepts and collections.
    broader = Concepts(missing=[])
    narrower = Concepts(missing=[])
    related = Concepts(missing=[])
    members = Concepts(missing=[])
    member_of = Concepts(missing=[])
    subordinate_arrays = Concepts(missing=[])
    superordinates = Concepts(missing=[])

    # Matches, grouped per match type.
    matches = Matches(missing={})
111
112
113
class ConceptScheme(colander.MappingSchema):
    """
    Schema for an incoming conceptscheme: labels, notes and sources, each
    defaulting to an empty list.
    """
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
117
118
119
class LanguageTag(colander.MappingSchema):
    """
    Schema for a language: its tag (``id``) and its ``name``, both strings.
    """
    id = colander.SchemaNode(colander.String())
    name = colander.SchemaNode(colander.String())
126
127
128
def concept_schema_validator(node, cstruct):
    """
    Validate an incoming concept or collection.

    Every rule is run against the concept or collection and the broken rules
    are collected, so that they can all be reported in one go.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The concept or collection being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule is broken.
    """
    request = node.bindings['request']
    skos_manager = request.data_managers['skos_manager']
    languages_manager = request.data_managers['languages_manager']
    conceptscheme_id = node.bindings['conceptscheme_id']
    concept_type = cstruct['type']
    concept_id = cstruct['id']
    errors = []
    ids = {}     # relation name -> list of referenced ids
    exists = {}  # relation name -> outcome of semantic_relations_rule

    def collect_ids(key):
        # Pull the ids out of a list of ``{'id': ...}`` mappings.
        return [relation['id'] for relation in copy.deepcopy(cstruct[key])]

    def check_existence(key):
        return semantic_relations_rule(errors, node[key], skos_manager,
                                       conceptscheme_id, ids[key], concept_id)

    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    # Semantic relations: the targets must exist, and only concepts may have them.
    for key in ('related', 'narrower', 'broader'):
        if key not in cstruct:
            continue
        ids[key] = collect_ids(key)
        exists[key] = check_existence(key)
        concept_relations_rule(errors, node[key], ids[key], concept_type)

    # Membership relations: the targets must exist.
    for key in ('members', 'member_of'):
        if key not in cstruct:
            continue
        ids[key] = collect_ids(key)
        exists[key] = check_existence(key)

    # The rules below look the targets up again, so they only run once all
    # involved relations passed the existence check.
    if exists.get('related') and exists.get('narrower') and exists.get('broader'):
        concept_type_rule(errors, node['narrower'], skos_manager, conceptscheme_id, ids['narrower'])
        narrower_hierarchy_rule(errors, node['narrower'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['broader'], skos_manager, conceptscheme_id, ids['broader'])
        broader_hierarchy_rule(errors, node['broader'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['related'], skos_manager, conceptscheme_id, ids['related'])

    if exists.get('members') and exists.get('member_of'):
        members_only_in_collection_rule(errors, node['members'], concept_type, ids['members'])
        collection_members_unique_rule(errors, node['members'], ids['members'])
        collection_type_rule(errors, node['member_of'], skos_manager, conceptscheme_id, ids['member_of'])
        memberof_hierarchy_rule(errors, node['member_of'], skos_manager, conceptscheme_id, cstruct)
        members_hierarchy_rule(errors, node['members'], skos_manager, conceptscheme_id, cstruct)

    if 'matches' in cstruct:
        matches = copy.deepcopy(cstruct['matches'])
        concept_matches_rule(errors, node['matches'], matches, concept_type)
        concept_matches_unique_rule(errors, node['matches'], matches)

    if 'subordinate_arrays' in cstruct:
        subordinate_ids = collect_ids('subordinate_arrays')
        subordinate_arrays_only_in_concept_rule(errors, node['subordinate_arrays'], concept_type, subordinate_ids)
        subordinate_arrays_type_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id,
                                     subordinate_ids)
        subordinate_arrays_hierarchy_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id, cstruct)

    if 'superordinates' in cstruct:
        superordinate_ids = collect_ids('superordinates')
        superordinates_only_in_concept_rule(errors, node['superordinates'], concept_type, superordinate_ids)
        superordinates_type_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, superordinate_ids)
        superordinates_hierarchy_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, cstruct)

    if errors:
        raise ValidationError(
            'Concept could not be validated',
            [error.asdict() for error in errors]
        )
228
229
230
def conceptscheme_schema_validator(node, cstruct):
    """
    Validate the labels of an incoming conceptscheme.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The conceptscheme being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule is broken.
    """
    data_managers = node.bindings['request'].data_managers
    skos_manager = data_managers['skos_manager']
    languages_manager = data_managers['languages_manager']
    problems = []

    min_labels_rule(problems, node, cstruct)
    if 'labels' in cstruct:
        scheme_labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(problems, node, skos_manager, scheme_labels)
        label_lang_rule(problems, node, languages_manager, scheme_labels)
        max_preflabels_rule(problems, node, scheme_labels)

    if problems:
        raise ValidationError(
            'ConceptScheme could not be validated',
            [problem.asdict() for problem in problems]
        )
252
253
254
def concept_relations_rule(errors, node_location, relations, concept_type):
    """
    Check that only concepts have narrower, broader and related relations.

    :param list errors: Collected errors; a new one is appended when the rule
        is broken.
    :param node_location: Schema node the error is attached to.
    :param relations: Ids of the related items, or ``None``.
    :param concept_type: Type of the item being validated.
    """
    if relations is None or len(relations) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have narrower/broader/related relations'
    ))
263
264
265
def max_preflabels_rule(errors, node, labels):
    """
    Check that there is at most one prefLabel per language.

    One error is appended for every prefLabel whose language was already used
    by an earlier prefLabel.

    :param list errors: Collected errors.
    :param node: Schema node; the error is attached to its ``labels`` child.
    :param labels: Label mappings with ``type`` and ``language`` keys.
    """
    seen_languages = []
    for label in labels:
        if label['type'] != 'prefLabel':
            continue
        language = label['language']
        if language not in seen_languages:
            seen_languages.append(language)
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Only one prefLabel per language allowed.'
        ))
279
280
281
def min_labels_rule(errors, node, cstruct):
    """
    Check that a concept, collection or conceptscheme has at least one label.

    Nothing is checked when `cstruct` has no ``labels`` key at all.

    :param list errors: Collected errors.
    :param node: Schema node; the error is attached to its ``labels`` child.
    :param cstruct: The mapping being validated.
    """
    if 'labels' not in cstruct:
        return
    if len(cstruct['labels']) != 0:
        return
    errors.append(colander.Invalid(
        node['labels'],
        'At least one label is necessary'
    ))
292
293
294
def label_type_rule(errors, node, skos_manager, labels):
    """
    Check that every label has a known label type.

    The known types are the names of the label types returned by
    ``skos_manager.get_all_label_types()``. One error is appended per label
    with an unknown type.

    :param list errors: Collected errors.
    :param node: Schema node; the error is attached to its ``labels`` child.
    :param skos_manager: Manager providing ``get_all_label_types()``.
    :param labels: Label mappings with a ``type`` key.
    """
    known_types = [known.name for known in skos_manager.get_all_label_types()]
    for label in labels:
        if label['type'] in known_types:
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid labeltype.'
        ))
306
307
308
def label_lang_rule(errors, node, languages_manager, labels):
    """
    Check that the languages of the labels are valid.

    Each language must pass ``tags.check``. A valid tag for which
    ``languages_manager.count_languages`` reports nothing is saved as a new
    :class:`Language`, so this rule can write through the languages manager.

    :param list errors: Collected errors.
    :param node: Schema node; the error is attached to its ``labels`` child.
    :param languages_manager: Manager providing ``count_languages`` and ``save``.
    :param labels: Label mappings with a ``language`` key.
    """
    for label in labels:
        language_tag = label['language']
        if tags.check(language_tag):
            if not languages_manager.count_languages(language_tag):
                # Unknown but valid tag: register it, named after its descriptions.
                language_name = ', '.join(tags.description(language_tag))
                languages_manager.save(Language(id=language_tag, name=language_name))
            continue
        reasons = [err.message for err in tags.tag(language_tag).errors]
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid language tag: %s' % ", ".join(reasons)
        ))
328
329
330
def concept_type_rule(errors, node_location, skos_manager, conceptscheme_id, items):
    """
    Check that the targets of narrower, broader and related are concepts and
    not collections.

    Each id is looked up with ``skos_manager.get_thing``; one error is
    appended per target whose type is not ``'concept'``.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param skos_manager: Manager providing ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the targets are looked up in.
    :param items: Ids of the targets.
    """
    for target_id in items:
        target = skos_manager.get_thing(target_id, conceptscheme_id)
        if target.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A narrower, broader or related concept should always be a concept, not a collection'
        ))
342
343
344
def collection_type_rule(errors, node_location, skos_manager, conceptscheme_id, members):
    """
    Check that the targets of member_of are collections and not concepts.

    Each id is looked up with ``skos_manager.get_thing``; one error is
    appended per target whose type is not ``'collection'``.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param skos_manager: Manager providing ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the targets are looked up in.
    :param members: Ids of the member_of targets.
    """
    for parent_id in members:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A member_of parent should always be a collection'
        ))
355
356
357
def semantic_relations_rule(errors, node_location, skos_manager, conceptscheme_id, members, collection_id):
    """
    Check that the elements in a group of concepts or collections are not the
    group itself, that they actually exist and that they are within the same
    conceptscheme.

    Checking stops at the first offending element.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param skos_manager: Manager providing ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the elements are looked up in.
    :param members: Ids of the elements to check.
    :param collection_id: Id of the concept or collection being validated.
    :returns: ``True`` when every element passed, ``False`` otherwise.
    :rtype: bool
    """
    for element_id in members:
        problem = None
        if element_id == collection_id:
            problem = 'A concept or collection cannot be related to itself'
        else:
            try:
                skos_manager.get_thing(element_id, conceptscheme_id)
            except NoResultFound:
                problem = 'Concept not found, check concept_id. Please be aware members should be within one scheme'
        if problem is not None:
            errors.append(colander.Invalid(node_location, problem))
            return False
    return True
379
380
381
def hierarchy_build(skos_manager, conceptscheme_id, property_list, property_hierarchy, property_concept_type,
                    property_list_name):
    """
    Collect the ids reachable from `property_list` through one relation.

    Starting from every id in `property_list`, the item is looked up and, when
    its type matches, the ids found under its `property_list_name` attribute
    are appended to `property_hierarchy` and followed in turn. Every item is
    expanded at most once, so cycles in the stored data cannot cause endless
    recursion. The order of `property_hierarchy` is not significant; it is
    only meant for membership tests.

    :param skos_manager: Manager providing ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the items are looked up in.
    :param property_list: Ids to start from. It may be the same list object as
        `property_hierarchy`.
    :param list property_hierarchy: Output list the reachable ids are appended to.
    :param property_concept_type: Only items of this type are expanded;
        ``None`` expands every item.
    :param property_list_name: Name of the attribute holding the related
        items, each of which exposes a ``concept_id``.
    """
    # Work on a snapshot: callers may hand in the output list as start list.
    pending = list(property_list)
    expanded = set()
    while pending:
        current_id = pending.pop()
        if current_id in expanded:
            continue
        expanded.add(current_id)
        try:
            current = skos_manager.get_thing(current_id, conceptscheme_id)
        except NoResultFound:
            # Unknown ids are reported by other rules; here they are a dead end.
            current = None
        if current is None:
            continue
        if property_concept_type is not None and current.type != property_concept_type:
            continue
        linked_ids = [linked.concept_id for linked in getattr(current, property_list_name)]
        property_hierarchy.extend(linked_ids)
        pending.extend(linked_ids)
395
396
397
def hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct, property1, property2,
                   property2_list_name, concept_type, error_message):
    """
    Check that the items of `property1` are not already in the hierarchy of
    `property2`.

    The hierarchy consists of the ids listed under `property2` plus whatever
    :func:`hierarchy_build` collects from them. One error is appended for
    every `property1` id found in that hierarchy.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param skos_manager: Manager handed to :func:`hierarchy_build`.
    :param conceptscheme_id: Id of the conceptscheme being edited.
    :param cstruct: The concept or collection being validated.
    :param property1: Key in `cstruct` of the ids to check.
    :param property2: Key in `cstruct` of the ids the hierarchy starts from.
    :param property2_list_name: Attribute name followed to build the hierarchy.
    :param concept_type: Type of the items that are expanded.
    :param error_message: Message of the appended errors.
    """
    candidate_ids = []
    if property1 in cstruct:
        candidate_ids = [item['id'] for item in cstruct[property1]]

    forbidden_ids = []
    if property2 in cstruct:
        start_ids = [item['id'] for item in cstruct[property2]]
        # Separate copy: hierarchy_build appends to the list it fills, which
        # must not be the list of start ids it walks over.
        forbidden_ids = list(start_ids)
        hierarchy_build(skos_manager, conceptscheme_id, start_ids, forbidden_ids, concept_type,
                        property2_list_name)

    for candidate_id in candidate_ids:
        if candidate_id in forbidden_ids:
            errors.append(colander.Invalid(
                node_location,
                error_message
            ))
420
421
422
def broader_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Check that the broader concepts of a concept are not already narrower
    concepts of that concept.

    Delegates to :func:`hierarchy_rule`, checking ``broader`` against the
    ``narrower`` hierarchy.
    """
    message = 'The broader concept of a concept must not itself be a narrower concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='broader',
        property2='narrower',
        property2_list_name='narrower_concepts',
        concept_type='concept',
        error_message=message
    )
431
432
433
def narrower_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Check that the narrower concepts of a concept are not already broader
    concepts of that concept.

    Delegates to :func:`hierarchy_rule`, checking ``narrower`` against the
    ``broader`` hierarchy.
    """
    message = 'The narrower concept of a concept must not itself be a broader concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='narrower',
        property2='broader',
        property2_list_name='broader_concepts',
        concept_type='concept',
        error_message=message
    )
442
443
444
def collection_members_unique_rule(errors, node_location, members):
    """
    Check that a collection has no duplicate members.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param members: Ids of the members.
    """
    distinct_members = set(members)
    if len(distinct_members) == len(members):
        return
    errors.append(colander.Invalid(
        node_location,
        'All members of a collection should be unique.'
    ))
453
454
455
def members_only_in_collection_rule(errors, node, concept_type, members):
    """
    Check that only collections have members.

    :param list errors: Collected errors.
    :param node: Schema node the error is attached to.
    :param concept_type: Type of the item being validated.
    :param members: Ids of the members.
    """
    if concept_type == 'collection' or len(members) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only collections can have members.'
    ))
464
465
466
def memberof_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Check that the member_of parents of an item are not themselves members
    of that item.

    Delegates to :func:`hierarchy_rule`, checking ``member_of`` against the
    ``members`` hierarchy.
    """
    message = 'The parent member_of collection of a concept must not itself be a member of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='member_of',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
471
472
473
def members_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Check that a collection does not have members that are in themselves
    already "parents" of that collection.

    Delegates to :func:`hierarchy_rule`, checking ``members`` against the
    ``member_of`` hierarchy.
    """
    message = 'The item of a members collection must not itself be a parent of the concept/collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='members',
        property2='member_of',
        property2_list_name='member_of',
        concept_type='collection',
        error_message=message
    )
482
483
484
def concept_matches_rule(errors, node_location, matches, concept_type):
    """
    Check that only concepts have matches.

    `matches` maps each match type to a list of matches. The item only counts
    as having matches when at least one of those lists is non-empty; counting
    the keys of the mapping would reject an item whose match types are all
    empty.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param matches: Mapping of match type to list of matches, or ``None``.
    :param concept_type: Type of the item being validated.
    """
    if matches is None or concept_type == 'concept':
        return
    if any(matches.values()):
        errors.append(colander.Invalid(
            node_location,
            'Only concepts can have matches'
        ))
493
494
495
def concept_matches_unique_rule(errors, node_location, matches):
    """
    Check that a concept has no duplicate matches.

    A concept can only have one match (no matter what the type) with another
    concept. E.g. a concept that has both a broadMatch and a relatedMatch with
    the same concept is not allowed.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param matches: Mapping of match type to list of matches, or ``None``.
    """
    if matches is None:
        return
    all_uris = [uri for matchtype in matches for uri in matches[matchtype]]
    if len(set(all_uris)) < len(all_uris):
        errors.append(colander.Invalid(
            node_location,
            'All matches of a concept should be unique.'
        ))
512
513
514
def languagetag_validator(node, cstruct):
    """
    Validate a languagetag.

    The tag is checked with :func:`languagetag_isvalid_rule`. If the ``new``
    binding is set, :func:`languagetag_checkduplicate` is run first to check
    that the tag doesn't already exist.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The value being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule is broken.
    """
    request = node.bindings['request']
    languages_manager = request.data_managers['languages_manager']
    is_new = node.bindings['new']
    problems = []
    language_tag = cstruct['id']

    if is_new:
        languagetag_checkduplicate(node['id'], language_tag, languages_manager, problems)
    languagetag_isvalid_rule(node['id'], language_tag, problems)

    if problems:
        raise ValidationError(
            'Language could not be validated',
            [problem.asdict() for problem in problems]
        )
540
541
542
def languagetag_isvalid_rule(node, language_tag, errors):
    """
    Check that a languagetag passes ``tags.check``.

    :param node: Schema node the error is attached to.
    :param language_tag: The tag to check.
    :param list errors: Collected errors; the appended error lists the
        messages reported for the tag.
    """
    if tags.check(language_tag):
        return
    reasons = [err.message for err in tags.tag(language_tag).errors]
    errors.append(colander.Invalid(
        node,
        'Invalid language tag: %s' % ", ".join(reasons)
    ))
551
552
553
def languagetag_checkduplicate(node, language_tag, languages_manager, errors):
    """
    Check that a languagetag isn't duplicated.

    :param node: Schema node the error is attached to.
    :param language_tag: The tag to check.
    :param languages_manager: Manager providing ``count_languages``.
    :param list errors: Collected errors.
    """
    if not languages_manager.count_languages(language_tag):
        return
    errors.append(colander.Invalid(
        node,
        'Duplicate language tag: %s' % language_tag
    ))
563
564
565
def subordinate_arrays_only_in_concept_rule(errors, node, concept_type, subordinate_arrays):
    """
    Check that only a concept has subordinate arrays.

    :param list errors: Collected errors.
    :param node: Schema node the error is attached to.
    :param concept_type: Type of the item being validated.
    :param subordinate_arrays: Ids of the subordinate arrays.
    """
    if concept_type == 'concept' or len(subordinate_arrays) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only concept can have subordinate arrays.'
    ))
574
575
576
def subordinate_arrays_type_rule(errors, node_location, skos_manager, conceptscheme_id, subordinate_arrays):
    """
    Check that subordinate arrays are always collections.

    Each id is looked up with ``skos_manager.get_thing``; one error is
    appended per item whose type is not ``'collection'``.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param skos_manager: Manager providing ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the items are looked up in.
    :param subordinate_arrays: Ids of the subordinate arrays.
    """
    for array_id in subordinate_arrays:
        array = skos_manager.get_thing(array_id, conceptscheme_id)
        if array.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A subordinate array should always be a collection'
        ))
587
588
589
def subordinate_arrays_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Check that the subordinate arrays of a concept are not themselves
    parents of that concept.

    Delegates to :func:`hierarchy_rule`, checking ``subordinate_arrays``
    against the hierarchy built from ``member_of``.
    """
    # NOTE(review): the hierarchy is started from ``member_of`` but followed
    # through the ``members`` attribute, whereas members_hierarchy_rule follows
    # ``member_of``. Confirm this is the intended direction.
    message = 'The subordinate_array collection of a concept must not itself be a parent of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='subordinate_arrays',
        property2='member_of',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
598
599
600
def superordinates_only_in_concept_rule(errors, node, concept_type, superordinates):
    """
    Check that only collections have superordinates.

    :param list errors: Collected errors.
    :param node: Schema node the error is attached to.
    :param concept_type: Type of the item being validated.
    :param superordinates: Ids of the superordinates.
    """
    if concept_type == 'collection' or len(superordinates) == 0:
        return
    errors.append(colander.Invalid(
        node,
        'Only collection can have superordinates.'
    ))
609
610
611
def superordinates_type_rule(errors, node_location, skos_manager, conceptscheme_id, superordinates):
    """
    Check that superordinates are always concepts.

    Each id is looked up with ``skos_manager.get_thing``; one error is
    appended per item whose type is not ``'concept'``.

    :param list errors: Collected errors.
    :param node_location: Schema node the error is attached to.
    :param skos_manager: Manager providing ``get_thing``.
    :param conceptscheme_id: Id of the conceptscheme the items are looked up in.
    :param superordinates: Ids of the superordinates.
    """
    for parent_id in superordinates:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A superordinate should always be a concept'
        ))
622
623
624
def superordinates_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Check that the superordinate concepts of a collection are not themselves
    members of that collection.

    Delegates to :func:`hierarchy_rule`, checking ``superordinates`` against
    the ``members`` hierarchy.
    """
    message = 'The superordinates of a collection must not itself be a member of the collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='superordinates',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
633