Passed
Push — develop ( f534b1...a82689 )
by Plexxi
06:09 queued 03:13
created

_handle_schedule_execution()   C

Complexity

Conditions 7

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 36
rs 5.5
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import re
18
import httplib
19
import sys
20
import traceback
21
22
import jsonschema
23
from oslo_config import cfg
24
from six.moves import http_client
25
26
from st2api.controllers.base import BaseRestControllerMixin
27
from st2api.controllers.resource import ResourceController
28
from st2api.controllers.v1.executionviews import ExecutionViewsController
29
from st2api.controllers.v1.executionviews import SUPPORTED_FILTERS
30
from st2common import log as logging
31
from st2common.constants.action import LIVEACTION_STATUS_CANCELED, LIVEACTION_STATUS_FAILED
32
from st2common.constants.action import LIVEACTION_CANCELABLE_STATES
33
from st2common.exceptions.param import ParamException
34
from st2common.exceptions.apivalidation import ValueValidationException
35
from st2common.exceptions.trace import TraceNotFoundException
36
from st2common.models.api.action import LiveActionAPI
37
from st2common.models.api.action import LiveActionCreateAPI
38
from st2common.models.api.base import cast_argument_value
39
from st2common.models.api.execution import ActionExecutionAPI
40
from st2common.models.db.auth import UserDB
41
from st2common.persistence.liveaction import LiveAction
42
from st2common.persistence.execution import ActionExecution
43
from st2common.router import abort
44
from st2common.router import Response
45
from st2common.services import action as action_service
46
from st2common.services import executions as execution_service
47
from st2common.services import trace as trace_service
48
from st2common.util import isotime
49
from st2common.util import action_db as action_utils
50
from st2common.util import param as param_utils
51
from st2common.util.jsonify import try_loads
52
from st2common.rbac.types import PermissionType
53
from st2common.rbac import utils as rbac_utils
54
from st2common.rbac.utils import assert_user_has_resource_db_permission
55
from st2common.rbac.utils import assert_user_is_admin_if_user_query_param_is_provided
56
57
__all__ = ['ActionExecutionsController']

LOG = logging.getLogger(__name__)

# Note: The filters are built here, at import time, and not in a controller constructor.
# Start from a private deep copy of the shared filters and add the timestamp range filters.
SUPPORTED_EXECUTIONS_FILTERS = copy.deepcopy(SUPPORTED_FILTERS)
SUPPORTED_EXECUTIONS_FILTERS['timestamp_gt'] = 'start_timestamp.gt'
SUPPORTED_EXECUTIONS_FILTERS['timestamp_lt'] = 'start_timestamp.lt'

