Passed
Pull Request — master (#4000)
by W
05:52
created

is_children_active()   A

Complexity

Conditions 3

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 19
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import six
18
19
from st2common import log as logging
20
from st2common.constants import action as action_constants
21
from st2common.exceptions import actionrunner as runner_exc
22
from st2common.exceptions import db as db_exc
23
from st2common.exceptions import trace as trace_exc
24
from st2common.persistence.liveaction import LiveAction
25
from st2common.persistence.execution import ActionExecution
26
from st2common.persistence.execution import ActionExecutionOutput
27
from st2common.models.db.execution import ActionExecutionOutputDB
28
from st2common.runners import utils as runners_utils
29
from st2common.services import executions
30
from st2common.services import trace as trace_service
31
from st2common.util import date as date_utils
32
from st2common.util import action_db as action_utils
33
from st2common.util import schema as util_schema
34
35
36
__all__ = [
37
    'request',
38
    'create_request',
39
    'publish_request',
40
    'is_action_canceled_or_canceling',
41
42
    'request_pause',
43
    'request_resume',
44
45
    'store_execution_output_data',
46
]
47
48
LOG = logging.getLogger(__name__)
49
50
51
def _get_immutable_params(parameters):
52
    if not parameters:
53
        return []
54
    return [k for k, v in six.iteritems(parameters) if v.get('immutable', False)]
55
56
57
def create_request(liveaction):
58
    """
59
    Create an action execution.
60
61
    :return: (liveaction, execution)
62
    :rtype: tuple
63
    """
64
    # Use the user context from the parent action execution. Subtasks in a workflow
65
    # action can be invoked by a system user and so we want to use the user context
66
    # from the original workflow action.
67
    parent_context = executions.get_parent_context(liveaction)
68
    if parent_context:
69
        parent_user = parent_context.get('user', None)
70
        if parent_user:
71
            liveaction.context['user'] = parent_user
72
73
    # Validate action.
74
    action_db = action_utils.get_action_by_ref(liveaction.action)
75
    if not action_db:
76
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
77
    if not action_db.enabled:
78
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)
79
80
    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])
81
82
    if not hasattr(liveaction, 'parameters'):
83
        liveaction.parameters = dict()
84
85
    # Validate action parameters.
86
    schema = util_schema.get_schema_for_action_parameters(action_db)
87
    validator = util_schema.get_validator()
88
    util_schema.validate(liveaction.parameters, schema, validator, use_default=True,
89
                         allow_default_none=True)
90
91
    # validate that no immutable params are being overriden. Although possible to
92
    # ignore the override it is safer to inform the user to avoid surprises.
93
    immutables = _get_immutable_params(action_db.parameters)
94
    immutables.extend(_get_immutable_params(runnertype_db.runner_parameters))
95
    overridden_immutables = [p for p in six.iterkeys(liveaction.parameters) if p in immutables]
96
    if len(overridden_immutables) > 0:
97
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
98
                         % str(overridden_immutables))
99
100
    # Set notification settings for action.
101
    # XXX: There are cases when we don't want notifications to be sent for a particular
102
    # execution. So we should look at liveaction.parameters['notify']
103
    # and not set liveaction.notify.
104
    if not _is_notify_empty(action_db.notify):
105
        liveaction.notify = action_db.notify
106
107
    # Write to database and send to message queue.
108
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
109
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()
110
111
    # Set the "action_is_workflow" attribute
112
    liveaction.action_is_workflow = action_db.is_workflow()
113
114
    # Publish creation after both liveaction and actionexecution are created.
115
    liveaction = LiveAction.add_or_update(liveaction, publish=False)
116
117
    # Get trace_db if it exists. This could throw. If it throws, we have to cleanup
118
    # liveaction object so we don't see things in requested mode.
119
    trace_db = None
120
    try:
121
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
122
    except db_exc.StackStormDBObjectNotFoundError as e:
123
        _cleanup_liveaction(liveaction)
124
        raise trace_exc.TraceNotFoundException(str(e))
125
126
    execution = executions.create_execution_object(liveaction, publish=False)
127
128
    if trace_db:
129
        trace_service.add_or_update_given_trace_db(
130
            trace_db=trace_db,
131
            action_executions=[
132
                trace_service.get_trace_component_for_action_execution(execution, liveaction)
133
            ])
134
135
    return liveaction, execution
