GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — develop-v1.6.0 ( 9d5181...7efb31 )
by
unknown
04:49
created

modify_schema_allow_default_none()   F

Complexity

Conditions 18

Size

Total Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
dl 0
loc 47
rs 2.5878
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like modify_schema_allow_default_none() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import copy
18
19
import six
20
import jsonschema
21
from jsonschema import _validators
22
from jsonschema.validators import create
23
24
from st2common.exceptions.action import InvalidActionParameterException
25
from st2common.util import jsonify
26
from st2common.util.misc import deep_update
27
28
__all__ = [
29
    'get_validator',
30
    'get_draft_schema',
31
    'get_action_parameters_schema',
32
    'get_schema_for_action_parameters',
33
    'get_schema_for_resource_parameters',
34
    'is_property_type_single',
35
    'is_property_type_list',
36
    'is_property_type_anyof',
37
    'is_property_type_oneof',
38
    'is_property_nullable',
39
    'is_attribute_type_array',
40
    'is_attribute_type_object',
41
    'validate'
42
]
43
44
# https://github.com/json-schema/json-schema/blob/master/draft-04/schema
45
# The source material is licensed under the AFL or BSD license.
46
# Both the draft 4 and the custom schemas have additionalProperties set to false by default.
47
# The custom schema differs from draft 4 with the extension of position, immutable,
48
# and draft 3 version of required.
49
PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)))
50
SCHEMAS = {
51
    'draft4': jsonify.load_file(os.path.join(PATH, 'draft4.json')),
52
    'custom': jsonify.load_file(os.path.join(PATH, 'custom.json')),
53
54
    # Custom schema for action params which doesn't allow parameter "type" attribute to be array
55
    'action_params': jsonify.load_file(os.path.join(PATH, 'action_params.json'))
56
}
57
58
SCHEMA_ANY_TYPE = {
59
    "anyOf": [
60
        {"type": "array"},
61
        {"type": "boolean"},
62
        {"type": "integer"},
63
        {"type": "number"},
64
        {"type": "object"},
65
        {"type": "string"}
66
    ]
67
}
68
69
RUNNER_PARAM_OVERRIDABLE_ATTRS = [
70
    'default',
71
    'description',
72
    'enum',
73
    'immutable',
74
    'required'
75
]
76
77
78
def get_draft_schema(version='custom', additional_properties=False):
    """
    Return an independent (deep) copy of the schema registered in ``SCHEMAS`` for the provided
    version.

    :param version: Key of the schema in ``SCHEMAS``.
    :type version: ``str``

    :param additional_properties: When true, the top-level "additionalProperties" key (if
                                  present) is removed from the returned copy.
    :type additional_properties: ``bool``

    :rtype: ``dict``
    """
    draft_schema = copy.deepcopy(SCHEMAS[version])

    if additional_properties:
        # pop() with a default is a no-op when the key is absent
        draft_schema.pop('additionalProperties', None)

    return draft_schema
83
84
85
def get_action_parameters_schema(additional_properties=False):
    """
    Return the generic "action_params" schema which is used to validate action parameter
    definitions.

    :param additional_properties: Passed through to ``get_draft_schema``.
    :type additional_properties: ``bool``

    :rtype: ``dict``
    """
    action_params_schema = get_draft_schema(version='action_params',
                                            additional_properties=additional_properties)
    return action_params_schema
90
91
92
CustomValidator = create(
93
    meta_schema=get_draft_schema(version='custom', additional_properties=True),
94
    validators={
95
        u"$ref": _validators.ref,
96
        u"additionalItems": _validators.additionalItems,
97
        u"additionalProperties": _validators.additionalProperties,
98
        u"allOf": _validators.allOf_draft4,
99
        u"anyOf": _validators.anyOf_draft4,
100
        u"dependencies": _validators.dependencies,
101
        u"enum": _validators.enum,
102
        u"format": _validators.format,
103
        u"items": _validators.items,
104
        u"maxItems": _validators.maxItems,
105
        u"maxLength": _validators.maxLength,
106
        u"maxProperties": _validators.maxProperties_draft4,
107
        u"maximum": _validators.maximum,
108
        u"minItems": _validators.minItems,
109
        u"minLength": _validators.minLength,
110
        u"minProperties": _validators.minProperties_draft4,
111
        u"minimum": _validators.minimum,
112
        u"multipleOf": _validators.multipleOf,
113
        u"not": _validators.not_draft4,
114
        u"oneOf": _validators.oneOf_draft4,
115
        u"pattern": _validators.pattern,
116
        u"patternProperties": _validators.patternProperties,
117
        u"properties": _validators.properties_draft3,
118
        u"type": _validators.type_draft4,
119
        u"uniqueItems": _validators.uniqueItems,
120
    },
121
    version="custom_validator",
122
)
123
124
125
def is_property_type_single(property_schema):
    """
    Return True if the property schema is a dict which uses neither "anyOf" nor "oneOf" and
    whose "type" (defaulting to "string" when absent) is not a list.
    """
    if not isinstance(property_schema, dict):
        return False

    if 'anyOf' in property_schema or 'oneOf' in property_schema:
        return False

    declared_type = property_schema.get('type', 'string')
    return not isinstance(declared_type, list)
