Completed
Push — master ( 667158...a0e616 )
by Bart
11s
created

html_preparer()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 2
c 0
b 0
f 0
cc 1
rs 10
1
# -*- coding: utf-8 -*-
2
"""
3
Module that validates incoming JSON.
4
"""
5
6
import copy
7
8
import bleach
9
import colander
10
from language_tags import tags
11
from skosprovider_sqlalchemy.models import (
12
    Language
13
)
14
from sqlalchemy.orm.exc import NoResultFound
15
16
from atramhasis.errors import ValidationError
17
18
19
class Label(colander.MappingSchema):
    """
    Schema for a single label: its text, its type and its language.
    """
    # All three fields are plain strings; none declares a ``missing`` value.
    label = colander.SchemaNode(colander.String())
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
29
30
31
def html_preparer(value):
    """
    Strip all HTML from a value, except ``strong``, ``em`` and ``a`` tags.

    This function is used as a colander ``preparer``. A preparer can be handed
    a value that is not text (for instance the marker colander uses for an
    absent field), which ``bleach.clean`` refuses with a ``TypeError``. Such
    a value is returned unchanged so that the schema itself can report the
    problem instead of the request crashing here.

    :param value: The deserialized value of the node.
    :returns: The cleaned text, or `value` itself when it is not text.
    """
    try:
        return bleach.clean(value, tags=['strong', 'em', 'a'], strip=True)
    except TypeError:
        # Not a string: leave it alone, later validation deals with it.
        return value
33
34
35
class Note(colander.MappingSchema):
    """
    Schema for a single note: its text, its type and its language.
    """
    # The note text is run through html_preparer before further processing.
    note = colander.SchemaNode(colander.String(), preparer=html_preparer)
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
46
47
48
class Source(colander.MappingSchema):
    """
    Schema for a single source, described by its citation.
    """
    # The citation is run through html_preparer before further processing.
    citation = colander.SchemaNode(colander.String(), preparer=html_preparer)
