Passed
Push — develop ( d8da15...d23be3 )
by Plexxi
07:00 queued 03:32
created

ActionExecutionReRunController.post()   C

Complexity

Conditions 7

Size

Total Lines 49

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 7
c 2
b 0
f 0
dl 0
loc 49
rs 5.5
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import re
18
import httplib
19
import sys
20
import traceback
21
22
import jsonschema
23
import pecan
24
from pecan import abort
25
from six.moves import http_client
26
27
from st2api.controllers.base import BaseRestControllerMixin
28
from st2api.controllers.resource import ResourceController
29
from st2api.controllers.v1.executionviews import ExecutionViewsController
30
from st2api.controllers.v1.executionviews import SUPPORTED_FILTERS
31
from st2common import log as logging
32
from st2common.constants.action import LIVEACTION_STATUS_CANCELED, LIVEACTION_STATUS_FAILED
33
from st2common.constants.action import LIVEACTION_CANCELABLE_STATES
34
from st2common.exceptions.param import ParamException
35
from st2common.exceptions.apivalidation import ValueValidationException
36
from st2common.exceptions.trace import TraceNotFoundException
37
from st2common.models.api.action import LiveActionAPI
38
from st2common.models.api.action import LiveActionCreateAPI
39
from st2common.models.api.base import jsexpose
40
from st2common.models.api.execution import ActionExecutionAPI
41
from st2common.persistence.liveaction import LiveAction
42
from st2common.persistence.execution import ActionExecution
43
from st2common.services import action as action_service
44
from st2common.services import executions as execution_service
45
from st2common.services import trace as trace_service
46
from st2common.util import jsonify
47
from st2common.util import isotime
48
from st2common.util import action_db as action_utils
49
from st2common.util.api import get_requester
50
from st2common.util import param as param_utils
51
from st2common.rbac.types import PermissionType
52
from st2common.rbac.decorators import request_user_has_permission
53
from st2common.rbac.decorators import request_user_has_resource_db_permission
54
from st2common.rbac.utils import assert_request_user_has_resource_db_permission
55
from st2common.rbac.utils import assert_request_user_is_admin_if_user_query_param_is_provider
56
57
__all__ = ['ActionExecutionsController']

LOG = logging.getLogger(__name__)

# Note: The filters are built here, at import time, and not in a controller constructor.
# The shared view filters are deep-copied first so that the execution specific entries added
# below do not end up in SUPPORTED_FILTERS itself.
SUPPORTED_EXECUTIONS_FILTERS = copy.deepcopy(SUPPORTED_FILTERS)
SUPPORTED_EXECUTIONS_FILTERS['timestamp_gt'] = 'start_timestamp.gt'
SUPPORTED_EXECUTIONS_FILTERS['timestamp_lt'] = 'start_timestamp.lt'