130
131
132
def is_property_type_list(property_schema):
    """
    Return True if the property schema is a dict whose "type" attribute is a list of types.
    """
    if not isinstance(property_schema, dict):
        return False

    declared_type = property_schema.get('type', 'string')
    return isinstance(declared_type, list)
135
136
137
def is_property_type_anyof(property_schema):
    """
    Return True if the property schema is a dict which contains an "anyOf" key.
    """
    if isinstance(property_schema, dict):
        return 'anyOf' in property_schema

    return False
139
140
141
def is_property_type_oneof(property_schema):
    """
    Return True if the property schema is a dict which contains a "oneOf" key.
    """
    if isinstance(property_schema, dict):
        return 'oneOf' in property_schema

    return False
143
144
145
def _is_null_type(type_entry):
    """
    Return True if a single type entry denotes the "null" type.

    An entry is either a type name or a sub-schema dict whose "type" attribute (defaulting to
    "string" when absent) is "null".
    """
    # Comparing against the literal is already False for any non-string entry, so no explicit
    # string type check is needed.
    return (type_entry == 'null' or
            (isinstance(type_entry, dict) and type_entry.get('type', 'string') == 'null'))


def is_property_nullable(property_type_schema):
    """
    Return True if the provided type specification allows a "null" value.

    :param property_type_schema: One of:
                                 - a list of type names and / or sub-schema dicts (the value of
                                   a list "type", "anyOf" or "oneOf" attribute),
                                 - a single sub-schema dict,
                                 - a single type name (the value of a single "type" attribute).
    :type property_type_schema: ``list`` or ``dict`` or ``str``

    :rtype: ``bool``
    """
    # For anyOf, oneOf and list types, the specification is a list of entries.
    if isinstance(property_type_schema, list):
        return any(_is_null_type(type_entry) for type_entry in property_type_schema)

    # A bare type name is handled here as well, so that "null" on its own is reported as
    # nullable.
    return _is_null_type(property_type_schema)
154
155
156
def is_attribute_type_array(attribute_type):
    """
    Return True if the attribute type is "array" or a list of types which includes "array".
    """
    if isinstance(attribute_type, list):
        return 'array' in attribute_type

    return attribute_type == 'array'
159
160
161
def is_attribute_type_object(attribute_type):
    """
    Return True if the attribute type is "object" or a list of types which includes "object".
    """
    if isinstance(attribute_type, list):
        return 'object' in attribute_type

    return attribute_type == 'object'
164
165
166
def _assign_default_values_in_place(instance, schema):
    """
    Recursively assign schema defaults directly on the provided (already copied) instance.
    """
    if isinstance(instance, list):
        # The schema describes the items of the list, so apply it to each item. Items which are
        # not dicts can't hold properties and are left for the validator to report.
        for item in instance:
            if isinstance(item, dict):
                _assign_default_values_in_place(instance=item, schema=schema)
        return

    if not isinstance(instance, dict):
        return

    for property_name, property_data in six.iteritems(schema.get('properties', {})):
        # Assign default value on the instance so the validation doesn't fail if required is
        # true but the value is not provided
        if 'default' in property_data and instance.get(property_name, None) is None:
            instance[property_name] = property_data['default']

        # Support for nested properties (array and object)
        # Note: We don't perform subschema assignment if no value is provided
        property_value = instance.get(property_name, None)
        if property_value is None:
            continue

        attribute_type = property_data.get('type', None)
        items_schema = property_data.get('items', {})

        # Array - "items" can also be a list of schemas which has no "properties" to descend
        # into
        if (is_attribute_type_array(attribute_type) and isinstance(items_schema, dict) and
                items_schema.get('properties', {})):
            _assign_default_values_in_place(instance=property_value, schema=items_schema)

        # Object
        if is_attribute_type_object(attribute_type) and property_data.get('properties', {}):
            _assign_default_values_in_place(instance=property_value, schema=property_data)


