Completed
Push — master ( d1f0a7...3b2ece )
by Edward
21:04 queued 05:38
created

st2common.util.schema.modify_schema_allow_default_none()   F

Complexity

Conditions 13

Size

Total Lines 38

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 38
rs 2.7716
cc 13

How to fix   Complexity   

Complexity

Complex code units like st2common.util.schema.modify_schema_allow_default_none() (a function, in this case) often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import copy
18
19
import six
20
import jsonschema
21
from jsonschema import _validators
22
from jsonschema.validators import create
23
24
from st2common.util import jsonify
25
26
# Names which are exported when this module is imported with "from ... import *".
__all__ = [
    'get_validator',
    'get_draft_schema',
    'get_action_parameters_schema',
    'get_schema_for_action_parameters',
    'get_schema_for_resource_parameters',
    'validate',
]
34
35
# https://github.com/json-schema/json-schema/blob/master/draft-04/schema
# The source material is licensed under the AFL or BSD license.
# Both the draft 4 and the custom schema have additionalProperties set to false by default.
# The custom schema differs from draft 4 by the extension of position and immutable, and by
# using the draft 3 version of required.

# Directory which contains this module; the schema files below are loaded from it.
PATH = os.path.dirname(os.path.realpath(__file__))

# Schema name -> parsed schema document. The files are read once, at import time.
SCHEMAS = dict(
    draft4=jsonify.load_file(os.path.join(PATH, 'draft4.json')),
    custom=jsonify.load_file(os.path.join(PATH, 'custom.json')),
    # Custom schema for action params which doesn't allow parameter "type" attribute to be array
    action_params=jsonify.load_file(os.path.join(PATH, 'action_params.json')),
)
48
49
# Schema fragment which accepts a value of any of the listed types (note that "null" is not
# one of them).
SCHEMA_ANY_TYPE = {
    'anyOf': [
        {'type': 'array'},
        {'type': 'boolean'},
        {'type': 'integer'},
        {'type': 'number'},
        {'type': 'object'},
        {'type': 'string'},
    ],
}
59
60
61
def get_draft_schema(version='custom', additional_properties=False):
    """
    Return an independent copy of one of the schemas stored in ``SCHEMAS``.

    :param version: Key of the schema in ``SCHEMAS`` (a ``KeyError`` is raised for unknown keys).
    :type version: ``str``

    :param additional_properties: True to strip the top-level "additionalProperties" key (when
                                  present) from the returned copy.
    :type additional_properties: ``bool``

    :rtype: ``dict``
    """
    # Deep copy so callers can modify the result without affecting the module-level schema
    draft = copy.deepcopy(SCHEMAS[version])

    if additional_properties:
        draft.pop('additionalProperties', None)

    return draft
66
67
68
def get_action_parameters_schema(additional_properties=False):
    """
    Return the generic schema against which action parameter definitions are validated.

    :param additional_properties: Forwarded unchanged to ``get_draft_schema``.
    :type additional_properties: ``bool``

    :rtype: ``dict``
    """
    schema = get_draft_schema(additional_properties=additional_properties,
                              version='action_params')
    return schema
73
74
75
# Validator class for the "custom" schema. Every keyword is mapped to an implementation from
# jsonschema's "_validators" module; "properties" is the only one which uses the draft 3
# implementation.
CustomValidator = create(
    meta_schema=get_draft_schema(version='custom', additional_properties=True),
    version="custom_validator",
    validators={
        u"$ref": _validators.ref,
        u"additionalItems": _validators.additionalItems,
        u"additionalProperties": _validators.additionalProperties,
        u"allOf": _validators.allOf_draft4,
        u"anyOf": _validators.anyOf_draft4,
        u"dependencies": _validators.dependencies,
        u"enum": _validators.enum,
        u"format": _validators.format,
        u"items": _validators.items,
        u"maxItems": _validators.maxItems,
        u"maxLength": _validators.maxLength,
        u"maxProperties": _validators.maxProperties_draft4,
        u"maximum": _validators.maximum,
        u"minItems": _validators.minItems,
        u"minLength": _validators.minLength,
        u"minProperties": _validators.minProperties_draft4,
        u"minimum": _validators.minimum,
        u"multipleOf": _validators.multipleOf,
        u"not": _validators.not_draft4,
        u"oneOf": _validators.oneOf_draft4,
        u"pattern": _validators.pattern,
        u"patternProperties": _validators.patternProperties,
        u"properties": _validators.properties_draft3,
        u"type": _validators.type_draft4,
        u"uniqueItems": _validators.uniqueItems,
    },
)
106
107
108
def assign_default_values(instance, schema):
    """
    Assign default values on the provided instance based on the schema default specification.

    The provided instance is never modified; a copy with the defaults filled in is returned.
    A default is used when the property is missing from the instance or when its value is None.
    Nested "object" properties and "array" properties whose items define "properties" are
    processed recursively, but only when a value for them is present.

    :param instance: Value to fill in. Anything which is not a dict or a list is returned as a
                     plain copy.
    :type instance: ``dict`` or ``list``

    :param schema: Schema whose "properties" describe the instance (or, for a list instance,
                   each item of it).
    :type schema: ``dict``

    :return: Copy of the instance with default values assigned.
    """
    # Copy once up front; the helper below then works in place on that private copy
    instance = copy.deepcopy(instance)
    return _assign_default_values_in_place(instance, schema)