# NOTE(review): neither constant is referenced in the visible part of this module; they are
# kept because other modules may import them - confirm before removing.
MONITOR_THREAD_EMPTY_Q_SLEEP_TIME = 5
MONITOR_THREAD_NO_WORKERS_SLEEP_TIME = 1
72
73
74
class ActionExecutionsControllerMixin(BaseRestControllerMixin):
    """
    Mixin class with methods shared by the action execution controllers.
    """

    model = ActionExecutionAPI
    access = ActionExecution

    # A list of attributes which can be specified using ?exclude_attributes filter
    valid_exclude_attributes = [
        'result',
        'trigger_instance'
    ]

    def _get_from_model_kwargs_for_request(self, request):
        """
        Set mask_secrets=False if the user is an admin and provided ?show_secrets=True query param.

        :return: Keyword arguments to pass to ``from_model``.
        :rtype: ``dict``
        """
        return {'mask_secrets': self._get_mask_secrets(request)}

    def _handle_schedule_execution(self, liveaction_api):
        """
        Verify that the requester may execute the action and schedule the execution.

        Failures are translated into HTTP errors using ``abort``: ``ValueError``,
        ``jsonschema.ValidationError`` and ``TraceNotFoundException`` result in BAD_REQUEST and
        any other unexpected exception results in INTERNAL_SERVER_ERROR.
        ``ValueValidationException`` is re-raised unchanged.

        :param liveaction_api: LiveActionAPI object.
        :type liveaction_api: :class:`LiveActionAPI`

        :return: API object for the newly created action execution.
        """

        # Assert the permissions
        action_ref = liveaction_api.action
        action_db = action_utils.get_action_by_ref(action_ref)
        user = liveaction_api.user or get_requester()

        assert_request_user_has_resource_db_permission(request=pecan.request, resource_db=action_db,
            permission_type=PermissionType.ACTION_EXECUTE)

        # Only an admin is allowed to schedule an execution on behalf of another user
        assert_request_user_is_admin_if_user_query_param_is_provider(request=pecan.request,
                                                                     user=user)

        try:
            return self._schedule_execution(liveaction=liveaction_api, user=user)
        except ValueError as e:
            LOG.exception('Unable to execute action.')
            abort(http_client.BAD_REQUEST, str(e))
        except jsonschema.ValidationError as e:
            LOG.exception('Unable to execute action. Parameter validation failed.')
            # Drop the u'' prefix from quoted values in the validation message
            abort(http_client.BAD_REQUEST, re.sub("u'([^']*)'", r"'\1'", e.message))
        except TraceNotFoundException as e:
            abort(http_client.BAD_REQUEST, str(e))
        except ValueValidationException:
            # Bare raise so the original traceback is preserved
            raise
        except Exception as e:
            LOG.exception('Unable to execute action. Unexpected error encountered.')
            abort(http_client.INTERNAL_SERVER_ERROR, str(e))

    def _schedule_execution(self, liveaction, user=None):
        """
        Create, render parameters for and publish a new execution request.

        :param liveaction: API object describing the execution to schedule.
        :param user: User stored in the execution context under the ``user`` key.

        :raises ValueError: If the ``st2-context`` request header does not decode to a dict.
        :raises ValueValidationException: If rendering of the live parameters fails. By then the
                                          execution has already been marked as failed.

        :return: API object for the newly created action execution.
        """
        # Initialize execution context if it does not exist.
        if not hasattr(liveaction, 'context'):
            liveaction.context = dict()

        liveaction.context['user'] = user
        LOG.debug('User is: %s', liveaction.context['user'])

        # Retrieve other st2 context from request header.
        if 'st2-context' in pecan.request.headers and pecan.request.headers['st2-context']:
            context = jsonify.try_loads(pecan.request.headers['st2-context'])
            if not isinstance(context, dict):
                raise ValueError('Unable to convert st2-context from the headers into JSON.')
            liveaction.context.update(context)

        # Schedule the action execution.
        liveaction_db = LiveActionAPI.to_model(liveaction)
        liveaction_db, actionexecution_db = action_service.create_request(liveaction_db)

        action_db = action_utils.get_action_by_ref(liveaction_db.action)
        runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

        try:
            liveaction_db.parameters = param_utils.render_live_params(
                runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
                liveaction_db.context)
        except ParamException:
            # By this point the execution is already in the DB therefore need to mark it failed.
            _, e, tb = sys.exc_info()
            action_service.update_status(
                liveaction=liveaction_db,
                new_status=LIVEACTION_STATUS_FAILED,
                result={'error': str(e), 'traceback': ''.join(traceback.format_tb(tb, 20))})
            # Might be a good idea to return the actual ActionExecution rather than bubble up
            # the exception.
            raise ValueValidationException(str(e))

        liveaction_db = LiveAction.add_or_update(liveaction_db, publish=False)

        _, actionexecution_db = action_service.publish_request(liveaction_db, actionexecution_db)
        from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)
        return ActionExecutionAPI.from_model(actionexecution_db, from_model_kwargs)

    def _get_result_object(self, id):
        """
        Retrieve result object for the provided action execution.

        :param id: Action execution ID.
        :type id: ``str``

        :rtype: ``dict``
        """
        # Only the "result" field is requested from the database
        fields = ['result']
        action_exec_db = self.access.impl.model.objects.filter(id=id).only(*fields).get()
        return action_exec_db.result

    def _get_children(self, id_, depth=-1, result_fmt=None):
        """
        Retrieve the descendants of the provided action execution as API objects.

        :param id_: Action execution ID.
        :param depth: Descendant depth passed to the execution service. Converted to ``int``.
        :param result_fmt: Result format passed through to the execution service.

        :rtype: ``list``
        """
        # make sure depth is int. Url encoding will make it a string and needs to
        # be converted back in that case.
        depth = int(depth)
        from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)
        LOG.debug('retrieving children for id: %s with depth: %s', id_, depth)
        descendants = execution_service.get_descendants(actionexecution_id=id_,
                                                        descendant_depth=depth,
                                                        result_fmt=result_fmt)

        return [self.model.from_model(descendant, from_model_kwargs) for
                descendant in descendants]
