Completed
Push — master ( f62e78...ac5142 )
by Satoru
24s
created

_validate()   B

Complexity

Conditions 4

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
c 0
b 0
f 0
dl 0
loc 22
rs 8.9197
1
#
2
# Copyright (C) 2015 - 2017 Satoru SATOH <ssato redhat.com>
3
# License: MIT
4
#
5
"""anyconfig.schema module.
6
7
.. versionchanged:: 0.9.4
8
   Change parameter passed to :func:`validate`, s/.*safe/ac_schema_safe/g
9
10
.. versionchanged:: 0.8.3
11
   Replace format_checker with cls option
12
13
.. versionchanged:: 0.7.0
14
   allow passing `ac_schema_strict` to API :func:`gen_schema` to generate more
15
   strict and precise JSON schema object
16
17
.. versionadded:: 0.0.11
18
   Added new API :func:`gen_schema` to generate schema object
19
20
.. versionadded:: 0.0.10
21
   Added new API :func:`validate` to validate config with JSON schema
22
"""
23
from __future__ import absolute_import
24
try:
25
    import jsonschema
26
except ImportError:
27
    pass
28
29
import anyconfig.compat
30
import anyconfig.utils
31
32
33
# Message returned with a successful result when the name `jsonschema` is not
# bound, that is, the import of it at the top of this module failed.
_NA_MSG = "Validation module (jsonschema) is not available"

# Python types which are mapped directly to a JSON schema type by gen_schema.
_SIMPLE_TYPES = (bool, int, float, str)

# Default mapping from python types to the names of JSON schema types.
_SIMPLETYPE_MAP = {
    list: "array",
    tuple: "array",
    bool: "boolean",
    int: "integer",
    float: "number",
    dict: "object",
    str: "string",
}

if not anyconfig.compat.IS_PYTHON_3:
    # `unicode` is expected to be available only in python 2; both objects
    # are left untouched if looking up that name fails.
    try:
        _SIMPLETYPE_MAP[unicode] = "string"
        _SIMPLE_TYPES = _SIMPLE_TYPES + (unicode, )
    except NameError:
        pass
48
49
50
def _validate_all(data, schema, **options):
    """
    Validate `data` with `schema` and collect all of the validation errors.

    See the description of :func:`validate` for more details of parameters and
    return value.

    :param data: Target object to validate
    :param schema: Schema object
    :param options:
        Keyword options. Only `cls`, a validator class to use instead of
        jsonschema.Draft4Validator, is looked at; any others are ignored.
        Accepting keyword options keeps this function callable in the same
        way as :func:`_validate`.

    :return:
        A tuple of (True if no errors were found else False, a list of error
        messages), or (True, _NA_MSG) if jsonschema is not available

    :seealso: https://python-jsonschema.readthedocs.io/en/latest/validate/,
    a section of 'iter_errors' especially
    """
    try:
        # Looking up `jsonschema` raises NameError if importing it failed.
        vldtr_cls = options.get("cls") or jsonschema.Draft4Validator
        vldtr = vldtr_cls(schema)
        # NOTE: errors other than NameError are not caught here, e.g. ones
        # which jsonschema might raise for a broken schema.
        errors = list(vldtr.iter_errors(data))
    except NameError:
        return (True, _NA_MSG)

    return (not errors, [err.message for err in errors])
68
69
70
def _validate(data, schema, ac_schema_safe=True, **options):
    """
    Validate target object `data` with given schema object.

    See the description of :func:`validate` for more details of parameters and
    return value.

    :param data: Target object to validate
    :param schema: Schema object
    :param ac_schema_safe:
        Return the error as a message instead of raising it if True
    :param options: Keyword options passed to jsonschema.validate as they are

    :return:
        (True, '') if validation succeeded, (True, _NA_MSG) if jsonschema is
        not available, or (False, error message) if validation failed and
        `ac_schema_safe` is True
    :raises:
        Any exception raised during validation except for NameError, if
        `ac_schema_safe` is False
    """
    try:
        jsonschema.validate(data, schema, **options)
    except NameError:
        # The name `jsonschema` is not bound because importing it failed.
        return (True, _NA_MSG)
    except Exception as exc:
        # Exception covers jsonschema.ValidationError, jsonschema.SchemaError
        # and the others, so there is no need to list them one by one.
        if ac_schema_safe:
            return (False, str(exc))  # Validation was failed.
        raise

    return (True, '')
