Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

request_pause()   B

Complexity

Conditions 6

Size

Total Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 38
rs 7.5384
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import six
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.exceptions import actionrunner as runner_exc
21
from st2common.exceptions import db as db_exc
22
from st2common.exceptions import trace as trace_exc
23
from st2common.persistence.liveaction import LiveAction
24
from st2common.persistence.execution import ActionExecution
25
from st2common.services import executions
26
from st2common.services import trace as trace_service
0 ignored issues
show
Bug introduced by
The name trace does not seem to exist in module st2common.services.
Loading history...
27
from st2common.util import date as date_utils
28
from st2common.util import action_db as action_utils
29
from st2common.util import schema as util_schema
30
31
32
__all__ = [
33
    'request',
34
    'create_request',
35
    'publish_request',
36
    'is_action_canceled_or_canceling'
37
]
38
39
LOG = logging.getLogger(__name__)
40
41
42
def _get_immutable_params(parameters):
43
    if not parameters:
44
        return []
45
    return [k for k, v in six.iteritems(parameters) if v.get('immutable', False)]
46
47
48
def create_request(liveaction):
49
    """
50
    Create an action execution.
51
52
    :return: (liveaction, execution)
53
    :rtype: tuple
54
    """
55
    # Use the user context from the parent action execution. Subtasks in a workflow
56
    # action can be invoked by a system user and so we want to use the user context
57
    # from the original workflow action.
58
    parent_context = executions.get_parent_context(liveaction)
59
    if parent_context:
60
        parent_user = parent_context.get('user', None)
61
        if parent_user:
62
            liveaction.context['user'] = parent_user
63
64
    # Validate action.
65
    action_db = action_utils.get_action_by_ref(liveaction.action)
66
    if not action_db:
67
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
68
    if not action_db.enabled:
69
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)
70
71
    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])
72
73
    if not hasattr(liveaction, 'parameters'):
74
        liveaction.parameters = dict()
75
76
    # Validate action parameters.
77
    schema = util_schema.get_schema_for_action_parameters(action_db)
78
    validator = util_schema.get_validator()
79
    util_schema.validate(liveaction.parameters, schema, validator, use_default=True,
80
                         allow_default_none=True)
81
82
    # validate that no immutable params are being overriden. Although possible to
83
    # ignore the override it is safer to inform the user to avoid surprises.
84
    immutables = _get_immutable_params(action_db.parameters)
85
    immutables.extend(_get_immutable_params(runnertype_db.runner_parameters))
86
    overridden_immutables = [p for p in six.iterkeys(liveaction.parameters) if p in immutables]
87
    if len(overridden_immutables) > 0:
88
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
89
                         % str(overridden_immutables))
90
91
    # Set notification settings for action.
92
    # XXX: There are cases when we don't want notifications to be sent for a particular
93
    # execution. So we should look at liveaction.parameters['notify']
94
    # and not set liveaction.notify.
95
    if not _is_notify_empty(action_db.notify):
96
        liveaction.notify = action_db.notify
97
98
    # Write to database and send to message queue.
99
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
100
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()
101
102
    # Set the "action_is_workflow" attribute
103
    liveaction.action_is_workflow = action_db.is_workflow()
104
105
    # Publish creation after both liveaction and actionexecution are created.
106
    liveaction = LiveAction.add_or_update(liveaction, publish=False)
107
108
    # Get trace_db if it exists. This could throw. If it throws, we have to cleanup
109
    # liveaction object so we don't see things in requested mode.
110
    trace_db = None
111
    try:
112
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
113
    except db_exc.StackStormDBObjectNotFoundError as e:
114
        _cleanup_liveaction(liveaction)
115
        raise trace_exc.TraceNotFoundException(str(e))
116
117
    execution = executions.create_execution_object(liveaction, publish=False)
118
119
    if trace_db:
120
        trace_service.add_or_update_given_trace_db(
121
            trace_db=trace_db,
122
            action_executions=[
123
                trace_service.get_trace_component_for_action_execution(execution, liveaction)
124
            ])
125
126
    return liveaction, execution
127
128
129
def publish_request(liveaction, execution):
130
    """
131
    Publish an action execution.
132
133
    :return: (liveaction, execution)
134
    :rtype: tuple
135
    """
136
    # Assume that this is a creation.
137
    LiveAction.publish_create(liveaction)
138
    LiveAction.publish_status(liveaction)
139
    ActionExecution.publish_create(execution)
140
141
    extra = {'liveaction_db': liveaction, 'execution_db': execution}
142
    LOG.audit('Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' %
143
              (liveaction.id, execution.id), extra=extra)
144
145
    return liveaction, execution
146
147
148
def request(liveaction):
149
    liveaction, execution = create_request(liveaction)
150
    liveaction, execution = publish_request(liveaction, execution)
151
152
    return liveaction, execution
153
154
155
def update_status(liveaction, new_status, result=None, publish=True):
156
    if liveaction.status == new_status:
157
        return liveaction
158
159
    old_status = liveaction.status