196
197
198
class BaseActionExecutionNestedController(ActionExecutionsControllerMixin, ResourceController):
    """
    Base class for the nested action execution controllers.
    """
    # Note: We need to override "get_one" and "get_all" to return 404 since nested controllers
    # don't implement those methods

    # ResourceController attributes
    query_options = {}
    supported_filters = {}

    def get_all(self):
        # http_client (six.moves) is used instead of the Python 2 only httplib module so this
        # matches the rest of the file
        abort(http_client.NOT_FOUND)

    def get_one(self, id):
        abort(http_client.NOT_FOUND)
211
212
213
class ActionExecutionChildrenController(BaseActionExecutionNestedController):
    @request_user_has_resource_db_permission(permission_type=PermissionType.EXECUTION_VIEW)
    @jsexpose(arg_types=[str])
    def get(self, id, **kwargs):
        """
        Return the children of the action execution with the provided id.

        Any additional keyword arguments are forwarded to ``_get_children``.

        :rtype: ``list``
        """
        children = self._get_children(id_=id, **kwargs)
        return children
223
224
225
class ActionExecutionAttributeController(BaseActionExecutionNestedController):
    @request_user_has_resource_db_permission(permission_type=PermissionType.EXECUTION_VIEW)
    @jsexpose()
    def get(self, id, attribute, **kwargs):
        """
        Return the value of a single attribute of the provided action execution.

        Handles requests:

            GET /executions/<id>/attribute/<attribute name>

        The attribute name is first passed through ``_validate_exclude_fields`` and only the
        resulting field(s) are requested from the database. ``None`` is returned when the
        retrieved object has no such attribute.

        :rtype: ``dict``
        """
        requested_fields = self._validate_exclude_fields([attribute])
        queryset = self.access.impl.model.objects.filter(id=id)
        execution_db = queryset.only(*requested_fields).get()
        return getattr(execution_db, attribute, None)