136
137
138
def publish_request(liveaction, execution):
139
    """
140
    Publish an action execution.
141
142
    :return: (liveaction, execution)
143
    :rtype: tuple
144
    """
145
    # Assume that this is a creation.
146
    LiveAction.publish_create(liveaction)
147
    LiveAction.publish_status(liveaction)
148
    ActionExecution.publish_create(execution)
149
150
    extra = {'liveaction_db': liveaction, 'execution_db': execution}
151
    LOG.audit('Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' %
152
              (liveaction.id, execution.id), extra=extra)
153
154
    return liveaction, execution
155
156
157
def request(liveaction):
158
    liveaction, execution = create_request(liveaction)
159
    liveaction, execution = publish_request(liveaction, execution)
160
161
    return liveaction, execution
162
163
164
def update_status(liveaction, new_status, result=None, publish=True):
165
    if liveaction.status == new_status:
166
        return liveaction
167
168
    old_status = liveaction.status
169
170
    updates = {
171
        'liveaction_id': liveaction.id,
172
        'status': new_status,
173
        'result': result,
174
        'publish': False
175
    }
176
177
    if new_status in action_constants.LIVEACTION_COMPLETED_STATES:
178
        updates['end_timestamp'] = date_utils.get_datetime_utc_now()
179
180
    liveaction = action_utils.update_liveaction_status(**updates)
181
    action_execution = executions.update_execution(liveaction)
182
183
    msg = ('The status of action execution is changed from %s to %s. '
184
           '<LiveAction.id=%s, ActionExecution.id=%s>' % (old_status,
185
           new_status, liveaction.id, action_execution.id))
186
187
    extra = {
188
        'action_execution_db': action_execution,
189
        'liveaction_db': liveaction
190
    }
191
192
    LOG.audit(msg, extra=extra)
193
    LOG.info(msg)
194
195
    # Invoke post run if liveaction status is completed or paused.
196
    if (new_status in action_constants.LIVEACTION_COMPLETED_STATES or
197
            new_status == action_constants.LIVEACTION_STATUS_PAUSED):
198
        runners_utils.invoke_post_run(liveaction)
199
200
    if publish:
201
        LiveAction.publish_status(liveaction)
202
203
    return liveaction
204
205
206
def is_action_canceled_or_canceling(liveaction_id):
207
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
208
    return liveaction_db.status in [action_constants.LIVEACTION_STATUS_CANCELED,
209
                                    action_constants.LIVEACTION_STATUS_CANCELING]
210
211
212
def is_action_paused_or_pausing(liveaction_id):
213
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
214
    return liveaction_db.status in [action_constants.LIVEACTION_STATUS_PAUSED,
215
                                    action_constants.LIVEACTION_STATUS_PAUSING]
216
217
218
def request_cancellation(liveaction, requester):
219
    """
220
    Request cancellation of an action execution.
221
222
    :return: (liveaction, execution)
223
    :rtype: tuple
224
    """
225
    if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELING:
226
        return liveaction
227
228
    if liveaction.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
229
        raise Exception(
230
            'Unable to cancel liveaction "%s" because it is already in a '
231
            'completed state.' % liveaction.id
232
        )
233
234
    result = {
235
        'message': 'Action canceled by user.',
236
        'user': requester
237
    }
238
239
    # Run cancelation sequence for liveaction that is in running state or
240
    # if the liveaction is operating under a workflow.
241
    if ('parent' in liveaction.context or
242
            liveaction.status in action_constants.LIVEACTION_STATUS_RUNNING):
243
        status = action_constants.LIVEACTION_STATUS_CANCELING
244
    else:
245
        status = action_constants.LIVEACTION_STATUS_CANCELED
246
247
    liveaction = update_status(liveaction, status, result=result)
248
249
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
250
251
    return (liveaction, execution)
252
253
254
def request_pause(liveaction, requester):
255
    """
256
    Request pause for a running action execution.
257
258
    :return: (liveaction, execution)
259
    :rtype: tuple
260
    """
261
    # Validate that the runner type of the action supports pause.
262
    action_db = action_utils.get_action_by_ref(liveaction.action)
263
264
    if not action_db:
265
        raise ValueError(
266
            'Unable to pause liveaction "%s" because the action "%s" '
267
            'is not found.' % (liveaction.id, liveaction.action)
268
        )
269
270
    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
271
        raise runner_exc.InvalidActionRunnerOperationError(
272
            'Unable to pause liveaction "%s" because it is not supported by the '
273
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
274
        )