# NOTE(review): neither constant is referenced in the visible part of this module.
MONITOR_THREAD_EMPTY_Q_SLEEP_TIME = 5
MONITOR_THREAD_NO_WORKERS_SLEEP_TIME = 1
72
73
74
class ActionExecutionsControllerMixin(BaseRestControllerMixin):
    """
    Mixin class with shared methods.
    """

    model = ActionExecutionAPI
    access = ActionExecution

    # A list of attributes which can be specified using ?exclude_attributes filter
    valid_exclude_attributes = [
        'result',
        'trigger_instance'
    ]

    def _get_from_model_kwargs_for_request(self, **kwargs):
        """
        Build the ``from_model`` keyword arguments for the current request.

        Secrets are masked unless a truthy ``show_secrets`` keyword argument was supplied.
        No admin check is performed in this method.

        :rtype: ``dict``
        """
        return {'mask_secrets': not kwargs.get('show_secrets', False)}

    def _handle_schedule_execution(self, liveaction_api, requester_user=None, **kwargs):
        """
        Assert the requester's permissions, schedule the execution and translate scheduling
        failures into API errors.

        :param liveaction_api: LiveActionAPI object.
        :type liveaction_api: :class:`LiveActionAPI`

        :param requester_user: User performing the request. When not provided, a ``UserDB``
                               built from the configured system user is used instead.

        :param kwargs: Passed through to ``_schedule_execution``.
        """

        if not requester_user:
            requester_user = UserDB(cfg.CONF.system_user.user)

        # Assert the permissions
        action_ref = liveaction_api.action
        action_db = action_utils.get_action_by_ref(action_ref)
        user = liveaction_api.user or requester_user.name

        assert_user_has_resource_db_permission(user_db=requester_user, resource_db=action_db,
                                               permission_type=PermissionType.ACTION_EXECUTE)

        # Validate that the authenticated user is admin if user query param is provided
        assert_user_is_admin_if_user_query_param_is_provided(user_db=requester_user,
                                                             user=user)

        try:
            return self._schedule_execution(liveaction=liveaction_api, user=user, **kwargs)
        except ValueError as e:
            LOG.exception('Unable to execute action.')
            abort(http_client.BAD_REQUEST, str(e))
        except jsonschema.ValidationError as e:
            LOG.exception('Unable to execute action. Parameter validation failed.')
            # Strip the u'' prefix from unicode reprs embedded in the validation message.
            abort(http_client.BAD_REQUEST, re.sub("u'([^']*)'", r"'\1'", e.message))
        except TraceNotFoundException as e:
            abort(http_client.BAD_REQUEST, str(e))
        except ValueValidationException:
            # Propagate unchanged; a bare raise keeps the original traceback.
            raise
        except Exception as e:
            LOG.exception('Unable to execute action. Unexpected error encountered.')
            abort(http_client.INTERNAL_SERVER_ERROR, str(e))

    def _schedule_execution(self, liveaction, user=None, **kwargs):
        """
        Create, render and publish a new execution for the provided live action.

        :param liveaction: LiveActionAPI object. Its ``context`` attribute is created when
                           missing and is updated in place.

        :param user: Value stored under ``context['user']``.

        :param kwargs: ``st2-context`` (merged into the execution context) and
                       ``show_secrets`` are read from here.

        :raises ValueError: If the ``st2-context`` value does not load into a dict.
        :raises ValueValidationException: If rendering the live parameters raises
                                          ``ParamException``.

        :return: Response carrying the execution API object with status CREATED.
        """
        # Initialize execution context if it does not exist.
        if not hasattr(liveaction, 'context'):
            liveaction.context = dict()

        liveaction.context['user'] = user
        LOG.debug('User is: %s', liveaction.context['user'])

        # Retrieve other st2 context from request header.
        context_string = kwargs.get('st2-context', None)
        if context_string:
            context = try_loads(context_string)
            if not isinstance(context, dict):
                raise ValueError('Unable to convert st2-context from the headers into JSON.')
            liveaction.context.update(context)

        # Schedule the action execution.
        liveaction_db = LiveActionAPI.to_model(liveaction)
        liveaction_db, actionexecution_db = action_service.create_request(liveaction_db)

        action_db = action_utils.get_action_by_ref(liveaction_db.action)
        runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

        try:
            liveaction_db.parameters = param_utils.render_live_params(
                runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
                liveaction_db.context)
        except ParamException:
            # By this point the execution is already in the DB therefore need to mark it failed.
            _, e, tb = sys.exc_info()
            action_service.update_status(
                liveaction=liveaction_db,
                new_status=LIVEACTION_STATUS_FAILED,
                result={'error': str(e), 'traceback': ''.join(traceback.format_tb(tb, 20))})
            # Might be a good idea to return the actual ActionExecution rather than bubble up
            # the exception.
            raise ValueValidationException(str(e))

        liveaction_db = LiveAction.add_or_update(liveaction_db, publish=False)

        _, actionexecution_db = action_service.publish_request(liveaction_db, actionexecution_db)
        from_model_kwargs = self._get_from_model_kwargs_for_request(**kwargs)
        execution_api = ActionExecutionAPI.from_model(actionexecution_db, from_model_kwargs)

        return Response(json=execution_api, status=http_client.CREATED)

    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def _get_result_object(self, id):
        """
        Retrieve result object for the provided action execution.

        :param id: Action execution ID.
        :type id: ``str``

        :rtype: ``dict``
        """
        # Only the "result" field is requested from the database.
        fields = ['result']
        action_exec_db = self.access.impl.model.objects.filter(id=id).only(*fields).get()
        return action_exec_db.result

    def _get_children(self, id_, depth=-1, result_fmt=None, **kwargs):
        """
        Retrieve the descendants of the provided action execution as API objects.

        :param id_: Action execution ID.

        :param depth: Descendant depth passed to the execution service. May arrive as a
                      string and is converted to ``int``.

        :param result_fmt: Passed through to the execution service.

        :rtype: ``list``
        """
        # make sure depth is int. Url encoding will make it a string and needs to
        # be converted back in that case.
        depth = int(depth)
        from_model_kwargs = self._get_from_model_kwargs_for_request(**kwargs)
        LOG.debug('retrieving children for id: %s with depth: %s', id_, depth)
        descendants = execution_service.get_descendants(actionexecution_id=id_,
                                                        descendant_depth=depth,
                                                        result_fmt=result_fmt)

        return [self.model.from_model(descendant, from_model_kwargs) for
                descendant in descendants]