243
244
245
class ActionExecutionReRunController(ActionExecutionsControllerMixin, ResourceController):
    """
    Controller which re-runs an existing action execution.
    """
    supported_filters = {}
    # Fields which are not retrieved when loading the execution to re-run
    exclude_fields = [
        'result',
        'trigger_instance'
    ]

    class ExecutionSpecificationAPI(object):
        """
        Request body of a re-run request.
        """

        def __init__(self, parameters=None, tasks=None, reset=None, user=None):
            # Falsy values are normalized to an empty container
            self.parameters = parameters or {}
            self.tasks = tasks or []
            self.reset = reset or []
            self.user = user

        def validate(self):
            """
            Validate the specification.

            :raises ValueError: If parameters are combined with tasks / reset, if an attribute
                                has the wrong type or if a task to reset is not in the list of
                                tasks to re-run.

            :return: The validated object itself.
            """
            if (self.tasks or self.reset) and self.parameters:
                raise ValueError('Parameters override is not supported when '
                                 're-running task(s) for a workflow.')

            # Explicit checks are used instead of "assert" since assert statements are removed
            # when Python runs with -O and user supplied input would then go unchecked.
            if self.parameters and not isinstance(self.parameters, dict):
                raise ValueError('Parameters must be a dictionary.')

            if self.tasks and not isinstance(self.tasks, list):
                raise ValueError('Tasks must be a list.')

            if self.reset and not isinstance(self.reset, list):
                raise ValueError('Reset must be a list.')

            # Every task which should be reset also needs to be re-run
            if set(self.reset) - set(self.tasks):
                raise ValueError('List of tasks to reset does not match the tasks to rerun.')

            return self

    @jsexpose(body_cls=ExecutionSpecificationAPI, status_code=http_client.CREATED)
    def post(self, spec, execution_id, no_merge=None):
        """
        Re-run the provided action execution optionally specifying override parameters.

        Handles requests:

            POST /executions/<id>/re_run

        :param spec: Re-run specification (parameters, tasks, reset, user).
        :type spec: :class:`ExecutionSpecificationAPI`

        :param execution_id: ID of the execution to re-run.

        :param no_merge: When truthy, the parameters of the existing execution are not used as
                         a base for the new parameters.

        :raises ValueError: If tasks are specified and the runner of the existing execution is
                            not "mistral-v2".
        """
        existing_execution = self._get_one(id=execution_id, exclude_fields=self.exclude_fields)

        if spec.tasks and existing_execution.runner['name'] != 'mistral-v2':
            raise ValueError('Task option is only supported for Mistral workflows.')

        # Merge in any parameters provided by the user. Values from the spec take precedence
        # over the ones from the existing execution.
        new_parameters = {}
        if not no_merge:
            new_parameters.update(getattr(existing_execution, 'parameters', {}))
        new_parameters.update(spec.parameters)

        # Create object for the new execution
        action_ref = existing_execution.action['ref']

        # Include additional option(s) for the execution
        context = {
            're-run': {
                'ref': execution_id,
            }
        }

        if spec.tasks:
            context['re-run']['tasks'] = spec.tasks

        if spec.reset:
            context['re-run']['reset'] = spec.reset

        # Add trace to the new execution
        trace = trace_service.get_trace_db_by_action_execution(
            action_execution_id=existing_execution.id)

        if trace:
            context['trace_context'] = {'id_': str(trace.id)}

        new_liveaction_api = LiveActionCreateAPI(action=action_ref,
                                                 context=context,
                                                 parameters=new_parameters,
                                                 user=spec.user)

        return self._handle_schedule_execution(liveaction_api=new_liveaction_api)
