Passed
Push — master ( ac1734...fff02f )
by
unknown
03:05
created

get_root_liveaction()   A

Complexity

Conditions 2

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 20
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import six
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.exceptions import actionrunner as runner_exc
21
from st2common.exceptions import db as db_exc
22
from st2common.exceptions import trace as trace_exc
23
from st2common.persistence.liveaction import LiveAction
24
from st2common.persistence.execution import ActionExecution
25
from st2common.persistence.execution import ActionExecutionOutput
26
from st2common.models.db.execution import ActionExecutionOutputDB
27
from st2common.services import executions
28
from st2common.services import trace as trace_service
0 ignored issues
show
Bug introduced by
The name trace does not seem to exist in module st2common.services.
Loading history...
29
from st2common.util import date as date_utils
30
from st2common.util import action_db as action_utils
31
from st2common.util import schema as util_schema
32
33
34
# Public API of this module. The list covers the public (non-underscore) helpers
# defined in this file so that star-imports and API tooling see the same surface
# that callers reach through attribute access.
__all__ = [
    'request',
    'create_request',
    'publish_request',
    'update_status',

    'is_action_canceled_or_canceling',
    'is_action_paused_or_pausing',

    'request_cancellation',
    'request_pause',
    'request_resume',

    'get_root_liveaction',

    'store_execution_output_data',
]

# Module-level logger used by the helpers in this file.
LOG = logging.getLogger(__name__)
47
48
49
def _get_immutable_params(parameters):
    """
    Collect the names of parameters that are flagged as immutable.

    :param parameters: Mapping of parameter name to a definition object that
                       supports ``.get``; may be empty or None.

    :return: Names whose definition has a truthy ``immutable`` entry, or an
             empty list when ``parameters`` is falsy.
    :rtype: ``list``
    """
    immutable_names = []
    if parameters:
        for name, definition in six.iteritems(parameters):
            if definition.get('immutable', False):
                immutable_names.append(name)
    return immutable_names
53
54
55
def create_request(liveaction):
    """
    Create an action execution.

    Validates the referenced action and the supplied parameters, stores the
    liveaction with the "requested" status and creates the matching execution
    object. Both objects are written with ``publish=False``.

    :raises ValueError: If the action cannot be found, is disabled, or an
                        immutable parameter is overridden.
    :raises TraceNotFoundException: If the trace lookup reports a missing DB
                                    object; the stored liveaction is cleaned up first.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Subtasks in a workflow action can be invoked by a system user, so the user
    # from the parent (original workflow) execution context is used when present.
    parent_context = executions.get_parent_context(liveaction)
    parent_user = parent_context.get('user', None) if parent_context else None
    if parent_user:
        liveaction.context['user'] = parent_user

    # Validate action.
    action_db = action_utils.get_action_by_ref(liveaction.action)
    if not action_db:
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
    if not action_db.enabled:
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)

    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    if not hasattr(liveaction, 'parameters'):
        liveaction.parameters = {}

    # Validate action parameters.
    params_schema = util_schema.get_schema_for_action_parameters(action_db)
    schema_validator = util_schema.get_validator()
    util_schema.validate(liveaction.parameters, params_schema, schema_validator,
                         use_default=True, allow_default_none=True)

    # Refuse overrides of immutable parameters. Ignoring the override would be
    # possible, but telling the user avoids surprises.
    immutable_names = _get_immutable_params(action_db.parameters)
    immutable_names.extend(_get_immutable_params(runnertype_db.runner_parameters))
    overridden_names = [
        name for name in six.iterkeys(liveaction.parameters)
        if name in immutable_names
    ]
    if overridden_names:
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
                         % str(overridden_names))

    # Set notification settings for action.
    # XXX: There are cases when we don't want notifications to be sent for a particular
    # execution. So we should look at liveaction.parameters['notify']
    # and not set liveaction.notify.
    if not _is_notify_empty(action_db.notify):
        liveaction.notify = action_db.notify

    # Prepare the record that is written to the database.
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()

    # Set the "action_is_workflow" attribute
    liveaction.action_is_workflow = action_db.is_workflow()

    # Publishing is deferred until both liveaction and actionexecution exist.
    liveaction = LiveAction.add_or_update(liveaction, publish=False)

    # Look up the trace, if any. The lookup may throw; in that case the liveaction
    # stored above is removed so nothing lingers in requested mode.
    trace_db = None
    try:
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
    except db_exc.StackStormDBObjectNotFoundError as e:
        _cleanup_liveaction(liveaction)
        raise trace_exc.TraceNotFoundException(str(e))

    execution = executions.create_execution_object(liveaction, publish=False)

    if trace_db:
        trace_component = trace_service.get_trace_component_for_action_execution(
            execution, liveaction)
        trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            action_executions=[trace_component])

    return liveaction, execution
134
135
136
def publish_request(liveaction, execution):
    """
    Publish an action execution.

    Publishes the create and status events for the liveaction and the create
    event for the execution, then writes an audit log entry.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Assume that this is a creation.
    LiveAction.publish_create(liveaction)
    LiveAction.publish_status(liveaction)
    ActionExecution.publish_create(execution)

    audit_msg = ('Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' %
                 (liveaction.id, execution.id))
    LOG.audit(audit_msg, extra={'liveaction_db': liveaction, 'execution_db': execution})

    return liveaction, execution
