Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/services/action.py (2 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import six
18
19
from st2common import log as logging
20
from st2common.constants import action as action_constants
21
from st2common.exceptions import actionrunner as runner_exc
22
from st2common.exceptions import db as db_exc
23
from st2common.exceptions import trace as trace_exc
24
from st2common.persistence.liveaction import LiveAction
25
from st2common.persistence.execution import ActionExecution
26
from st2common.persistence.execution import ActionExecutionOutput
27
from st2common.models.db.execution import ActionExecutionOutputDB
28
from st2common.runners import utils as runners_utils
29
from st2common.services import executions
30
from st2common.services import trace as trace_service
31
from st2common.util import date as date_utils
32
from st2common.util import action_db as action_utils
33
from st2common.util import schema as util_schema
34
35
36
__all__ = [
37
    'request',
38
    'create_request',
39
    'publish_request',
40
    'is_action_canceled_or_canceling',
41
42
    'request_pause',
43
    'request_resume',
44
45
    'store_execution_output_data',
46
]
47
48
LOG = logging.getLogger(__name__)
49
50
51
def _get_immutable_params(parameters):
52
    if not parameters:
53
        return []
54
    return [k for k, v in six.iteritems(parameters) if v.get('immutable', False)]
55
56
57
def create_request(liveaction):
58
    """
59
    Create an action execution.
60
61
    :return: (liveaction, execution)
62
    :rtype: tuple
63
    """
64
    # We import this here to avoid conflicts w/ runners that might import this
65
    # file since the runners don't have the config context by default.
66
    from st2common.metrics.base import get_driver
67
68
    # Use the user context from the parent action execution. Subtasks in a workflow
69
    # action can be invoked by a system user and so we want to use the user context
70
    # from the original workflow action.
71
    parent_context = executions.get_parent_context(liveaction)
72
    if parent_context:
73
        parent_user = parent_context.get('user', None)
74
        if parent_user:
75
            liveaction.context['user'] = parent_user
76
77
    # Validate action.
78
    action_db = action_utils.get_action_by_ref(liveaction.action)
79
    if not action_db:
80
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
81
    if not action_db.enabled:
82
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)
83
84
    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])
85
86
    if not hasattr(liveaction, 'parameters'):
87
        liveaction.parameters = dict()
88
89
    # Validate action parameters.
90
    schema = util_schema.get_schema_for_action_parameters(action_db)
91
    validator = util_schema.get_validator()
92
    util_schema.validate(liveaction.parameters, schema, validator, use_default=True,
93
                         allow_default_none=True)
94
95
    # validate that no immutable params are being overriden. Although possible to
96
    # ignore the override it is safer to inform the user to avoid surprises.
97
    immutables = _get_immutable_params(action_db.parameters)
98
    immutables.extend(_get_immutable_params(runnertype_db.runner_parameters))
99
    overridden_immutables = [p for p in six.iterkeys(liveaction.parameters) if p in immutables]
100
    if len(overridden_immutables) > 0:
101
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
102
                         % str(overridden_immutables))
103
104
    # Set notification settings for action.
105
    # XXX: There are cases when we don't want notifications to be sent for a particular
106
    # execution. So we should look at liveaction.parameters['notify']
107
    # and not set liveaction.notify.
108
    if not _is_notify_empty(action_db.notify):
109
        liveaction.notify = action_db.notify
110
111
    # Write to database and send to message queue.
112
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
113
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()
114
115
    # Set the "action_is_workflow" attribute
116
    liveaction.action_is_workflow = action_db.is_workflow()
117
118
    # Publish creation after both liveaction and actionexecution are created.
119
    liveaction = LiveAction.add_or_update(liveaction, publish=False)
120
121
    # Get trace_db if it exists. This could throw. If it throws, we have to cleanup
122
    # liveaction object so we don't see things in requested mode.
123
    trace_db = None
124
    try:
125
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
126
    except db_exc.StackStormDBObjectNotFoundError as e:
127
        _cleanup_liveaction(liveaction)
128
        raise trace_exc.TraceNotFoundException(str(e))
129
130
    execution = executions.create_execution_object(liveaction, publish=False)
131
132
    if trace_db:
133
        trace_service.add_or_update_given_trace_db(
134
            trace_db=trace_db,
135
            action_executions=[
136
                trace_service.get_trace_component_for_action_execution(execution, liveaction)
137
            ])
138
139
    get_driver().inc_counter('action.executions.%s' % (liveaction.status))
140
141
    return liveaction, execution
142
143
144
def publish_request(liveaction, execution):
145
    """
146
    Publish an action execution.
147
148
    :return: (liveaction, execution)
149
    :rtype: tuple
150
    """
151
    # Assume that this is a creation.
152
    LiveAction.publish_create(liveaction)
153
    LiveAction.publish_status(liveaction)
154
    ActionExecution.publish_create(execution)
155
156
    extra = {'liveaction_db': liveaction, 'execution_db': execution}
157
    LOG.audit('Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' %
158
              (liveaction.id, execution.id), extra=extra)