327
328
329
class ActionExecutionsController(ActionExecutionsControllerMixin, ResourceController):
    """
        Implements the RESTful web endpoint that handles
        the lifecycle of ActionExecutions in the system.
    """

    # Nested controllers
    views = ExecutionViewsController()

    children = ActionExecutionChildrenController()
    attribute = ActionExecutionAttributeController()
    re_run = ActionExecutionReRunController()

    # ResourceController attributes
    query_options = {
        'sort': ['-start_timestamp', 'action.ref']
    }
    supported_filters = SUPPORTED_EXECUTIONS_FILTERS
    # The timestamp filter values are parsed using isotime.parse
    filter_transform_functions = {
        'timestamp_gt': lambda value: isotime.parse(value=value),
        'timestamp_lt': lambda value: isotime.parse(value=value)
    }

    @request_user_has_permission(permission_type=PermissionType.EXECUTION_LIST)
    @jsexpose()
    def get_all(self, exclude_attributes=None, **kw):
        """
        List all executions.

        Handles requests:
            GET /executions[?exclude_attributes=result,trigger_instance]

        :param exclude_attributes: Comma delimited string of attributes to exclude from the object.
        :type exclude_attributes: ``str``
        """
        if exclude_attributes:
            exclude_fields = exclude_attributes.split(',')
        else:
            exclude_fields = None

        exclude_fields = self._validate_exclude_fields(exclude_fields=exclude_fields)

        # Use a custom sort order when filtering on a timestamp so we return a correct result as
        # expected by the user
        if 'timestamp_lt' in kw:
            query_options = {'sort': ['-start_timestamp', 'action.ref']}
            kw['query_options'] = query_options
        elif 'timestamp_gt' in kw:
            query_options = {'sort': ['+start_timestamp', 'action.ref']}
            kw['query_options'] = query_options

        return self._get_action_executions(exclude_fields=exclude_fields, **kw)

    @request_user_has_resource_db_permission(permission_type=PermissionType.EXECUTION_VIEW)
    @jsexpose(arg_types=[str])
    def get_one(self, id, exclude_attributes=None, **kwargs):
        """
        Retrieve a single execution.

        Handles requests:
            GET /executions/<id>[?exclude_attributes=result,trigger_instance]

        :param exclude_attributes: Comma delimited string of attributes to exclude from the object.
        :type exclude_attributes: ``str``
        """
        if exclude_attributes:
            exclude_fields = exclude_attributes.split(',')
        else:
            exclude_fields = None

        exclude_fields = self._validate_exclude_fields(exclude_fields=exclude_fields)

        return self._get_one(id=id, exclude_fields=exclude_fields)

    @jsexpose(body_cls=LiveActionCreateAPI, status_code=http_client.CREATED)
    def post(self, liveaction_api):
        """
        Schedule a new execution for the provided live action.

        Handles requests:
            POST /executions
        """
        return self._handle_schedule_execution(liveaction_api=liveaction_api)

    @request_user_has_resource_db_permission(permission_type=PermissionType.EXECUTION_STOP)
    @jsexpose(arg_types=[str])
    def delete(self, exec_id):
        """
        Stops a single execution.

        Handles requests:
            DELETE /executions/<id>

        :param exec_id: ID of the execution to stop.
        :type exec_id: ``str``

        :return: API object for the execution for which cancellation has been requested.
        """
        execution_api = self._get_one(id=exec_id)

        if not execution_api:
            abort(http_client.NOT_FOUND, 'Execution with id %s not found.' % exec_id)

        liveaction_id = execution_api.liveaction['id']
        if not liveaction_id:
            abort(http_client.INTERNAL_SERVER_ERROR,
                  'Execution object missing link to liveaction %s.' % liveaction_id)

        # Note: "except Exception" is used instead of a bare except so that exceptions such as
        # SystemExit and KeyboardInterrupt are not swallowed and turned into an HTTP error.
        try:
            liveaction_db = LiveAction.get_by_id(liveaction_id)
        except Exception:
            abort(http_client.INTERNAL_SERVER_ERROR,
                  'Execution object missing link to liveaction %s.' % liveaction_id)

        if liveaction_db.status == LIVEACTION_STATUS_CANCELED:
            abort(http_client.OK, 'Action is already in "canceled" state.')

        if liveaction_db.status not in LIVEACTION_CANCELABLE_STATES:
            abort(http_client.OK, 'Action cannot be canceled. State = %s.' % liveaction_db.status)

        try:
            (liveaction_db, execution_db) = action_service.request_cancellation(
                liveaction_db, get_requester())
        except Exception:
            LOG.exception('Failed requesting cancellation for liveaction %s.', liveaction_db.id)
            abort(http_client.INTERNAL_SERVER_ERROR, 'Failed canceling execution.')

        from_model_kwargs = self._get_from_model_kwargs_for_request(request=pecan.request)

        return ActionExecutionAPI.from_model(execution_db, from_model_kwargs)

    @jsexpose()
    def options(self, *args, **kw):
        """
        Handle OPTIONS requests. Intentionally returns nothing.
        """
        return

    def _get_action_executions(self, exclude_fields=None, **kw):
        """
        Retrieve executions matching the provided filters.

        :param exclude_fields: A list of object fields to exclude.
        :type exclude_fields: ``list``
        """
        # Default to 100 results if no limit is provided. The value is converted to int since
        # it may be passed in as a string.
        kw['limit'] = int(kw.get('limit', 100))

        LOG.debug('Retrieving all action executions with filters=%s', kw)
        return super(ActionExecutionsController, self)._get_all(exclude_fields=exclude_fields,
                                                                **kw)
464