160
161
    liveaction = action_utils.update_liveaction_status(
162
        status=new_status, result=result, liveaction_id=liveaction.id, publish=False)
163
164
    action_execution = executions.update_execution(liveaction)
165
166
    msg = ('The status of action execution is changed from %s to %s. '
167
           '<LiveAction.id=%s, ActionExecution.id=%s>' % (old_status,
168
           new_status, liveaction.id, action_execution.id))
169
170
    extra = {
171
        'action_execution_db': action_execution,
172
        'liveaction_db': liveaction
173
    }
174
175
    LOG.audit(msg, extra=extra)
176
    LOG.info(msg)
177
178
    if publish:
179
        LiveAction.publish_status(liveaction)
180
181
    return liveaction
182
183
184
def is_action_canceled_or_canceling(liveaction_id):
185
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
186
    return liveaction_db.status in [action_constants.LIVEACTION_STATUS_CANCELED,
187
                                    action_constants.LIVEACTION_STATUS_CANCELING]
188
189
190
def is_action_paused_or_pausing(liveaction_id):
191
    liveaction_db = action_utils.get_liveaction_by_id(liveaction_id)
192
    return liveaction_db.status in [action_constants.LIVEACTION_STATUS_PAUSED,
193
                                    action_constants.LIVEACTION_STATUS_PAUSING]
194
195
196
def request_cancellation(liveaction, requester):
197
    """
198
    Request cancellation of an action execution.
199
200
    :return: (liveaction, execution)
201
    :rtype: tuple
202
    """
203
    if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELING:
204
        return liveaction
205
206
    if liveaction.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
207
        raise Exception(
208
            'Unable to cancel liveaction "%s" because it is already in a '
209
            'completed state.' % liveaction.id
210
        )
211
212
    result = {
213
        'message': 'Action canceled by user.',
214
        'user': requester
215
    }
216
217
    # Run cancelation sequence for liveaction that is in running state or
218
    # if the liveaction is operating under a workflow.
219
    if ('parent' in liveaction.context or
220
            liveaction.status in action_constants.LIVEACTION_STATUS_RUNNING):
221
        status = action_constants.LIVEACTION_STATUS_CANCELING
222
    else:
223
        status = action_constants.LIVEACTION_STATUS_CANCELED
224
225
    liveaction = update_status(liveaction, status, result=result)
226
227
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
228
229
    return (liveaction, execution)
230
231
232
def request_pause(liveaction, requester):
233
    """
234
    Request pause for a running action execution.
235
236
    :return: (liveaction, execution)
237
    :rtype: tuple
238
    """
239
    # Validate that the runner type of the action supports pause.
240
    action_db = action_utils.get_action_by_ref(liveaction.action)
241
242
    if not action_db:
243
        raise ValueError(
244
            'Unable to pause liveaction "%s" because the action "%s" '
245
            'is not found.' % (liveaction.id, liveaction.action)
246
        )
247
248
    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
249
        raise runner_exc.InvalidActionRunnerOperationError(
250
            'Unable to pause liveaction "%s" because it is not supported by the '
251
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
252
        )
253
254
    if (liveaction.status == action_constants.LIVEACTION_STATUS_PAUSING or
255
            liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED):
256
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
257
        return (liveaction, execution)
258
259
    if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
260
        raise runner_exc.UnexpectedActionExecutionStatusError(
261
            'Unable to pause liveaction "%s" because it is not in a running state.'
262
            % liveaction.id
263
        )
264
265
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
266
267
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
268
269
    return (liveaction, execution)
270
271
272
def request_resume(liveaction, requester):
273
    """
274
    Request resume for a paused action execution.
275
276
    :return: (liveaction, execution)
277
    :rtype: tuple
278
    """
279
    # Validate that the runner type of the action supports pause.
280
    action_db = action_utils.get_action_by_ref(liveaction.action)
281
282
    if not action_db:
283
        raise ValueError(
284
            'Unable to resume liveaction "%s" because the action "%s" '
285
            'is not found.' % (liveaction.id, liveaction.action)
286
        )
287
288
    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
289
        raise runner_exc.InvalidActionRunnerOperationError(
290
            'Unable to resume liveaction "%s" because it is not supported by the '
291
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
292
        )
293
294
    if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING:
295
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
296
        return (liveaction, execution)
297
298
    if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
299
        raise runner_exc.UnexpectedActionExecutionStatusError(
300
            'Unable to resume liveaction "%s" because it is not in a paused state.'
301
            % liveaction.id
302
        )
303
304
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)
305
306
    execution = ActionExecution.get(liveaction__id=str(liveaction.id))
307
308
    return (liveaction, execution)
309
310
311
def _cleanup_liveaction(liveaction):
312
    try:
313
        LiveAction.delete(liveaction)
314
    except:
315
        LOG.exception('Failed cleaning up LiveAction: %s.', liveaction)
316
        pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
317
318
319
def _is_notify_empty(notify_db):
320
    """
321
    notify_db is considered to be empty if notify_db is None and neither
322
    of on_complete, on_success and on_failure have values.
323
    """
324
    if not notify_db:
325
        return True
326
    return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)
327