202
203
204
class BaseActionExecutionNestedController(ActionExecutionsControllerMixin, ResourceController):
    """
    Base class for the controllers nested under the executions endpoint.
    """

    # Note: We need to override "get_one" and "get_all" to return 404 since nested controllers
    # don't implement those methods

    # ResourceController attributes
    query_options = {}
    supported_filters = {}

    def get_all(self):
        """
        Not implemented by nested controllers; aborts with NOT_FOUND.
        """
        # Use the six-provided http_client like the rest of this module; the bare
        # ``httplib`` module only exists on Python 2.
        abort(http_client.NOT_FOUND)

    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def get_one(self, id):
        """
        Not implemented by nested controllers; aborts with NOT_FOUND.
        """
        abort(http_client.NOT_FOUND)
217
218
219
class ActionExecutionChildrenController(BaseActionExecutionNestedController):
    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def get_one(self, id, requester_user, depth=-1, result_fmt=None, **kwargs):
        """
        Retrieve children for the provided action execution.

        The execution is looked up first and the requester must hold the EXECUTION_VIEW
        permission on it before any children are returned.

        :rtype: ``list``
        """
        execution_db = self._get_by_id(resource_id=id)

        rbac_utils.assert_user_has_resource_db_permission(
            user_db=requester_user,
            resource_db=execution_db,
            permission_type=PermissionType.EXECUTION_VIEW)

        return self._get_children(id_=id, depth=depth, result_fmt=result_fmt, **kwargs)
235
236
237
class ActionExecutionAttributeController(BaseActionExecutionNestedController):
    # The two action fields are always requested by get() below, so they are allowed here in
    # addition to the attributes allowed by the mixin.
    valid_exclude_attributes = (['action__pack', 'action__uid'] +
                                ActionExecutionsControllerMixin.valid_exclude_attributes)

    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def get(self, id, attribute, requester_user, **kwargs):
        """
        Retrieve a particular attribute for the provided action execution.

        Handles requests:

            GET /executions/<id>/attribute/<attribute name>

        :return: Value of the requested attribute, or ``None`` when the retrieved object does
                 not have it.
        """
        requested_fields = self._validate_exclude_fields(
            [attribute, 'action__pack', 'action__uid'])

        # Only the validated fields are requested from the database.
        queryset = self.access.impl.model.objects.filter(id=id).only(*requested_fields)
        action_exec_db = queryset.get()

        rbac_utils.assert_user_has_resource_db_permission(
            user_db=requester_user,
            resource_db=action_exec_db,
            permission_type=PermissionType.EXECUTION_VIEW)

        return getattr(action_exec_db, attribute, None)