159
160
    return liveaction, execution
161
162
163
def request(liveaction):
164
    liveaction, execution = create_request(liveaction)
165
    liveaction, execution = publish_request(liveaction, execution)
166
167
    return liveaction, execution
168
169
170
def update_status(liveaction, new_status, result=None, publish=True):
171
    if liveaction.status == new_status:
172
        return liveaction
173
174
    old_status = liveaction.status
175
176
    updates = {
177
        'liveaction_id': liveaction.id,
178
        'status': new_status,
179
        'result': result,
180
        'publish': False
181
    }
182
183
    if new_status in action_constants.LIVEACTION_COMPLETED_STATES:
184
        updates['end_timestamp'] = date_utils.get_datetime_utc_now()
185
186
    liveaction = action_utils.update_liveaction_status(**updates)
187
    action_execution = executions.update_execution(liveaction)
188
189
    msg = ('The status of action execution is changed from %s to %s. '
190
           '<LiveAction.id=%s, ActionExecution.id=%s>' % (old_status,
191
           new_status, liveaction.id, action_execution.id))
192
193
    extra = {
194
        'action_execution_db': action_execution,
195
        'liveaction_db': liveaction
196
    }
197
198
    LOG.audit(msg, extra=extra)
199
    LOG.info(msg)
200
201
    # Invoke post run if liveaction status is completed or paused.
202
    if (new_status in action_constants.LIVEACTION_COMPLETED_STATES or
203
            new_status == action_constants.LIVEACTION_STATUS_PAUSED):
204
        runners_utils.invoke_post_run(liveaction)
205
206
    if publish:
207
        LiveAction.publish_status(liveaction)
208
209
    return liveaction
210
211
212
def is_action_canceled_or_canceling(liveaction_id):
213
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
214
    return liveaction_db.status in [action_constants.LIVEACTION_STATUS_CANCELED,
215
                                    action_constants.LIVEACTION_STATUS_CANCELING]
216
217
218
def is_action_paused_or_pausing(liveaction_id):
219
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
220
    return liveaction_db.status in [action_constants.LIVEACTION_STATUS_PAUSED,
221
                                    action_constants.LIVEACTION_STATUS_PAUSING]
222
223
224
def request_cancellation(liveaction, requester):
225
    """
226
    Request cancellation of an action execution.
227
228
    :return: (liveaction, execution)
229
    :rtype: tuple
230
    """
231
    if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELING:
232
        return liveaction
233
234
    if liveaction.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
235
        raise Exception(
236
            'Unable to cancel liveaction "%s" because it is already in a '
237
            'completed state.' % liveaction.id
238
        )
239
240
    result = {
241
        'message': 'Action canceled by user.',
242
        'user': requester
243
    }
244
245
    # Run cancelation sequence for liveaction that is in running state or
246
    # if the liveaction is operating under a workflow.
247
    if ('parent' in liveaction.context or
248
            liveaction.status in action_constants.LIVEACTION_STATUS_RUNNING):
249
        status = action_constants.LIVEACTION_STATUS_CANCELING
250
    else:
251
        status = action_constants.LIVEACTION_STATUS_CANCELED
252
253
    liveaction = update_status(liveaction, status, result=result)
254
255
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
256
257
    return (liveaction, execution)
258
259
260
def request_pause(liveaction, requester):
261
    """
262
    Request pause for a running action execution.
263
264
    :return: (liveaction, execution)
265
    :rtype: tuple
266
    """
267
    # Validate that the runner type of the action supports pause.
268
    action_db = action_utils.get_action_by_ref(liveaction.action)
269
270
    if not action_db:
271
        raise ValueError(
272
            'Unable to pause liveaction "%s" because the action "%s" '
273
            'is not found.' % (liveaction.id, liveaction.action)
274
        )
275
276
    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
277
        raise runner_exc.InvalidActionRunnerOperationError(
278
            'Unable to pause liveaction "%s" because it is not supported by the '
279
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
280
        )
281
282
    if (liveaction.status == action_constants.LIVEACTION_STATUS_PAUSING or
283
            liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED):
284
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
285
        return (liveaction, execution)
286
287
    if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
288
        raise runner_exc.UnexpectedActionExecutionStatusError(
289
            'Unable to pause liveaction "%s" because it is not in a running state.'
290
            % liveaction.id
291
        )
292
293
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
294
295
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
296
297
    return (liveaction, execution)
298
299
300
def request_resume(liveaction, requester):
301
    """
302
    Request resume for a paused action execution.
303
304
    :return: (liveaction, execution)
305
    :rtype: tuple
306
    """
307
    # Validate that the runner type of the action supports pause.
308
    action_db = action_utils.get_action_by_ref(liveaction.action)
309
310
    if not action_db:
311
        raise ValueError(
312
            'Unable to resume liveaction "%s" because the action "%s" '
313
            'is not found.' % (liveaction.id, liveaction.action)
314
        )
315
316
    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
317
        raise runner_exc.InvalidActionRunnerOperationError(
318
            'Unable to resume liveaction "%s" because it is not supported by the '
319
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
320
        )