153
154
155
def request(liveaction):
    """
    Create an action execution and publish it.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    created_liveaction, created_execution = create_request(liveaction)
    published_liveaction, published_execution = publish_request(created_liveaction,
                                                                created_execution)
    return published_liveaction, published_execution
160
161
162
def update_status(liveaction, new_status, result=None, publish=True):
    """
    Change the status of a liveaction and update its execution record.

    Returns the given liveaction untouched when it already has ``new_status``.

    :param liveaction: Liveaction whose status should change.
    :param new_status: Status to set.
    :param result: Optional result passed along with the status update.
    :param publish: When true, publish the status change of the liveaction.

    :return: The updated liveaction.
    """
    old_status = liveaction.status
    if old_status == new_status:
        return liveaction

    liveaction = action_utils.update_liveaction_status(
        status=new_status, result=result, liveaction_id=liveaction.id, publish=False)

    action_execution = executions.update_execution(liveaction)

    status_msg = (
        'The status of action execution is changed from %s to %s. '
        '<LiveAction.id=%s, ActionExecution.id=%s>'
    ) % (old_status, new_status, liveaction.id, action_execution.id)

    # The same message goes to both the audit trail and the regular log.
    LOG.audit(status_msg, extra={
        'action_execution_db': action_execution,
        'liveaction_db': liveaction
    })
    LOG.info(status_msg)

    if publish:
        LiveAction.publish_status(liveaction)

    return liveaction
189
190
191
def is_action_canceled_or_canceling(liveaction_id):
    """
    Tell whether the liveaction with the given id is canceled or being canceled.

    :rtype: ``bool``
    """
    current_status = action_utils.get_liveaction_by_id(liveaction_id).status
    cancel_states = (action_constants.LIVEACTION_STATUS_CANCELED,
                     action_constants.LIVEACTION_STATUS_CANCELING)
    return current_status in cancel_states
195
196
197
def is_action_paused_or_pausing(liveaction_id):
    """
    Tell whether the liveaction with the given id is paused or being paused.

    :rtype: ``bool``
    """
    current_status = action_utils.get_liveaction_by_id(liveaction_id).status
    pause_states = (action_constants.LIVEACTION_STATUS_PAUSED,
                    action_constants.LIVEACTION_STATUS_PAUSING)
    return current_status in pause_states
201
202
203
def request_cancellation(liveaction, requester):
    """
    Request cancellation of an action execution.

    A liveaction that is running, or that has a ``parent`` entry in its context,
    is moved to the canceling status; any other cancelable liveaction is marked
    canceled directly. A liveaction that is already canceling is returned as is.

    :param liveaction: Liveaction to cancel.
    :param requester: User recorded in the cancellation result.

    :raises Exception: If the liveaction is not in a cancelable state.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELING:
        # Cancellation is already in progress: there is nothing to update, but the
        # documented (liveaction, execution) return shape is still honoured so
        # callers can unpack the result on every path.
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return (liveaction, execution)

    if liveaction.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
        raise Exception(
            'Unable to cancel liveaction "%s" because it is already in a '
            'completed state.' % liveaction.id
        )

    result = {
        'message': 'Action canceled by user.',
        'user': requester
    }

    # Run cancelation sequence for liveaction that is in running state or
    # if the liveaction is operating under a workflow.
    # NOTE: the running status is compared with ``==`` (as elsewhere in this
    # module) rather than ``in``, which would be a membership test on the constant.
    if ('parent' in liveaction.context or
            liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING):
        status = action_constants.LIVEACTION_STATUS_CANCELING
    else:
        status = action_constants.LIVEACTION_STATUS_CANCELED

    liveaction = update_status(liveaction, status, result=result)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