262
263
264
class ActionExecutionReRunController(ActionExecutionsControllerMixin, ResourceController):
    """
    Controller which re-runs an existing action execution.
    """

    supported_filters = {}

    # Fields excluded when the existing execution is retrieved in post().
    exclude_fields = [
        'result',
        'trigger_instance'
    ]

    class ExecutionSpecificationAPI(object):
        """
        Re-run specification: parameter overrides, tasks to re-run, tasks to reset and user.
        """

        def __init__(self, parameters=None, tasks=None, reset=None, user=None):
            self.parameters = parameters or {}
            self.tasks = tasks or []
            self.reset = reset or []
            self.user = user

        @staticmethod
        def _check_spec(spec):
            """
            Validate a re-run specification.

            Works on any object which exposes ``parameters``, ``tasks`` and ``reset``
            attributes, so the same checks serve both validate() and the controller's post().

            :raises ValueError: If parameters are combined with tasks / reset, or if the reset
                                list contains a task which is not in the tasks list.
            :raises AssertionError: If a non-empty attribute has an unexpected type.
            """
            if (spec.tasks or spec.reset) and spec.parameters:
                raise ValueError('Parameters override is not supported when '
                                 're-running task(s) for a workflow.')

            # NOTE(review): assert statements are skipped when Python runs with -O.
            if spec.parameters:
                assert isinstance(spec.parameters, dict)

            if spec.tasks:
                assert isinstance(spec.tasks, list)

            if spec.reset:
                assert isinstance(spec.reset, list)

            if list(set(spec.reset) - set(spec.tasks)):
                raise ValueError('List of tasks to reset does not match the tasks to rerun.')

        def validate(self):
            """
            Validate this specification and return it.
            """
            self._check_spec(self)
            return self

    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def post(self, spec_api, id, requester_user=None, no_merge=False, **kwargs):
        """
        Re-run the provided action execution optionally specifying override parameters.

        Handles requests:

            POST /executions/<id>/re_run

        :param spec_api: Re-run specification exposing ``parameters``, ``tasks``, ``reset``
                         and ``user`` attributes.

        :param id: ID of the execution to re-run.

        :param no_merge: When true, the parameters of the existing execution are not merged
                         into the new parameters.

        :raises ValueError: If the specification is invalid or tasks are provided for an
                            execution whose runner is not ``mistral-v2``.
        """
        self.ExecutionSpecificationAPI._check_spec(spec_api)

        no_merge = cast_argument_value(value_type=bool, value=no_merge)
        existing_execution = self._get_one_by_id(id=id, exclude_fields=self.exclude_fields,
                                                 requester_user=requester_user,
                                                 permission_type=PermissionType.EXECUTION_VIEW)

        if spec_api.tasks and existing_execution.runner['name'] != 'mistral-v2':
            raise ValueError('Task option is only supported for Mistral workflows.')

        # Merge in any parameters provided by the user
        new_parameters = {}
        if not no_merge:
            new_parameters.update(getattr(existing_execution, 'parameters', {}))
        new_parameters.update(spec_api.parameters)

        # Create object for the new execution
        action_ref = existing_execution.action['ref']

        # Include additional option(s) for the execution
        context = {
            're-run': {
                'ref': id,
            }
        }

        if spec_api.tasks:
            context['re-run']['tasks'] = spec_api.tasks

        if spec_api.reset:
            context['re-run']['reset'] = spec_api.reset

        # Add trace to the new execution
        trace = trace_service.get_trace_db_by_action_execution(
            action_execution_id=existing_execution.id)

        if trace:
            context['trace_context'] = {'id_': str(trace.id)}

        new_liveaction_api = LiveActionCreateAPI(action=action_ref,
                                                 context=context,
                                                 parameters=new_parameters,
                                                 user=spec_api.user)

        # NOTE(review): **kwargs are not forwarded here, unlike
        # ActionExecutionsController.post - confirm this is intended.
        return self._handle_schedule_execution(liveaction_api=new_liveaction_api,
                                               requester_user=requester_user)
