Passed
Pull Request — master (#3507)
by W
07:37 queued 01:55
created

request_resume()   B

Complexity

Conditions 5

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 37
rs 8.0894
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import six
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.exceptions import actionrunner as runner_exc
21
from st2common.exceptions import db as db_exc
22
from st2common.exceptions import trace as trace_exc
23
from st2common.persistence.liveaction import LiveAction
24
from st2common.persistence.execution import ActionExecution
25
from st2common.services import executions
26
from st2common.services import trace as trace_service
0 ignored issues
show
Bug introduced by
The name trace does not seem to exist in module st2common.services.
Loading history...
27
from st2common.util import date as date_utils
28
from st2common.util import action_db as action_utils
29
from st2common.util import schema as util_schema
30
31
32
# Names exported by a star-import of this module. Every public (non-underscore)
# function defined in this module is listed so the declared API matches the
# functions callers actually use.
__all__ = [
    'request',
    'create_request',
    'publish_request',
    'update_status',
    'is_action_canceled_or_canceling',
    'request_cancellation',
    'request_pause',
    'request_resume'
]


# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
40
41
42
def _get_immutable_params(parameters):
43
    if not parameters:
44
        return []
45
    return [k for k, v in six.iteritems(parameters) if v.get('immutable', False)]
46
47
48
def create_request(liveaction):
    """
    Validate a liveaction, store it with the "requested" status and create the
    action execution object for it. Both objects are written with
    ``publish=False``; publishing is left to the caller.

    :param liveaction: LiveAction object describing the action to execute.

    :return: (liveaction, execution)
    :rtype: tuple

    :raises ValueError: If the action cannot be found, is disabled, or an
                        immutable parameter is being overridden.
    :raises TraceNotFoundException: If the trace lookup for the liveaction
                                    raises StackStormDBObjectNotFoundError.
    """
    # Subtasks of a workflow action can be invoked by a system user. When the
    # parent execution context carries a user, that original user is used.
    parent_ctx = executions.get_parent_context(liveaction)
    inherited_user = parent_ctx.get('user', None) if parent_ctx else None
    if inherited_user:
        liveaction.context['user'] = inherited_user

    # The referenced action has to exist and be enabled.
    action_db = action_utils.get_action_by_ref(liveaction.action)
    if not action_db:
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
    if not action_db.enabled:
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)

    runnertype_db = action_utils.get_runnertype_by_name(action_db.runner_type['name'])

    if not hasattr(liveaction, 'parameters'):
        liveaction.parameters = dict()

    # Check the supplied parameters against the action parameter schema.
    param_schema = util_schema.get_schema_for_action_parameters(action_db)
    param_validator = util_schema.get_validator()
    util_schema.validate(liveaction.parameters, param_schema, param_validator,
                         use_default=True, allow_default_none=True)

    # Immutable parameters (declared on the action or on its runner) must not
    # be overridden. The override could be ignored silently, but failing
    # loudly avoids surprising the user.
    protected = (_get_immutable_params(action_db.parameters) +
                 _get_immutable_params(runnertype_db.runner_parameters))
    overridden = [name for name in six.iterkeys(liveaction.parameters) if name in protected]
    if overridden:
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
                         % str(overridden))

    # Carry over the notification settings of the action.
    # XXX: Some executions should not send notifications at all, so
    # liveaction.parameters['notify'] should be consulted here instead of
    # always setting liveaction.notify.
    if not _is_notify_empty(action_db.notify):
        liveaction.notify = action_db.notify

    # Prepare the record for the database.
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()

    # Record whether the action is a workflow.
    liveaction.action_is_workflow = action_db.is_workflow()

    # Creation is published only after both the liveaction and the action
    # execution exist, hence publish=False.
    liveaction = LiveAction.add_or_update(liveaction, publish=False)

    # Look up the trace, if any. When the lookup fails the freshly stored
    # liveaction is removed again so nothing is left behind in requested mode.
    try:
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
    except db_exc.StackStormDBObjectNotFoundError as e:
        _cleanup_liveaction(liveaction)
        raise trace_exc.TraceNotFoundException(str(e))

    execution = executions.create_execution_object(liveaction, publish=False)

    if trace_db:
        component = trace_service.get_trace_component_for_action_execution(execution, liveaction)
        trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            action_executions=[component])

    return liveaction, execution