53
54
55
class Labels(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Label` items.
    """
    label = Label()
57
58
59
class Notes(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Note` items.
    """
    note = Note()
61
62
63
class Sources(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Source` items.
    """
    source = Source()
65
66
67
class RelatedConcept(colander.MappingSchema):
    """
    Schema for a reference to another concept or collection, by integer id.
    """
    id = colander.SchemaNode(colander.Int())
71
72
73
class Concepts(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`RelatedConcept` references.
    """
    concept = RelatedConcept()
75
76
77
class MatchList(colander.SequenceSchema):
    """
    Schema for a sequence of matches, each given as a string.
    """
    # An absent entry deserializes to None.
    match = colander.SchemaNode(colander.String(), missing=None)
82
83
84
class Matches(colander.MappingSchema):
    """
    Schema for the matches of a concept, grouped per match type.
    """
    # Every match type falls back to an empty list when it is left out.
    broad = MatchList(missing=[])
    close = MatchList(missing=[])
    exact = MatchList(missing=[])
    narrow = MatchList(missing=[])
    related = MatchList(missing=[])
90
91
92
class Concept(colander.MappingSchema):
    """
    Schema for an incoming concept or collection.
    """
    # Identification: the id may be absent, the type falls back to 'concept'.
    id = colander.SchemaNode(colander.Int(), missing=None)
    type = colander.SchemaNode(colander.String(), missing='concept')

    # Descriptive properties.
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])

    # References to other concepts or collections.
    broader = Concepts(missing=[])
    narrower = Concepts(missing=[])
    related = Concepts(missing=[])
    members = Concepts(missing=[])
    member_of = Concepts(missing=[])
    subordinate_arrays = Concepts(missing=[])
    superordinates = Concepts(missing=[])

    # Matches, grouped per match type.
    matches = Matches(missing={})
112
113
114
class ConceptScheme(colander.MappingSchema):
    """
    Schema for an incoming conceptscheme: its labels, notes and sources.
    """
    # Each property falls back to an empty list when it is left out.
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
118
119
120
class LanguageTag(colander.MappingSchema):
    """
    Schema for a language: its tag (``id``) and its name.
    """
    id = colander.SchemaNode(colander.String())
    name = colander.SchemaNode(colander.String())
127
128
129
def concept_schema_validator(node, cstruct):
    """
    Validate an incoming concept or collection.

    All rules are run against the concept or collection and every broken rule
    is collected, so that they can be reported together.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The concept or collection being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule was broken.
    """
    request = node.bindings['request']
    skos_manager = request.data_managers['skos_manager']
    languages_manager = request.data_managers['languages_manager']
    conceptscheme_id = node.bindings['conceptscheme_id']
    concept_type = cstruct['type']
    own_id = cstruct['id']
    errors = []

    def ids_of(key):
        # The ids referenced under `key`, read from a copy of the input.
        return [entry['id'] for entry in copy.deepcopy(cstruct[key])]

    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    # First pass: every referenced id has to exist, live in this
    # conceptscheme and differ from the thing being validated.
    semantic_keys = ('related', 'narrower', 'broader')
    targets = {}
    validated = {}
    for key in semantic_keys + ('members', 'member_of'):
        targets[key] = None
        validated[key] = False
        if key not in cstruct:
            continue
        targets[key] = ids_of(key)
        validated[key] = semantic_relations_rule(errors, node[key], skos_manager,
                                                 conceptscheme_id, targets[key], own_id)
        if key in semantic_keys:
            concept_relations_rule(errors, node[key], targets[key], concept_type)

    # The rules below look the referenced things up again, so they only run
    # when the first pass succeeded for the whole group.
    if validated['related'] and validated['narrower'] and validated['broader']:
        concept_type_rule(errors, node['narrower'], skos_manager, conceptscheme_id, targets['narrower'])
        narrower_hierarchy_rule(errors, node['narrower'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['broader'], skos_manager, conceptscheme_id, targets['broader'])
        broader_hierarchy_rule(errors, node['broader'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['related'], skos_manager, conceptscheme_id, targets['related'])

    if validated['members'] and validated['member_of']:
        members_only_in_collection_rule(errors, node['members'], concept_type, targets['members'])
        collection_members_unique_rule(errors, node['members'], targets['members'])
        collection_type_rule(errors, node['member_of'], skos_manager, conceptscheme_id, targets['member_of'])
        memberof_hierarchy_rule(errors, node['member_of'], skos_manager, conceptscheme_id, cstruct)
        members_hierarchy_rule(errors, node['members'], skos_manager, conceptscheme_id, cstruct)

    if 'matches' in cstruct:
        matches = copy.deepcopy(cstruct['matches'])
        concept_matches_rule(errors, node['matches'], matches, concept_type)
        concept_matches_unique_rule(errors, node['matches'], matches)

    if 'subordinate_arrays' in cstruct:
        subordinate_arrays = ids_of('subordinate_arrays')
        subordinate_arrays_only_in_concept_rule(errors, node['subordinate_arrays'], concept_type,
                                                subordinate_arrays)
        subordinate_arrays_type_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id,
                                     subordinate_arrays)
        subordinate_arrays_hierarchy_rule(errors, node['subordinate_arrays'], skos_manager, conceptscheme_id,
                                          cstruct)

    if 'superordinates' in cstruct:
        superordinates = ids_of('superordinates')
        superordinates_only_in_concept_rule(errors, node['superordinates'], concept_type, superordinates)
        superordinates_type_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, superordinates)
        superordinates_hierarchy_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, cstruct)

    if errors:
        raise ValidationError(
            'Concept could not be validated',
            [e.asdict() for e in errors]
        )
229
230
231
def conceptscheme_schema_validator(node, cstruct):
    """
    Validate the labels of an incoming conceptscheme.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The conceptscheme being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule was broken.
    """
    data_managers = node.bindings['request'].data_managers
    skos_manager = data_managers['skos_manager']
    languages_manager = data_managers['languages_manager']

    errors = []
    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        # The label rules work on a copy of the incoming labels.
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    if errors:
        raise ValidationError(
            'ConceptScheme could not be validated',
            [e.asdict() for e in errors]
        )
253
254
255
def concept_relations_rule(errors, node_location, relations, concept_type):
    """
    Checks that only concepts have narrower, broader and related relations.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node_location: The schema node the error is reported on.
    :param relations: Ids of the relations, or `None`.
    :param concept_type: The type of the thing being validated.
    """
    if relations is None:
        return
    if len(relations) > 0 and concept_type != 'concept':
        message = 'Only concepts can have narrower/broader/related relations'
        errors.append(colander.Invalid(node_location, message))
264
265
266
def max_preflabels_rule(errors, node, labels):
    """
    Checks that there's only one prefLabel for a certain language.

    An error is appended for every prefLabel whose language was already used
    by an earlier prefLabel.

    :param list errors: Collected errors.
    :param node: The schema node; errors are reported on ``node['labels']``.
    :param labels: The labels to check.
    """
    seen_languages = []
    for label in labels:
        if label['type'] != 'prefLabel':
            continue
        language = label['language']
        if language not in seen_languages:
            seen_languages.append(language)
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Only one prefLabel per language allowed.'
        ))
280
281
282
def min_labels_rule(errors, node, cstruct):
    """
    Checks that a concept or collection always has at least one label.

    Nothing is checked when `cstruct` has no ``labels`` key at all.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node: The schema node; the error is reported on ``node['labels']``.
    :param cstruct: The concept, collection or conceptscheme being validated.
    """
    if 'labels' not in cstruct:
        return
    # Only the length is needed, so the labels are read in place.
    if len(cstruct['labels']) == 0:
        errors.append(colander.Invalid(
            node['labels'],
            'At least one label is necessary'
        ))
293
294
295
def label_type_rule(errors, node, skos_manager, labels):
    """
    Checks that a label has the correct type.

    The allowed types are the names of the label types returned by
    ``skos_manager.get_all_label_types()``.

    :param list errors: Collected errors; one is appended per invalid label.
    :param node: The schema node; errors are reported on ``node['labels']``.
    :param skos_manager: Manager that provides the known label types.
    :param labels: The labels to check.
    """
    known_types = [known.name for known in skos_manager.get_all_label_types()]
    for label in labels:
        if label['type'] in known_types:
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid labeltype.'
        ))
307
308
309
def label_lang_rule(errors, node, languages_manager, labels):
    """
    Checks that languages of a label are valid.

    Every language has to be a valid IANA language tag. A valid tag that the
    languages manager does not count yet is saved as a new language.

    :param list errors: Collected errors; one is appended per invalid tag.
    :param node: The schema node; errors are reported on ``node['labels']``.
    :param languages_manager: Manager used to count and save languages.
    :param labels: The labels to check.
    """
    for label in labels:
        language_tag = label['language']
        if tags.check(language_tag):
            if not languages_manager.count_languages(language_tag):
                # Unknown but valid tag: register it, named by its descriptions.
                descriptions = ', '.join(tags.description(language_tag))
                languages_manager.save(Language(id=language_tag, name=descriptions))
            continue
        labels_node = node['labels']
        reasons = ", ".join([err.message for err in tags.tag(language_tag).errors])
        errors.append(colander.Invalid(labels_node, 'Invalid language tag: %s' % reasons))
329
330
331
def concept_type_rule(errors, node_location, skos_manager, conceptscheme_id, items):
    """
    Checks that the targets of narrower, broader and related are concepts and
    not collections.

    :param list errors: Collected errors; one is appended per offending target.
    :param node_location: The schema node the errors are reported on.
    :param skos_manager: Manager used to look the targets up.
    :param conceptscheme_id: Id of the conceptscheme the targets are looked up in.
    :param items: Ids of the targets.
    """
    message = 'A narrower, broader or related concept should always be a concept, not a collection'
    for target_id in items:
        target = skos_manager.get_thing(target_id, conceptscheme_id)
        if target.type == 'concept':
            continue
        errors.append(colander.Invalid(node_location, message))
343
344
345
def collection_type_rule(errors, node_location, skos_manager, conceptscheme_id, members):
    """
    Checks that the targets of member_of are collections and not concepts.

    :param list errors: Collected errors; one is appended per offending target.
    :param node_location: The schema node the errors are reported on.
    :param skos_manager: Manager used to look the targets up.
    :param conceptscheme_id: Id of the conceptscheme the targets are looked up in.
    :param members: Ids of the member_of targets.
    """
    message = 'A member_of parent should always be a collection'
    for parent_id in members:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'collection':
            continue
        errors.append(colander.Invalid(node_location, message))
356
357
358
def semantic_relations_rule(errors, node_location, skos_manager, conceptscheme_id, members, collection_id):
    """
    Checks that the elements in a group of concepts or collections are not
    the group itself, that they actually exist and are within the same
    conceptscheme.

    Checking stops at the first element that breaks the rule.

    :param list errors: Collected errors; at most one is appended.
    :param node_location: The schema node the error is reported on.
    :param skos_manager: Manager used to look the elements up.
    :param conceptscheme_id: Id of the conceptscheme the elements are looked up in.
    :param members: Ids of the elements in the group.
    :param collection_id: Id of the concept or collection being validated.
    :returns: `True` when every element passed, `False` otherwise.
    """
    for element_id in members:
        problem = None
        if element_id == collection_id:
            problem = 'A concept or collection cannot be related to itself'
        else:
            try:
                skos_manager.get_thing(element_id, conceptscheme_id)
            except NoResultFound:
                problem = 'Concept not found, check concept_id. Please be aware members should be within one scheme'
        if problem is not None:
            errors.append(colander.Invalid(node_location, problem))
            return False
    return True
380
381
382
def hierarchy_build(skos_manager, conceptscheme_id, property_list, property_hierarchy, property_concept_type,
                    property_list_name):
    """
    Collect the ids that can be reached from a list of concepts or collections.

    Starting from the ids in `property_list`, the attribute named
    `property_list_name` is followed transitively and every id found is
    appended to `property_hierarchy`. Each thing is expanded only once, so
    stored data that contains a cycle cannot cause endless work.

    :param skos_manager: Manager used to look things up.
    :param conceptscheme_id: Id of the conceptscheme things are looked up in.
    :param list property_list: Ids to start from. This may be the same list
        object as `property_hierarchy`.
    :param list property_hierarchy: List the ids found are appended to.
    :param property_concept_type: Only things of this type are followed;
        `None` follows every type.
    :param str property_list_name: Name of the attribute holding the related
        things. Each of them must have a ``concept_id`` attribute.
    """
    # Work on a snapshot: the caller may pass one list for both arguments.
    pending = list(property_list)
    expanded = set()
    while pending:
        property_concept_id = pending.pop()
        if property_concept_id in expanded:
            continue
        expanded.add(property_concept_id)
        try:
            property_concept = skos_manager.get_thing(property_concept_id, conceptscheme_id)
        except NoResultFound:
            # Ids that cannot be found are simply not followed.
            continue
        if property_concept is None:
            continue
        if property_concept_type is not None and property_concept.type != property_concept_type:
            continue
        for related in getattr(property_concept, property_list_name):
            property_hierarchy.append(related.concept_id)
            pending.append(related.concept_id)
396
397
398
def hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct, property1, property2,
                   property2_list_name, concept_type, error_message):
    """
    Checks that the property1 of a concept are not already in property2 hierarchy

    The property2 hierarchy consists of the ids listed under `property2` in
    `cstruct`, extended by ``hierarchy_build`` with everything reachable
    through the attribute `property2_list_name`.

    :param list errors: Collected errors; one is appended per offending id.
    :param node_location: The schema node the errors are reported on.
    :param skos_manager: Manager used to look things up.
    :param conceptscheme_id: Id of the conceptscheme things are looked up in.
    :param cstruct: The concept or collection being validated.
    :param str property1: Key in `cstruct` of the ids that are checked.
    :param str property2: Key in `cstruct` of the ids the hierarchy starts from.
    :param str property2_list_name: Attribute followed to build the hierarchy.
    :param concept_type: Type of the things that are followed.
    :param str error_message: Message of the errors that are appended.
    """
    property1_ids = []
    if property1 in cstruct:
        property1_ids = [item['id'] for item in cstruct[property1]]

    property2_hierarchy = []
    if property2 in cstruct:
        property2_ids = [item['id'] for item in cstruct[property2]]
        # A separate list, so the ids being walked are not the list that
        # grows while the hierarchy is collected.
        property2_hierarchy = list(property2_ids)
        hierarchy_build(skos_manager, conceptscheme_id, property2_ids, property2_hierarchy, concept_type,
                        property2_list_name)

    for property1_id in property1_ids:
        if property1_id in property2_hierarchy:
            errors.append(colander.Invalid(
                node_location,
                error_message
            ))
421
422
423
def broader_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the broader concepts of a concept are not already narrower
    concepts of that concept.

    Delegates to ``hierarchy_rule``, comparing ``broader`` with the hierarchy
    built from ``narrower`` through ``narrower_concepts``.
    """
    message = 'The broader concept of a concept must not itself be a narrower concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='broader',
        property2='narrower',
        property2_list_name='narrower_concepts',
        concept_type='concept',
        error_message=message
    )
432
433
434
def narrower_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the narrower concepts of a concept are not already broader
    concepts of that concept.

    Delegates to ``hierarchy_rule``, comparing ``narrower`` with the hierarchy
    built from ``broader`` through ``broader_concepts``.
    """
    message = 'The narrower concept of a concept must not itself be a broader concept of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='narrower',
        property2='broader',
        property2_list_name='broader_concepts',
        concept_type='concept',
        error_message=message
    )
443
444
445
def collection_members_unique_rule(errors, node_location, members):
    """
    Checks that a collection has no duplicate members.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node_location: The schema node the error is reported on.
    :param members: Ids of the members.
    """
    distinct_members = set(members)
    if len(distinct_members) < len(members):
        message = 'All members of a collection should be unique.'
        errors.append(colander.Invalid(node_location, message))
454
455
456
def members_only_in_collection_rule(errors, node, concept_type, members):
    """
    Checks that only collections have members.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node: The schema node the error is reported on.
    :param concept_type: The type of the thing being validated.
    :param members: Ids of the members.
    """
    if concept_type == 'collection':
        return
    if len(members) > 0:
        errors.append(colander.Invalid(node, 'Only collections can have members.'))
465
466
467
def memberof_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the member_of parents of a concept or collection are not
    already among its members.

    Delegates to ``hierarchy_rule``, comparing ``member_of`` with the
    hierarchy built from ``members`` through ``members``.
    """
    message = 'The parent member_of collection of a concept must not itself be a member of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='member_of',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
472
473
474
def members_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that a collection does not have members that are in themselves
    already "parents" of that collection.

    Delegates to ``hierarchy_rule``, comparing ``members`` with the hierarchy
    built from ``member_of`` through ``member_of``.
    """
    message = 'The item of a members collection must not itself be a parent of the concept/collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='members',
        property2='member_of',
        property2_list_name='member_of',
        concept_type='collection',
        error_message=message
    )
483
484
485
def concept_matches_rule(errors, node_location, matches, concept_type):
    """
    Checks that only concepts have matches.

    `matches` maps a match type to a list of matches. The lists themselves
    are inspected: a mapping that only holds empty lists contains no matches
    and is therefore accepted for any type.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node_location: The schema node the error is reported on.
    :param matches: Mapping of match type to list of matches, or `None`.
    :param concept_type: The type of the thing being validated.
    """
    if matches is None or concept_type == 'concept':
        return
    # Counting the keys is not enough: the match types can all be present
    # while every one of their lists is empty.
    has_matches = any(len(matches[matchtype]) > 0 for matchtype in matches)
    if has_matches:
        errors.append(colander.Invalid(
            node_location,
            'Only concepts can have matches'
        ))
494
495
496
def concept_matches_unique_rule(errors, node_location, matches):
    """
    Checks that a concept has no duplicate matches.

    A concept can only have one match with another concept, no matter what
    the type of the match is. A broadMatch and a relatedMatch with the same
    concept are, for example, not allowed together.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node_location: The schema node the error is reported on.
    :param matches: Mapping of match type to list of matches, or `None`.
    """
    if matches is None:
        return
    # Flatten the lists of all match types into one list.
    all_uris = []
    for matchtype in matches:
        all_uris.extend(matches[matchtype])
    if len(set(all_uris)) < len(all_uris):
        message = 'All matches of a concept should be unique.'
        errors.append(colander.Invalid(node_location, message))
513
514
515
def languagetag_validator(node, cstruct):
    """
    This validator validates a languagetag.

    The validator checks that the tag is a valid IANA language tag. If the
    validator is told (binding ``new``) that this should be a new language
    tag, it also checks that the tag doesn't already exist.

    :param colander.SchemaNode node: The schema that's being used while validating.
    :param cstruct: The value being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule was broken.
    """
    bindings = node.bindings
    languages_manager = bindings['request'].data_managers['languages_manager']
    is_new = bindings['new']
    language_tag = cstruct['id']
    id_node = node['id']

    errors = []
    if is_new:
        languagetag_checkduplicate(id_node, language_tag, languages_manager, errors)
    languagetag_isvalid_rule(id_node, language_tag, errors)

    if errors:
        raise ValidationError(
            'Language could not be validated',
            [e.asdict() for e in errors]
        )
541
542
543
def languagetag_isvalid_rule(node, language_tag, errors):
    """
    Check that a languagetag is a valid IANA language tag.

    :param node: The schema node the error is reported on.
    :param language_tag: The tag to check.
    :param list errors: Collected errors; one is appended for an invalid tag.
    """
    if tags.check(language_tag):
        return
    reasons = ", ".join([err.message for err in tags.tag(language_tag).errors])
    errors.append(colander.Invalid(node, 'Invalid language tag: %s' % reasons))
552
553
554
def languagetag_checkduplicate(node, language_tag, languages_manager, errors):
    """
    Check that a languagetag isn't duplicated.

    :param node: The schema node the error is reported on.
    :param language_tag: The tag to check.
    :param languages_manager: Manager used to count the languages with this tag.
    :param list errors: Collected errors; one is appended for a duplicate tag.
    """
    if not languages_manager.count_languages(language_tag):
        return
    message = 'Duplicate language tag: %s' % language_tag
    errors.append(colander.Invalid(node, message))
564
565
566
def subordinate_arrays_only_in_concept_rule(errors, node, concept_type, subordinate_arrays):
    """
    Checks that only a concept has subordinate arrays.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node: The schema node the error is reported on.
    :param concept_type: The type of the thing being validated.
    :param subordinate_arrays: Ids of the subordinate arrays.
    """
    if concept_type == 'concept':
        return
    if len(subordinate_arrays) > 0:
        errors.append(colander.Invalid(node, 'Only concept can have subordinate arrays.'))
575
576
577
def subordinate_arrays_type_rule(errors, node_location, skos_manager, conceptscheme_id, subordinate_arrays):
    """
    Checks that subordinate arrays are always collections.

    :param list errors: Collected errors; one is appended per offending id.
    :param node_location: The schema node the errors are reported on.
    :param skos_manager: Manager used to look the subordinate arrays up.
    :param conceptscheme_id: Id of the conceptscheme they are looked up in.
    :param subordinate_arrays: Ids of the subordinate arrays.
    """
    message = 'A subordinate array should always be a collection'
    for array_id in subordinate_arrays:
        array = skos_manager.get_thing(array_id, conceptscheme_id)
        if array.type == 'collection':
            continue
        errors.append(colander.Invalid(node_location, message))
588
589
590
def subordinate_arrays_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the subordinate arrays of a concept are not themselves
    parents of that concept.

    Delegates to ``hierarchy_rule``, comparing ``subordinate_arrays`` with the
    hierarchy built from ``member_of`` through ``members``.
    """
    message = 'The subordinate_array collection of a concept must not itself be a parent of the concept being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='subordinate_arrays',
        property2='member_of',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
599
600
601
def superordinates_only_in_concept_rule(errors, node, concept_type, superordinates):
    """
    Checks that only collections have superordinates.

    :param list errors: Collected errors; one is appended when the rule is broken.
    :param node: The schema node the error is reported on.
    :param concept_type: The type of the thing being validated.
    :param superordinates: Ids of the superordinates.
    """
    if concept_type == 'collection':
        return
    if len(superordinates) > 0:
        errors.append(colander.Invalid(node, 'Only collection can have superordinates.'))
610
611
612
def superordinates_type_rule(errors, node_location, skos_manager, conceptscheme_id, superordinates):
    """
    Checks that superordinates are always concepts.

    :param list errors: Collected errors; one is appended per offending id.
    :param node_location: The schema node the errors are reported on.
    :param skos_manager: Manager used to look the superordinates up.
    :param conceptscheme_id: Id of the conceptscheme they are looked up in.
    :param superordinates: Ids of the superordinates.
    """
    message = 'A superordinate should always be a concept'
    for parent_id in superordinates:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'concept':
            continue
        errors.append(colander.Invalid(node_location, message))
623
624
625
def superordinates_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the superordinate concepts of a collection are not themselves
    members of that collection.

    Delegates to ``hierarchy_rule``, comparing ``superordinates`` with the
    hierarchy built from ``members`` through ``members``.
    """
    message = 'The superordinates of a collection must not itself be a member of the collection being edited.'
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='superordinates',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message=message
    )
634