def assign_default_values(instance, schema):
    """
    Assign default values on the provided instance based on the schema default specification.

    A default is assigned when the property is missing from the instance or its value is None.
    Nested object properties and arrays of objects are processed recursively, but only when a
    value for them is present.

    Note: The provided instance is not modified, a modified deep copy is returned.

    :param instance: Instance to assign the default values on. Values which are neither a dict
                     nor a list are returned as a plain copy.
    :type instance: ``dict`` or ``list``

    :param schema: Schema describing the instance (or the items of the instance if it is a
                   list).
    :type schema: ``dict``

    :return: Copy of the instance with the default values assigned.
    """
    # Copy once up front - the recursion then works in place on that copy instead of copying
    # every nested value again on each level.
    instance = copy.deepcopy(instance)
    _assign_default_values_in_place(instance=instance, schema=schema)
    return instance
219
220
221
def _allow_null_for_property(property_schema):
    """
    Extend a single property schema in place so "null" is also an accepted type.
    """
    # If property is anyOf or oneOf then it has to be processed differently.
    if (is_property_type_anyof(property_schema) and
            not is_property_nullable(property_schema['anyOf'])):
        property_schema['anyOf'].append({'type': 'null'})
    elif (is_property_type_oneof(property_schema) and
            not is_property_nullable(property_schema['oneOf'])):
        property_schema['oneOf'].append({'type': 'null'})
    elif (is_property_type_list(property_schema) and
            not is_property_nullable(property_schema.get('type'))):
        property_schema['type'].append('null')
    elif is_property_type_single(property_schema):
        single_type = property_schema.get('type', 'string')

        # A bare "null" type already accepts None. Wrapping it would produce
        # ['null', 'null'] and draft 4 requires the entries of a "type" array to be unique.
        if single_type != 'null':
            property_schema['type'] = [single_type, 'null']


def _allow_default_none_in_place(schema):
    """
    Recursively apply the "allow None" manipulation directly on the provided (already copied)
    schema.
    """
    for _, property_schema in six.iteritems(schema.get('properties', {})):
        is_optional = not property_schema.get('required', False)
        has_default_value = 'default' in property_schema
        default_value = property_schema.get('default', None)

        if (has_default_value or is_optional) and default_value is None:
            _allow_null_for_property(property_schema)

        # Support for nested properties (array and object)
        attribute_type = property_schema.get('type', None)
        items_schema = property_schema.get('items', {})

        # Array - "items" can also be a list of schemas which has no "properties" to descend
        # into
        if (is_attribute_type_array(attribute_type) and isinstance(items_schema, dict) and
                items_schema.get('properties', {})):
            _allow_default_none_in_place(items_schema)

        # Object
        if is_attribute_type_object(attribute_type) and property_schema.get('properties', {}):
            _allow_default_none_in_place(property_schema)


def modify_schema_allow_default_none(schema):
    """
    Manipulate the provided schema so None is also an allowed value for each attribute which
    defines a default value of None, or which is not required and defines no default value.

    Nested object properties and the "items" schema of array properties are processed
    recursively.

    Note: The provided schema is not modified, a modified deep copy is returned.

    :param schema: Schema to manipulate.
    :type schema: ``dict``

    :rtype: ``dict``
    """
    # Copy once up front - the recursion then works in place on that copy instead of copying
    # every nested sub-schema again on each level.
    schema = copy.deepcopy(schema)
    _allow_default_none_in_place(schema)
    return schema
268
269
270
def validate(instance, schema, cls=None, use_default=True, allow_default_none=False, *args,
             **kwargs):
    """
    Custom validate function which supports default arguments combined with the "required"
    property.

    Note: This function returns cleaned instance with default values assigned.

    :param instance: Instance to validate. It is deep copied, the original is not modified.

    :param schema: Schema to validate the instance against.
    :type schema: ``dict``

    :param cls: Validator class which is passed through to ``jsonschema.validate``.

    :param use_default: True to support the use of the optional "default" property.
    :type use_default: ``bool``

    :param allow_default_none: True to also allow None for the attributes handled by
                               ``modify_schema_allow_default_none``. Only has an effect when
                               ``use_default`` is true.
    :type allow_default_none: ``bool``

    :return: Copy of the instance, with default values assigned when applicable.
    """
    cleaned_instance = copy.deepcopy(instance)
    schema_type = schema.get('type', None)

    if use_default:
        if allow_default_none:
            schema = modify_schema_allow_default_none(schema=schema)

        # Defaults are only assigned for dict instances validated against an "object" schema
        if schema_type == 'object' and isinstance(cleaned_instance, dict):
            cleaned_instance = assign_default_values(instance=cleaned_instance, schema=schema)

    # pylint: disable=assignment-from-no-return
    jsonschema.validate(instance=cleaned_instance, schema=schema, cls=cls, *args, **kwargs)

    return cleaned_instance