321
322
    if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING:
323
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
324
        return (liveaction, execution)
325
326
    if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
327
        raise runner_exc.UnexpectedActionExecutionStatusError(
328
            'Unable to resume liveaction "%s" because it is not in a paused state.'
329
            % liveaction.id
330
        )
331
332
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)
333
334
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
335
336
    return (liveaction, execution)
337
338
339
def get_parent_liveaction(liveaction_db):
340
    """Get the liveaction for the parent workflow
341
342
    Useful for finding the parent workflow. Pass in any LiveActionDB instance,
343
    and this function will return the liveaction of the parent workflow.
344
345
    :param liveaction_db: The LiveActionDB instance for which to find the parent.
346
    :rtype: LiveActionDB
347
    """
348
349
    parent = liveaction_db.context.get('parent')
350
351
    if not parent:
352
        return None
353
354
    parent_execution_db = ActionExecution.get(id=parent['execution_id'])
355
    parent_liveaction_db = LiveAction.get(id=parent_execution_db.liveaction['id'])
356
357
    return parent_liveaction_db
358
359
360
def get_parent_execution(execution_db):
361
    """Get the action execution for the parent workflow
362
363
    Useful for finding the parent workflow. Pass in any ActionExecutionDB instance,
364
    and this function will return the action execution of the parent workflow.
365
366
    :param execution_db: The ActionExecutionDB instance for which to find the parent.
367
    :rtype: ActionExecutionDB
368
    """
369
370
    if not execution_db.parent:
371
        return None
372
373
    parent_execution_db = ActionExecution.get(id=execution_db.parent)
374
375
    return parent_execution_db
376
377
378
def get_root_liveaction(liveaction_db):
379
    """Recursively ascends until the root liveaction is found
380
381
    Useful for finding an original parent workflow. Pass in any LiveActionDB instance,
382
    and this function will eventually return the top-most liveaction, even if the two
383
    are one and the same.
384
385
    :param liveaction_db: The LiveActionDB instance for which to find the root parent.
386
    :rtype: LiveActionDB
387
    """
388
389
    parent_liveaction_db = get_parent_liveaction(liveaction_db)
390
391
    return get_root_liveaction(parent_liveaction_db) if parent_liveaction_db else liveaction_db
392
393
394
def get_root_execution(execution_db):
395
    """Recursively ascends until the root action execution is found
396
397
    Useful for finding an original parent workflow. Pass in any ActionExecutionDB instance,
398
    and this function will eventually return the top-most action execution, even if the two
399
    are one and the same.
400
401
    :param execution_db: The ActionExecutionDB instance for which to find the root parent.
402
    :rtype: ActionExecutionDB
403
    """
404
405
    parent_execution_db = get_parent_execution(execution_db)
406
407
    return get_root_execution(parent_execution_db) if parent_execution_db else execution_db
408
409
410
def store_execution_output_data(execution_db, action_db, data, output_type='output',
411
                                timestamp=None):
412
    """
413
    Store output from an execution as a new document in the collection.
414
    """
415
    execution_id = str(execution_db.id)
416
    action_ref = action_db.ref
417
    runner_ref = getattr(action_db, 'runner_type', {}).get('name', 'unknown')
418
    timestamp = timestamp or date_utils.get_datetime_utc_now()
419
420
    output_db = ActionExecutionOutputDB(execution_id=execution_id,
421
                                        action_ref=action_ref,
422
                                        runner_ref=runner_ref,
423
                                        timestamp=timestamp,
424
                                        output_type=output_type,
425
                                        data=data)
426
    output_db = ActionExecutionOutput.add_or_update(output_db, publish=True,
427
                                                    dispatch_trigger=False)
428
429
    return output_db
430
431
432
def is_children_active(liveaction_id):
433
    execution_db = ActionExecution.get(liveaction__id=str(liveaction_id))
434
435
    if execution_db.runner['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
436
        return False
437
438
    children_execution_dbs = ActionExecution.query(parent=str(execution_db.id))
439
440
    inactive_statuses = (
441
        action_constants.LIVEACTION_COMPLETED_STATES +
442
        [action_constants.LIVEACTION_STATUS_PAUSED, action_constants.LIVEACTION_STATUS_PENDING]
443
    )
444
445
    completed = [
446
        child_exec_db.status in inactive_statuses
447
        for child_exec_db in children_execution_dbs
448
    ]
449
450
    return (not all(completed))
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after return.
Loading history...
451
452
453
def _cleanup_liveaction(liveaction):
454
    try:
455
        LiveAction.delete(liveaction)
456
    except:
457
        LOG.exception('Failed cleaning up LiveAction: %s.', liveaction)
458
        pass
0 ignored issues
show
Unnecessary pass statement
Loading history...
459
460
461
def _is_notify_empty(notify_db):
462
    """
463
    notify_db is considered to be empty if notify_db is None and neither
464
    of on_complete, on_success and on_failure have values.
465
    """
466
    if not notify_db:
467
        return True
468
    return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)
469