92
93
94
def validate(data, schema, ac_schema_safe=True, ac_schema_errors=False,
             **options):
    """
    Validate target object with given schema object, loaded from JSON schema.

    See also: https://python-jsonschema.readthedocs.org/en/latest/validate/

    :param data: Target object (a dict or a dict-like object) to validate
    :param schema: Schema object (a dict or a dict-like object)
        instantiated from schema JSON file or schema JSON string
    :param ac_schema_safe:
        Exceptions (jsonschema.ValidationError, jsonschema.SchemaError or
        others) raised during the validation process are caught and turned
        into an error message by default, and are re-raised instead if this
        is False. It has no effect if `ac_schema_errors` is True.
    :param ac_schema_errors:
        Collect all of the validation errors and return the messages of them
        as a list, if True
    :param options:
        Other keyword options; these are filtered with
        anyconfig.utils.filter_options and the key "cls" before use
        (presumably only `cls` is kept; confirm against that helper)

    :return: (True if validation succeeded else False, error message[s])
    """
    options = anyconfig.utils.filter_options(("cls", ), options)

    if not ac_schema_errors:
        return _validate(data, schema, ac_schema_safe, **options)

    return _validate_all(data, schema, **options)
121
122
123
def _process_options(**options):
    """
    Pick up the options used by gen_schema and the related functions.

    :param options: Keyword options such as:

        - ac_schema_strict: Anything evaluated as a bool; False by default
        - ac_schema_typemap: Type to JSON schema type mappings;
          _SIMPLETYPE_MAP by default

    :return: A tuple of (typemap :: dict, strict :: bool)
    """
    typemap = options.get("ac_schema_typemap", _SIMPLETYPE_MAP)
    strict = bool(options.get("ac_schema_strict", False))

    return (typemap, strict)
131
132
133
def _all_unique(items):
    """
    Tell whether no two items of the list `items` are equal.

    :param items: A list of any objects
    :return: True if all of the items differ from each other
    """
    try:
        return len(set(items)) == len(items)
    except TypeError:
        # Some items are not hashable (dict, list, ...) and cannot be put into
        # a set, so compare each pair of them instead.
        return not any(lhs == rhs
                       for idx, lhs in enumerate(items)
                       for rhs in items[idx + 1:])


def array_to_schema(arr, **options):
    """
    Generate a JSON schema object with type annotation added for given object.

    The schema of items is generated from the first item only, or from a
    string if `arr` is empty.

    :param arr: Array of dict or MergeableDict objects
    :param options: Other keyword options such as:

        - ac_schema_strict: True if more strict (precise) schema is needed;
          "minItems" and "uniqueItems" are added to the result in that case
        - ac_schema_typemap: Type to JSON schema type mappings

    :return: A dict represents JSON schema of the array
    """
    (typemap, strict) = _process_options(**options)

    arr = list(arr)  # `arr` may be an iterable other than list or tuple.
    scm = dict(type=typemap[list],
               items=gen_schema(arr[0] if arr else "str", **options))
    if strict:
        scm["minItems"] = len(arr)
        scm["uniqueItems"] = _all_unique(arr)

    return scm
156
157
158
def object_to_schema(obj, **options):
    """
    Generate a node represents JSON schema object with type annotation added
    for given object node.

    :param obj: Dict or MergeableDict object
    :param options: Other keyword options such as:

        - ac_schema_strict: True if more strict (precise) schema is needed;
          the sorted list of all keys is added as "required" in that case
        - ac_schema_typemap: Type to JSON schema type mappings

    :return: A dict represents JSON schema of the object
    """
    (typemap, strict) = _process_options(**options)

    # Generate the schema of each value, keyed by the same key.
    props = {}
    for key, val in obj.items():
        props[key] = gen_schema(val, **options)

    scm = {"type": typemap[dict], "properties": props}
    if strict:
        scm["required"] = sorted(props)

    return scm
179
180
181
def gen_schema(data, **options):
    """
    Generate a node represents JSON schema object with type annotation added
    for given object node.

    :param data: Configuration data object (dict[-like] or namedtuple)
    :param options: Other keyword options such as:

        - ac_schema_strict: True if more strict (precise) schema is needed
        - ac_schema_typemap: Type to JSON schema type mappings

    :return: A dict represents JSON schema of this node
    :raises TypeError:
        if `data` is not None, its type is not one of _SIMPLE_TYPES, it is
        not a dict and it is not iterable
    """
    if data is None:
        return dict(type="null")

    _type = type(data)

    # The type is matched exactly, so that bool is not taken as int.
    if _type in _SIMPLE_TYPES:
        typemap = options.get("ac_schema_typemap", _SIMPLETYPE_MAP)
        return dict(type=typemap[_type])

    if isinstance(data, dict):
        return object_to_schema(data, **options)

    if _type in (list, tuple) or hasattr(data, "__iter__"):
        return array_to_schema(data, **options)

    # Nothing above matched; fail with a message which tells the cause rather
    # than by referring to a result which was never assigned.
    raise TypeError("Could not generate schema for the object of "
                    "unsupported type %r: %r" % (_type, data))
210
211
# vim:sw=4:ts=4:et:
212