def _assign_default_values_in_place(instance, schema):
    """
    Fill schema defaults into ``instance`` (mutating it) and return it.
    """
    if isinstance(instance, list):
        # The schema describes each item. Items which are not objects can't carry properties
        # so they are left untouched (validation is responsible for rejecting them).
        for item in instance:
            if isinstance(item, dict):
                _assign_default_values_in_place(item, schema)
        return instance

    if not isinstance(instance, dict):
        return instance

    for property_name, property_data in schema.get('properties', {}).items():
        # Assign default value on the instance so the validation doesn't fail if requires is
        # true but the value is not provided
        if 'default' in property_data and instance.get(property_name, None) is None:
            # Copy the default so later changes to the instance can't leak into the schema
            instance[property_name] = copy.deepcopy(property_data['default'])

        value = instance.get(property_name, None)
        if value is None:
            # Note: We don't perform subschema assignment if no value is provided
            continue

        # Support for nested properties (array and object)
        attribute_type = property_data.get('type', None)

        if attribute_type == 'array':
            schema_items = property_data.get('items', {})
            if isinstance(schema_items, dict) and schema_items.get('properties', {}):
                _assign_default_values_in_place(value, schema_items)
        elif attribute_type == 'object' and property_data.get('properties', {}):
            _assign_default_values_in_place(value, property_data)

    return instance
159
160
161
def modify_schema_allow_default_none(schema):
    """
    Manipulate the provided schema so None is also an allowed value for each attribute which
    defines a default value of None.

    The provided schema is not modified; a modified copy is returned. Nested "object"
    properties and "array" properties whose items define "properties" are processed
    recursively. A property which declares no "type" is left as is.

    :param schema: Schema to process.
    :type schema: ``dict``

    :return: Copy of the schema where "null" has been added to the "type" of every property
             with a default value of None.
    :rtype: ``dict``
    """
    # Copy once up front; the helper below then works in place on that private copy
    schema = copy.deepcopy(schema)
    _allow_default_none_in_place(schema)
    return schema


def _schema_type_includes(declared_type, type_name):
    """
    Return True if ``type_name`` is the declared type or one of the declared types.
    """
    if isinstance(declared_type, list):
        return type_name in declared_type

    return declared_type == type_name


def _add_null_to_property_type(property_data, declared_type):
    """
    Extend the "type" of a single property definition so it also allows "null".
    """
    if isinstance(declared_type, list):
        if 'null' not in declared_type:
            declared_type.append('null')
    elif isinstance(declared_type, six.string_types) and 'null' not in declared_type:
        property_data['type'] = [declared_type, 'null']


def _allow_default_none_in_place(schema):
    """
    Apply the "default None implies null is allowed" rule to ``schema``, mutating it.
    """
    for property_data in schema.get('properties', {}).values():
        # Remember the declared type before it is (potentially) rewritten below, otherwise a
        # rewritten type such as ['array', 'null'] would hide the nested schema from the
        # checks further down
        declared_type = property_data.get('type', None)

        has_default_none = 'default' in property_data and property_data['default'] is None

        # No declared type means the type is not restricted, so there is nothing to extend
        if has_default_none and declared_type is not None:
            # Allow "None" to be also used as a valid attribute value
            _add_null_to_property_type(property_data, declared_type)

        # Support for nested properties (array and object)
        schema_items = property_data.get('items', {})

        # Array
        if (_schema_type_includes(declared_type, 'array') and isinstance(schema_items, dict) and
                schema_items.get('properties', {})):
            _allow_default_none_in_place(schema_items)

        # Object
        if _schema_type_includes(declared_type, 'object') and property_data.get('properties', {}):
            _allow_default_none_in_place(property_data)