296
297
298
VALIDATORS = {
299
    'draft4': jsonschema.Draft4Validator,
300
    'custom': CustomValidator
301
}
302
303
304
def get_validator(version='custom'):
    """
    Return the validator class registered in ``VALIDATORS`` for the provided version.

    :param version: Key of the validator in ``VALIDATORS``.
    :type version: ``str``
    """
    return VALIDATORS[version]
307
308
309
def validate_runner_parameter_attribute_override(action_ref, param_name, attr_name,
                                                 runner_param_attr_value, action_param_attr_value):
    """
    Validate that the provided parameter attribute from the action schema is allowed to
    override the corresponding runner parameter attribute.

    An override is accepted when the attribute is listed in ``RUNNER_PARAM_OVERRIDABLE_ATTRS``
    or when the action value is equal to the runner value.

    :return: True when the override is allowed.
    :rtype: ``bool``

    :raises: ``InvalidActionParameterException`` when the override is not allowed.
    """
    values_match = action_param_attr_value == runner_param_attr_value

    if attr_name in RUNNER_PARAM_OVERRIDABLE_ATTRS or values_match:
        return True

    raise InvalidActionParameterException(
        'The attribute "%s" for the runner parameter "%s" in action "%s" '
        'cannot be overridden.' % (attr_name, param_name, action_ref))
322
323
324
def get_schema_for_action_parameters(action_db):
    """
    Dynamically construct JSON schema for the provided action from the parameters metadata.

    The runner parameters and the action parameters are merged and every action level override
    of a runner parameter attribute is validated.

    Note: This schema is used to validate parameters which are passed to the action.

    :param action_db: Action to build the schema for. Its ``runner_type``, ``parameters``,
                      ``ref``, ``name`` and ``description`` attributes are read.

    :rtype: ``dict``

    :raises: ``InvalidActionParameterException`` when the action overrides a runner parameter
             attribute which can't be overridden.
    """
    from st2common.util.action_db import get_runnertype_by_name
    runner_type = get_runnertype_by_name(action_db.runner_type['name'])
    runner_parameters = runner_type.runner_parameters

    # Note: We need to perform a deep merge because user can only specify a single parameter
    # attribute when overriding it in an action metadata.
    parameters_schema = {}
    deep_update(parameters_schema, runner_parameters)
    deep_update(parameters_schema, action_db.parameters)

    # Perform validation, make sure user is not providing parameters which can't
    # be overridden
    for name, overrides in six.iteritems(action_db.parameters):
        if name in runner_parameters:
            for attribute, value in six.iteritems(overrides):
                runner_param_value = runner_parameters[name].get(attribute)
                validate_runner_parameter_attribute_override(
                    action_ref=action_db.ref,
                    param_name=name,
                    attr_name=attribute,
                    runner_param_attr_value=runner_param_value,
                    action_param_attr_value=value)

    schema = get_schema_for_resource_parameters(parameters_schema=parameters_schema)

    # Title and description are only set when there is at least one parameter
    if parameters_schema:
        schema['title'] = action_db.name
        if action_db.description:
            schema['description'] = action_db.description

    return schema
363
364
365
def get_schema_for_resource_parameters(parameters_schema, allow_additional_properties=False):
    """
    Dynamically construct JSON schema for the provided resource from the parameters metadata.

    :param parameters_schema: Dictionary which maps a parameter name to the parameter schema.
                              A parameter with an empty schema accepts a value of any type.
    :type parameters_schema: ``dict``

    :param allow_additional_properties: Value for the "additionalProperties" attribute of the
                                        resulting schema.
    :type allow_additional_properties: ``bool``

    :return: "object" schema for the parameters or an empty dict if there are no parameters.
    :rtype: ``dict``
    """
    properties = {}

    for name, parameter_schema in six.iteritems(parameters_schema):
        if not parameter_schema:
            # Use a copy, otherwise the module level SCHEMA_ANY_TYPE dict would be shared by
            # every returned schema and a caller modifying the result would modify it as well.
            parameter_schema = copy.deepcopy(SCHEMA_ANY_TYPE)

        properties[name] = parameter_schema

    schema = {}

    if properties:
        schema['type'] = 'object'
        schema['properties'] = properties
        schema['additionalProperties'] = allow_additional_properties

    return schema
381