237
238
239
def request_pause(liveaction, requester):
    """
    Request pause for a running action execution.

    A liveaction that is already pausing or paused is returned unchanged.
    ``requester`` is accepted but not used by this function.

    :raises ValueError: If the action of the liveaction cannot be found.
    :raises InvalidActionRunnerOperationError: If the runner of the action is not
                                               one of the workflow runner types.
    :raises UnexpectedActionExecutionStatusError: If the liveaction is not running.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Validate that the runner type of the action supports pause.
    action_db = action_utils.get_action_by_ref(liveaction.action)

    if not action_db:
        raise ValueError(
            'Unable to pause liveaction "%s" because the action "%s" '
            'is not found.' % (liveaction.id, liveaction.action)
        )

    runner_name = action_db.runner_type['name']
    if runner_name not in action_constants.WORKFLOW_RUNNER_TYPES:
        raise runner_exc.InvalidActionRunnerOperationError(
            'Unable to pause liveaction "%s" because it is not supported by the '
            '"%s" runner.' % (liveaction.id, runner_name)
        )

    pause_states = (action_constants.LIVEACTION_STATUS_PAUSING,
                    action_constants.LIVEACTION_STATUS_PAUSED)

    # Only a liveaction that is not yet pausing/paused needs a status change.
    if liveaction.status not in pause_states:
        if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
            raise runner_exc.UnexpectedActionExecutionStatusError(
                'Unable to pause liveaction "%s" because it is not in a running state.'
                % liveaction.id
            )

        liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
277
278
279
def request_resume(liveaction, requester):
    """
    Request resume for a paused action execution.

    A liveaction that is already running is returned unchanged.
    ``requester`` is accepted but not used by this function.

    :raises ValueError: If the action of the liveaction cannot be found.
    :raises InvalidActionRunnerOperationError: If the runner of the action is not
                                               one of the workflow runner types.
    :raises UnexpectedActionExecutionStatusError: If the liveaction is not paused.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Validate that the runner type of the action supports pause.
    action_db = action_utils.get_action_by_ref(liveaction.action)

    if not action_db:
        raise ValueError(
            'Unable to resume liveaction "%s" because the action "%s" '
            'is not found.' % (liveaction.id, liveaction.action)
        )

    runner_name = action_db.runner_type['name']
    if runner_name not in action_constants.WORKFLOW_RUNNER_TYPES:
        raise runner_exc.InvalidActionRunnerOperationError(
            'Unable to resume liveaction "%s" because it is not supported by the '
            '"%s" runner.' % (liveaction.id, runner_name)
        )

    # Only a liveaction that is not already running needs a status change.
    if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
        if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
            raise runner_exc.UnexpectedActionExecutionStatusError(
                'Unable to resume liveaction "%s" because it is not in a paused state.'
                % liveaction.id
            )

        liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
316
317
318
def get_root_liveaction(liveaction_db):
    """Walk up the chain of parents until the root liveaction is found

    Useful for finding an original parent workflow. Any LiveActionDB
    instance may be passed in; the top-most liveaction is returned,
    which is the given instance itself when it has no parent.

    :param liveaction_db: The LiveActionDB instance for which to find
                          the root parent
    :rtype: LiveActionDB
    """
    current = liveaction_db
    while True:
        parent = current.context.get("parent")
        if not parent:
            # No parent entry in the context: this is the root.
            return current

        # Resolve the parent execution first, then the liveaction it refers to.
        parent_execution = ActionExecution.get(id=parent['execution_id'])
        current = LiveAction.get(id=parent_execution.liveaction['id'])
338
339
340
def store_execution_output_data(execution_db, action_db, data, output_type='output',
                                timestamp=None):
    """
    Store output from an execution as a new document in the collection.

    :param execution_db: Execution the output belongs to; its id is stored as a string.
    :param action_db: Action that produced the output; supplies the action ref and,
                      when a ``runner_type`` attribute exists, the runner name
                      (``'unknown'`` otherwise).
    :param data: Output data to store.
    :param output_type: Type label stored with the output.
    :param timestamp: Timestamp of the output; the current UTC time is used when falsy.

    :return: The stored output object.
    """
    new_output_db = ActionExecutionOutputDB(
        execution_id=str(execution_db.id),
        action_ref=action_db.ref,
        runner_ref=getattr(action_db, 'runner_type', {}).get('name', 'unknown'),
        timestamp=timestamp or date_utils.get_datetime_utc_now(),
        output_type=output_type,
        data=data)

    return ActionExecutionOutput.add_or_update(new_output_db, publish=True,
                                               dispatch_trigger=False)
360
361
362
def _cleanup_liveaction(liveaction):
    """
    Best-effort removal of a liveaction from the database.

    A failure to delete is logged together with its traceback and then
    suppressed, so this function never raises an ordinary error to its caller.

    :param liveaction: Liveaction to delete.
    """
    try:
        LiveAction.delete(liveaction)
    except Exception:
        # Deliberately best-effort. Only ordinary errors are swallowed; a bare
        # ``except`` would also trap KeyboardInterrupt and SystemExit.
        LOG.exception('Failed cleaning up LiveAction: %s.', liveaction)
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
368
369
370
def _is_notify_empty(notify_db):
371
    """
372
    notify_db is considered to be empty if notify_db is None and neither
373
    of on_complete, on_success and on_failure have values.
374
    """
375
    if not notify_db:
376
        return True
377
    return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)
378