127
128
129
def publish_request(liveaction, execution):
    """
    Publish a previously created liveaction and its action execution.

    :param liveaction: LiveAction object to publish.
    :param execution: ActionExecution object to publish.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    # Both objects are treated as newly created.
    LiveAction.publish_create(liveaction)
    LiveAction.publish_status(liveaction)
    ActionExecution.publish_create(execution)

    audit_msg = 'Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' % (
        liveaction.id, execution.id)
    LOG.audit(audit_msg, extra={'liveaction_db': liveaction, 'execution_db': execution})

    return liveaction, execution
146
147
148
def request(liveaction):
    """
    Create an action execution for the liveaction and publish it.

    :param liveaction: LiveAction object describing the action to execute.

    :return: (liveaction, execution)
    :rtype: tuple
    """
    created_liveaction, created_execution = create_request(liveaction)
    published_liveaction, published_execution = publish_request(
        created_liveaction, created_execution)

    return published_liveaction, published_execution
153
154
155
def update_status(liveaction, new_status, result=None, publish=True):
    """
    Change the status of a liveaction and update its action execution.

    :param liveaction: LiveAction object to update.
    :param new_status: Status to set on the liveaction.
    :param result: Optional result to store together with the new status.
    :param publish: When true, the status change is published after the update.

    :return: The liveaction passed in when the status is already
             ``new_status``, otherwise the updated liveaction.
    """
    old_status = liveaction.status

    # Nothing to do when the status does not actually change.
    if old_status == new_status:
        return liveaction

    # The status publication is deferred until the execution is updated too.
    liveaction = action_utils.update_liveaction_status(
        status=new_status, result=result, liveaction_id=liveaction.id, publish=False)

    action_execution = executions.update_execution(liveaction)

    template = ('The status of action execution is changed from %s to %s. '
                '<LiveAction.id=%s, ActionExecution.id=%s>')
    msg = template % (old_status, new_status, liveaction.id, action_execution.id)

    LOG.audit(msg, extra={'action_execution_db': action_execution,
                          'liveaction_db': liveaction})
    LOG.info(msg)

    if publish:
        LiveAction.publish_status(liveaction)

    return liveaction
182
183
184
def is_action_canceled_or_canceling(liveaction_id):
    """
    Check whether the liveaction with the given id is canceled or canceling.

    :param liveaction_id: Id of the liveaction to look up.

    :rtype: ``bool``
    """
    current_status = action_utils.get_liveaction_by_id(liveaction_id).status
    return current_status in (action_constants.LIVEACTION_STATUS_CANCELED,
                              action_constants.LIVEACTION_STATUS_CANCELING)
188
189
190
def request_cancellation(liveaction, requester):
    """
    Request cancellation of an action execution.

    A liveaction which is already canceling is returned as is. A running
    liveaction is moved to the canceling status; a liveaction in any other
    cancelable status is marked as canceled right away.

    :param liveaction: LiveAction object to cancel.
    :param requester: User requesting the cancellation; stored in the result.

    :return: (liveaction, execution)
    :rtype: tuple

    :raises Exception: If the liveaction is not in a cancelable state.
    """
    if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELING:
        # Cancellation is already in progress. Return the documented tuple
        # (not a bare liveaction) so callers can always unpack the result.
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return (liveaction, execution)

    if liveaction.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
        raise Exception(
            'Unable to cancel liveaction "%s" because it is already in a '
            'completed state.' % liveaction.id
        )

    result = {
        'message': 'Action canceled by user.',
        'user': requester
    }

    # There is real work only when liveaction is still running.
    status = (action_constants.LIVEACTION_STATUS_CANCELING
              if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING
              else action_constants.LIVEACTION_STATUS_CANCELED)

    # Keep the object returned by update_status so the caller receives the
    # liveaction carrying the new status and result.
    liveaction = update_status(liveaction, status, result=result)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
221
222
223
def request_pause(liveaction, requester):
    """
    Request pause for a running action execution.

    A liveaction which is already pausing or paused is returned as is.

    :param liveaction: LiveAction object to pause.
    :param requester: User requesting the pause (not used by this function).

    :return: (liveaction, execution)
    :rtype: tuple

    :raises ValueError: If the action of the liveaction cannot be found.
    :raises InvalidActionRunnerOperationError: If the runner type of the
        action is not one of the workflow runner types.
    :raises UnexpectedActionExecutionStatusError: If the liveaction is not in
        a running state.
    """
    # Validate that the runner type of the action supports pause.
    action_db = action_utils.get_action_by_ref(liveaction.action)

    if not action_db:
        raise ValueError(
            'Unable to pause liveaction "%s" because the action "%s" '
            'is not found.' % (liveaction.id, liveaction.action)
        )

    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
        raise runner_exc.InvalidActionRunnerOperationError(
            'Unable to pause liveaction "%s" because it is not supported by the '
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
        )

    # Already pausing or paused: nothing to change.
    if liveaction.status in (action_constants.LIVEACTION_STATUS_PAUSING,
                             action_constants.LIVEACTION_STATUS_PAUSED):
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return (liveaction, execution)

    if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
        raise runner_exc.UnexpectedActionExecutionStatusError(
            'Unable to pause liveaction "%s" because it is not in a running state.'
            % liveaction.id
        )

    # Keep the object returned by update_status so the caller receives the
    # liveaction carrying the new status.
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
261
262
263
def request_resume(liveaction, requester):
    """
    Request resume for a paused action execution.

    A liveaction which is already running is returned as is.

    :param liveaction: LiveAction object to resume.
    :param requester: User requesting the resume (not used by this function).

    :return: (liveaction, execution)
    :rtype: tuple

    :raises ValueError: If the action of the liveaction cannot be found.
    :raises InvalidActionRunnerOperationError: If the runner type of the
        action is not one of the workflow runner types.
    :raises UnexpectedActionExecutionStatusError: If the liveaction is not in
        a paused state.
    """
    # Validate that the runner type of the action supports resume.
    action_db = action_utils.get_action_by_ref(liveaction.action)

    if not action_db:
        raise ValueError(
            'Unable to resume liveaction "%s" because the action "%s" '
            'is not found.' % (liveaction.id, liveaction.action)
        )

    if action_db.runner_type['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
        raise runner_exc.InvalidActionRunnerOperationError(
            'Unable to resume liveaction "%s" because it is not supported by the '
            '"%s" runner.' % (liveaction.id, action_db.runner_type['name'])
        )

    # Already running: nothing to change.
    if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING:
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return (liveaction, execution)

    if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
        raise runner_exc.UnexpectedActionExecutionStatusError(
            'Unable to resume liveaction "%s" because it is not in a paused state.'
            % liveaction.id
        )

    # Keep the object returned by update_status so the caller receives the
    # liveaction carrying the new status.
    liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
300
301
302
def _cleanup_liveaction(liveaction):
    """
    Delete the given liveaction on a best-effort basis.

    A failure to delete is logged together with the traceback and otherwise
    ignored, so the caller can carry on with its own error handling.

    :param liveaction: LiveAction object to delete.
    """
    try:
        LiveAction.delete(liveaction)
    except Exception:
        # Only regular errors are swallowed. KeyboardInterrupt, SystemExit and
        # other BaseException subclasses keep propagating.
        LOG.exception('Failed cleaning up LiveAction: %s.', liveaction)
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
308
309
310
def _is_notify_empty(notify_db):
311
    """
312
    notify_db is considered to be empty if notify_db is None and neither
313
    of on_complete, on_success and on_failure have values.
314
    """
315
    if not notify_db:
316
        return True
317
    return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)
318