199
200
201
def validate(instance, schema, cls=None, use_default=True, allow_default_none=False, *args,
             **kwargs):
    """
    Custom validate function which supports default arguments combined with the "required"
    property.

    Note: This function returns cleaned instance with default values assigned.

    :param instance: Value to validate. It is copied, the provided object is not modified.

    :param schema: Schema to validate the instance against.
    :type schema: ``dict``

    :param cls: Validator class which is passed through to ``jsonschema.validate``.

    :param use_default: True to support the use of the optional "default" property.
    :type use_default: ``bool``

    :param allow_default_none: True to also allow None as a value for attributes which define a
                               default value of None. Only used when ``use_default`` is True.
    :type allow_default_none: ``bool``

    Any additional positional and keyword arguments are passed through to
    ``jsonschema.validate``, which also raises the error when the validation fails.

    :return: Copy of the instance with default values assigned.
    """
    instance = copy.deepcopy(instance)
    schema_type = schema.get('type', None)
    instance_is_dict = isinstance(instance, dict)

    if use_default and allow_default_none:
        schema = modify_schema_allow_default_none(schema=schema)

    if use_default and schema_type == 'object' and instance_is_dict:
        instance = assign_default_values(instance=instance, schema=schema)

    # Note: instance, schema and cls are passed positionally on purpose. Passing them as
    # keyword arguments together with a non-empty *args results in a "multiple values for
    # argument" TypeError.
    # pylint: disable=assignment-from-no-return
    jsonschema.validate(instance, schema, cls, *args, **kwargs)
    return instance
225
226
227
# Schema version name -> validator class used for that version.
VALIDATORS = dict(
    draft4=jsonschema.Draft4Validator,
    custom=CustomValidator,
)
231
232
233
def get_validator(version='custom'):
    """
    Return the validator class registered in ``VALIDATORS`` under the provided version.

    :param version: Key in ``VALIDATORS`` (a ``KeyError`` is raised for unknown keys).
    :type version: ``str``
    """
    return VALIDATORS[version]
236
237
238
def get_schema_for_action_parameters(action_db):
    """
    Dynamically construct JSON schema for the provided action from the parameters metadata.

    The runner parameters and the action parameters are merged (an action parameter wins over
    a runner parameter with the same name) and the result is turned into an object schema.
    When there is at least one parameter, the action name is used as the schema title and the
    action description (if set) as the schema description.

    Note: This schema is used to validate parameters which are passed to the action.

    :param action_db: Action to build the schema for. The "runner_type", "parameters", "name"
                      and "description" attributes are read.

    :rtype: ``dict``
    """
    # NOTE(review): function-level import, presumably to avoid a circular import - confirm
    from st2common.util.action_db import get_runnertype_by_name

    runner_type_db = get_runnertype_by_name(action_db.runner_type['name'])

    merged_parameters = {}
    for parameters in (runner_type_db.runner_parameters, action_db.parameters):
        merged_parameters.update(parameters)

    schema = get_schema_for_resource_parameters(parameters_schema=merged_parameters)

    if not merged_parameters:
        return schema

    schema['title'] = action_db.name
    if action_db.description:
        schema['description'] = action_db.description

    return schema
259
260
261
def get_schema_for_resource_parameters(parameters_schema):
    """
    Dynamically construct JSON schema for the provided resource from the parameters metadata.

    :param parameters_schema: Parameter name -> parameter schema. A parameter with an empty
                              (falsy) schema is described by ``SCHEMA_ANY_TYPE``.
    :type parameters_schema: ``dict``

    :return: Object schema which doesn't allow additional properties, or an empty dict when no
             parameters are provided.
    :rtype: ``dict``
    """
    properties = {}
    for name, definition in six.iteritems(parameters_schema):
        properties[name] = definition if definition else SCHEMA_ANY_TYPE

    if not properties:
        return {}

    return {
        'type': 'object',
        'properties': properties,
        'additionalProperties': False,
    }
277