366
367
368
class ActionExecutionsController(ActionExecutionsControllerMixin, ResourceController):
    """
        Implements the RESTful web endpoint that handles
        the lifecycle of ActionExecutions in the system.
    """

    # Nested controllers
    views = ExecutionViewsController()

    children = ActionExecutionChildrenController()
    attribute = ActionExecutionAttributeController()
    re_run = ActionExecutionReRunController()

    # ResourceController attributes
    query_options = {
        'sort': ['-start_timestamp', 'action.ref']
    }
    supported_filters = SUPPORTED_EXECUTIONS_FILTERS
    filter_transform_functions = {
        'timestamp_gt': lambda value: isotime.parse(value=value),
        'timestamp_lt': lambda value: isotime.parse(value=value)
    }

    def get_all(self, exclude_attributes=None, **kw):
        """
        List all executions.

        Handles requests:
            GET /executions[?exclude_attributes=result,trigger_instance]

        :param exclude_attributes: Comma delimited string of attributes to exclude from the object.
        :type exclude_attributes: ``str``
        """
        if exclude_attributes:
            exclude_fields = exclude_attributes.split(',')
        else:
            exclude_fields = None

        exclude_fields = self._validate_exclude_fields(exclude_fields=exclude_fields)

        # Use a custom sort order when filtering on a timestamp so we return a correct result as
        # expected by the user
        if kw.get('timestamp_lt', None) or kw.get('sort_desc', None):
            query_options = {'sort': ['-start_timestamp', 'action.ref']}
            kw['query_options'] = query_options
        elif kw.get('timestamp_gt', None) or kw.get('sort_asc', None):
            query_options = {'sort': ['+start_timestamp', 'action.ref']}
            kw['query_options'] = query_options

        return self._get_action_executions(exclude_fields=exclude_fields, **kw)

    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def get_one(self, id, requester_user, exclude_attributes=None, **kwargs):
        """
        Retrieve a single execution.

        Handles requests:
            GET /executions/<id>[?exclude_attributes=result,trigger_instance]

        :param exclude_attributes: Comma delimited string of attributes to exclude from the object.
        :type exclude_attributes: ``str``
        """
        if exclude_attributes:
            exclude_fields = exclude_attributes.split(',')
        else:
            exclude_fields = None

        exclude_fields = self._validate_exclude_fields(exclude_fields=exclude_fields)

        return self._get_one_by_id(id=id, exclude_fields=exclude_fields,
                                   requester_user=requester_user,
                                   permission_type=PermissionType.EXECUTION_VIEW)

    def post(self, liveaction_api, requester_user=None, **kwargs):
        """
        Schedule a new execution for the provided live action.

        Delegates to ``_handle_schedule_execution`` and passes all keyword arguments through.
        """
        return self._handle_schedule_execution(liveaction_api=liveaction_api,
                                               requester_user=requester_user, **kwargs)

    # NOTE: ``id`` shadows the built-in of the same name; the parameter name is kept because
    # it is part of the method signature.
    def delete(self, id, requester_user, **kwargs):
        """
        Stops a single execution.

        Handles requests:
            DELETE /executions/<id>

        :return: The existing execution API object when the live action is already canceled,
                 otherwise the API object of the execution returned by the cancellation
                 request.
        """
        if not requester_user:
            requester_user = UserDB(cfg.CONF.system_user.user)

        execution_api = self._get_one_by_id(id=id, requester_user=requester_user,
                                            permission_type=PermissionType.EXECUTION_STOP)

        if not execution_api:
            abort(http_client.NOT_FOUND, 'Execution with id %s not found.' % id)

        liveaction_id = execution_api.liveaction['id']
        if not liveaction_id:
            abort(http_client.INTERNAL_SERVER_ERROR,
                  'Execution object missing link to liveaction %s.' % liveaction_id)

        try:
            liveaction_db = LiveAction.get_by_id(liveaction_id)
        except Exception:
            # Catch Exception rather than everything so that interpreter-exit style exceptions
            # (KeyboardInterrupt, SystemExit) are not converted into an API error, and record
            # the cause instead of discarding it.
            LOG.exception('Failed to retrieve liveaction %s.', liveaction_id)
            abort(http_client.INTERNAL_SERVER_ERROR,
                  'Execution object missing link to liveaction %s.' % liveaction_id)

        if liveaction_db.status == LIVEACTION_STATUS_CANCELED:
            # Adjacent literals are used so the message contains no stray indentation.
            LOG.info('Action %s already in "canceled" state; '
                     'returning execution object.', liveaction_db.id)
            return execution_api

        if liveaction_db.status not in LIVEACTION_CANCELABLE_STATES:
            # NOTE(review): a non-cancelable state is reported with an OK status code -
            # confirm this is intended.
            abort(http_client.OK, 'Action cannot be canceled. State = %s.' % liveaction_db.status)

        try:
            (liveaction_db, execution_db) = action_service.request_cancellation(
                liveaction_db, requester_user.name or cfg.CONF.system_user.user)
        except Exception:
            LOG.exception('Failed requesting cancellation for liveaction %s.', liveaction_db.id)
            abort(http_client.INTERNAL_SERVER_ERROR, 'Failed canceling execution.')

        from_model_kwargs = self._get_from_model_kwargs_for_request(**kwargs)

        return ActionExecutionAPI.from_model(execution_db, from_model_kwargs)

    def _get_action_executions(self, exclude_fields=None, **kw):
        """
        Retrieve executions using the provided filters.

        The ``limit`` filter is converted to ``int`` and defaults to ``self.default_limit``.

        :param exclude_fields: A list of object fields to exclude.
        :type exclude_fields: ``list``
        """

        kw['limit'] = int(kw.get('limit', self.default_limit))

        LOG.debug('Retrieving all action executions with filters=%s', kw)
        return super(ActionExecutionsController, self)._get_all(exclude_fields=exclude_fields,
                                                                **kw)
504
505
506
# Module-level controller instances, one for each controller class defined in this module.
action_executions_controller = ActionExecutionsController()
507
action_execution_rerun_controller = ActionExecutionReRunController()
508
action_execution_attribute_controller = ActionExecutionAttributeController()
509
action_execution_children_controller = ActionExecutionChildrenController()
510