275
276
    if (liveaction.status == action_constants.LIVEACTION_STATUS_PAUSING or
277
            liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED):
278
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
279
        return (liveaction, execution)
280
281
    if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
282
        raise runner_exc.UnexpectedActionExecutionStatusError(
283
            'Unable to pause liveaction "%s" because it is not in a running state.'
284
            % liveaction.id
285
        )
286
287
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
288
289
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
290
291
    return (liveaction, execution)
292
293
294
def request_resume(liveaction, requester):
295
    """
296
    Request resume for a paused action execution.
297
298
    :return: (liveaction, execution)
299
    :rtype: tuple
300
    """
301
    # Validate that the runner type of the action supports pause.
302
    action_db = action_utils.get_action_by_ref(liveaction.action)
303
304
    if not action_db:
305
        raise ValueError(
306
            'Unable to resume liveaction "%s" because the action "%s" '
307
            'is not found.' % (liveaction.id, liveaction.action)
308
        )
309
310
    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
311
        raise runner_exc.InvalidActionRunnerOperationError(
312
            'Unable to resume liveaction "%s" because it is not supported by the '
313
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
314
        )
315
316
    if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING:
317
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
318
        return (liveaction, execution)
319
320
    if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
321
        raise runner_exc.UnexpectedActionExecutionStatusError(
322
            'Unable to resume liveaction "%s" because it is not in a paused state.'
323
            % liveaction.id
324
        )
325
326
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)
327
328
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
329
330
    return (liveaction, execution)
331
332
333
def get_root_liveaction(liveaction_db):
334
    """Recursively ascends until the root liveaction is found
335
336
    Useful for finding an original parent workflow. Pass in
337
    any LiveActionDB instance, and this function will eventually
338
    return the top-most liveaction, even if the two are one and
339
    the same
340
341
    :param liveaction_db: The LiveActionDB instance for which to find
342
                          the root parent
343
    :rtype: LiveActionDB
344
    """
345
346
    parent = liveaction_db.context.get("parent")
347
    if parent:
348
        parent_execution = ActionExecution.get(id=parent['execution_id'])
349
        parent_liveaction = LiveAction.get(id=parent_execution.liveaction['id'])
350
        return get_root_liveaction(parent_liveaction)
351
    else:
352
        return liveaction_db
353
354
355
def store_execution_output_data(execution_db, action_db, data, output_type='output',
356
                                timestamp=None):
357
    """
358
    Store output from an execution as a new document in the collection.
359
    """
360
    execution_id = str(execution_db.id)
361
    action_ref = action_db.ref
362
    runner_ref = getattr(action_db, 'runner_type', {}).get('name', 'unknown')
363
    timestamp = timestamp or date_utils.get_datetime_utc_now()
364
365
    output_db = ActionExecutionOutputDB(execution_id=execution_id,
366
                                        action_ref=action_ref,
367
                                        runner_ref=runner_ref,
368
                                        timestamp=timestamp,
369
                                        output_type=output_type,
370
                                        data=data)
371
    output_db = ActionExecutionOutput.add_or_update(output_db, publish=True,
372
                                                    dispatch_trigger=False)
373
374
    return output_db
375
376
377
def is_children_active(liveaction_id):
378
    execution_db = ActionExecution.get(liveaction__id=str(liveaction_id))
379
380
    if execution_db.runner['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
381
        return False
382
383
    children_execution_dbs = ActionExecution.query(parent=str(execution_db.id))
384
385
    inactive_statuses = (
386
        action_constants.LIVEACTION_COMPLETED_STATES +
387
        [action_constants.LIVEACTION_STATUS_PAUSED]
388
    )
389
390
    completed = [
391
        child_exec_db.status in inactive_statuses
392
        for child_exec_db in children_execution_dbs
393
    ]
394
395
    return (not all(completed))
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after return.
Loading history...
396
397
398
def _cleanup_liveaction(liveaction):
399
    try:
400
        LiveAction.delete(liveaction)
401
    except:
402
        LOG.exception('Failed cleaning up LiveAction: %s.', liveaction)
403
        pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
404
405
406
def _is_notify_empty(notify_db):
407
    """
408
    notify_db is considered to be empty if notify_db is None and neither
409
    of on_complete, on_success and on_failure have values.
410
    """
411
    if not notify_db:
412
        return True
413
    return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)
414