Passed
Push — master ( 4ea570...c730ee )
by
unknown
04:53
created

is_action_paused_or_pausing()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 3
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import six
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.exceptions import actionrunner as runner_exc
21
from st2common.exceptions import db as db_exc
22
from st2common.exceptions import trace as trace_exc
23
from st2common.persistence.liveaction import LiveAction
24
from st2common.persistence.execution import ActionExecution
25
from st2common.services import executions
26
from st2common.services import trace as trace_service
0 ignored issues
show
Bug introduced by
The name trace does not seem to exist in module st2common.services.
Loading history...
27
from st2common.util import date as date_utils
28
from st2common.util import action_db as action_utils
29
from st2common.util import schema as util_schema
30
31
32
# Names exported by ``from <module> import *``. Every public (non-underscore)
# function defined in this module is listed so the export surface is consistent.
__all__ = [
    'request',
    'create_request',
    'publish_request',
    'update_status',
    'is_action_canceled_or_canceling',
    'is_action_paused_or_pausing',
    'request_cancellation',
    'request_pause',
    'request_resume'
]

# Module-level logger shared by all functions in this module.
LOG = logging.getLogger(__name__)
40
41
42
def _get_immutable_params(parameters):
    """
    Collect the names of the parameters which are flagged as immutable.

    :param parameters: Mapping of parameter name to parameter definition. A falsy
                       value (for example None or an empty dict) yields an empty list.
    :return: Names whose definition has a truthy ``immutable`` entry.
    :rtype: list
    """
    immutable_names = []

    if parameters:
        for name, definition in six.iteritems(parameters):
            # A missing 'immutable' key is treated the same as immutable=False.
            if definition.get('immutable', False):
                immutable_names.append(name)

    return immutable_names
46
47
48
def create_request(liveaction):
    """
    Validate the requested action and persist the liveaction with a new execution object.

    Both objects are written with publish=False, so nothing is published from here.

    :param liveaction: LiveAction object describing the action to run.
    :return: (liveaction, execution)
    :rtype: tuple
    :raises ValueError: If the action cannot be found, is disabled, or an immutable
                        parameter is being overridden.
    :raises trace_exc.TraceNotFoundException: If the trace lookup for the liveaction
                                              reports that the object is not found.
    """
    # Subtasks of a workflow action can be invoked by a system user, so the user
    # from the parent (original workflow) context wins whenever it is available.
    parent_context = executions.get_parent_context(liveaction)
    parent_user = parent_context.get('user', None) if parent_context else None
    if parent_user:
        liveaction.context['user'] = parent_user

    # The referenced action has to exist and be enabled.
    action_db = action_utils.get_action_by_ref(liveaction.action)
    if not action_db:
        raise ValueError('Action "%s" cannot be found.' % liveaction.action)
    if not action_db.enabled:
        raise ValueError('Unable to execute. Action "%s" is disabled.' % liveaction.action)

    runner_name = action_db.runner_type['name']
    runnertype_db = action_utils.get_runnertype_by_name(runner_name)

    if not hasattr(liveaction, 'parameters'):
        liveaction.parameters = {}

    # Check the supplied parameters against the schema of the action.
    param_schema = util_schema.get_schema_for_action_parameters(action_db)
    schema_validator = util_schema.get_validator()
    util_schema.validate(liveaction.parameters, param_schema, schema_validator,
                         use_default=True, allow_default_none=True)

    # Reject overrides of immutable parameters. Ignoring the override would be
    # possible, but informing the user is safer and avoids surprises.
    immutable_names = (_get_immutable_params(action_db.parameters) +
                       _get_immutable_params(runnertype_db.runner_parameters))
    overridden = [name for name in six.iterkeys(liveaction.parameters)
                  if name in immutable_names]
    if overridden:
        raise ValueError('Override of immutable parameter(s) %s is unsupported.'
                         % str(overridden))

    # Carry over the notification settings of the action.
    # XXX: There are cases when we don't want notifications to be sent for a particular
    # execution. So we should look at liveaction.parameters['notify']
    # and not set liveaction.notify.
    if not _is_notify_empty(action_db.notify):
        liveaction.notify = action_db.notify

    # Mark the liveaction as requested and stamp the start time.
    liveaction.status = action_constants.LIVEACTION_STATUS_REQUESTED
    liveaction.start_timestamp = date_utils.get_datetime_utc_now()

    # Record whether the action is a workflow.
    liveaction.action_is_workflow = action_db.is_workflow()

    # Publishing is deferred until both liveaction and execution exist.
    liveaction = LiveAction.add_or_update(liveaction, publish=False)

    # The trace lookup can throw. When it does, remove the liveaction which was
    # just written so that it is not left behind in the requested state.
    try:
        _, trace_db = trace_service.get_trace_db_by_live_action(liveaction)
    except db_exc.StackStormDBObjectNotFoundError as err:
        _cleanup_liveaction(liveaction)
        raise trace_exc.TraceNotFoundException(str(err))

    execution = executions.create_execution_object(liveaction, publish=False)

    if trace_db:
        component = trace_service.get_trace_component_for_action_execution(execution, liveaction)
        trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            action_executions=[component])

    return liveaction, execution
127
128
129
def publish_request(liveaction, execution):
    """
    Publish a previously created liveaction and its execution, then audit log it.

    :param liveaction: LiveAction object to publish.
    :param execution: Execution object to publish.
    :return: (liveaction, execution), the same objects which were passed in.
    :rtype: tuple
    """
    # The request is assumed to be a creation.
    LiveAction.publish_create(liveaction)
    LiveAction.publish_status(liveaction)
    ActionExecution.publish_create(execution)

    audit_msg = ('Action execution requested. LiveAction.id=%s, ActionExecution.id=%s' %
                 (liveaction.id, execution.id))
    LOG.audit(audit_msg, extra={'liveaction_db': liveaction, 'execution_db': execution})

    return liveaction, execution
146
147
148
def request(liveaction):
    """
    Create an action execution request and publish it right away.

    :param liveaction: LiveAction object describing the action to run.
    :return: (liveaction, execution)
    :rtype: tuple
    """
    created_liveaction, created_execution = create_request(liveaction)
    published_liveaction, published_execution = publish_request(created_liveaction,
                                                                created_execution)

    return published_liveaction, published_execution
153
154
155
def update_status(liveaction, new_status, result=None, publish=True):
    """
    Change the status of a liveaction and update the related execution.

    :param liveaction: LiveAction object whose status should change.
    :param new_status: Status to set.
    :param result: Optional result passed along with the status update.
    :param publish: When true, the status of the liveaction is published after the update.
    :return: The updated liveaction, or the liveaction which was passed in when
             it already has the requested status.
    """
    old_status = liveaction.status

    # Nothing to update when the status does not change.
    if old_status == new_status:
        return liveaction

    # The update itself never publishes; publishing is done at the end of this function.
    liveaction = action_utils.update_liveaction_status(
        status=new_status, result=result, liveaction_id=liveaction.id, publish=False)

    action_execution = executions.update_execution(liveaction)

    status_change = (old_status, new_status, liveaction.id, action_execution.id)
    msg = ('The status of action execution is changed from %s to %s. '
           '<LiveAction.id=%s, ActionExecution.id=%s>' % status_change)

    LOG.audit(msg, extra={'action_execution_db': action_execution,
                          'liveaction_db': liveaction})
    LOG.info(msg)

    if publish:
        LiveAction.publish_status(liveaction)

    return liveaction
182
183
184
def is_action_canceled_or_canceling(liveaction_id):
    """
    Check whether the liveaction is in the canceled or in the canceling status.

    :param liveaction_id: ID used to look up the liveaction.
    :rtype: bool
    """
    current_status = action_utils.get_liveaction_by_id(liveaction_id).status
    cancel_statuses = [action_constants.LIVEACTION_STATUS_CANCELED,
                       action_constants.LIVEACTION_STATUS_CANCELING]

    return current_status in cancel_statuses
188
189
190
def is_action_paused_or_pausing(liveaction_id):
    """
    Check whether the liveaction is in the paused or in the pausing status.

    :param liveaction_id: ID used to look up the liveaction.
    :rtype: bool
    """
    current_status = action_utils.get_liveaction_by_id(liveaction_id).status
    pause_statuses = [action_constants.LIVEACTION_STATUS_PAUSED,
                      action_constants.LIVEACTION_STATUS_PAUSING]

    return current_status in pause_statuses
194
195
196
def request_cancellation(liveaction, requester):
    """
    Request cancellation of an action execution.

    :param liveaction: LiveAction object to cancel.
    :param requester: Stored under the ``user`` key of the cancellation result.
    :return: (liveaction, execution)
    :rtype: tuple
    :raises Exception: If the status of the liveaction is not one of the cancelable states.
    """
    if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELING:
        # Cancellation is already in progress. Return the same (liveaction, execution)
        # shape as every other path so that callers can always unpack the result.
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return (liveaction, execution)

    if liveaction.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
        raise Exception(
            'Unable to cancel liveaction "%s" because it is already in a '
            'completed state.' % liveaction.id
        )

    result = {
        'message': 'Action canceled by user.',
        'user': requester
    }

    # There is real work only when liveaction is still running. In any other
    # cancelable state the liveaction goes straight to canceled.
    status = (action_constants.LIVEACTION_STATUS_CANCELING
              if liveaction.status == action_constants.LIVEACTION_STATUS_RUNNING
              else action_constants.LIVEACTION_STATUS_CANCELED)

    liveaction = update_status(liveaction, status, result=result)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
227
228
229
def request_pause(liveaction, requester):
    """
    Request pause for a running action execution.

    A liveaction which is already pausing or paused is returned unchanged together
    with its execution.

    :param liveaction: LiveAction object to pause.
    :param requester: Not used by this function.
    :return: (liveaction, execution)
    :rtype: tuple
    :raises ValueError: If the action of the liveaction cannot be found.
    :raises runner_exc.InvalidActionRunnerOperationError: If the runner of the action
        is not one of the workflow runner types.
    :raises runner_exc.UnexpectedActionExecutionStatusError: If the liveaction is
        neither pausing, paused nor running.
    """
    # Pause is only supported by the workflow runner types.
    action_db = action_utils.get_action_by_ref(liveaction.action)

    if not action_db:
        raise ValueError(
            'Unable to pause liveaction "%s" because the action "%s" '
            'is not found.' % (liveaction.id, liveaction.action)
        )

    runner_name = action_db.runner_type['name']
    if runner_name not in action_constants.WORKFLOW_RUNNER_TYPES:
        raise runner_exc.InvalidActionRunnerOperationError(
            'Unable to pause liveaction "%s" because it is not supported by the '
            '"%s" runner.' % (liveaction.id, runner_name)
        )

    pause_in_effect = liveaction.status in (action_constants.LIVEACTION_STATUS_PAUSING,
                                            action_constants.LIVEACTION_STATUS_PAUSED)

    # Only a running liveaction can transition to pausing.
    if not pause_in_effect:
        if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
            raise runner_exc.UnexpectedActionExecutionStatusError(
                'Unable to pause liveaction "%s" because it is not in a running state.'
                % liveaction.id
            )

        liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
267
268
269
def request_resume(liveaction, requester):
    """
    Request resume for a paused action execution.

    A liveaction which is already running is returned unchanged together with
    its execution.

    :param liveaction: LiveAction object to resume.
    :param requester: Not used by this function.
    :return: (liveaction, execution)
    :rtype: tuple
    :raises ValueError: If the action of the liveaction cannot be found.
    :raises runner_exc.InvalidActionRunnerOperationError: If the runner of the action
        is not one of the workflow runner types.
    :raises runner_exc.UnexpectedActionExecutionStatusError: If the liveaction is
        neither running nor paused.
    """
    # Resume is only supported by the workflow runner types.
    action_db = action_utils.get_action_by_ref(liveaction.action)

    if not action_db:
        raise ValueError(
            'Unable to resume liveaction "%s" because the action "%s" '
            'is not found.' % (liveaction.id, liveaction.action)
        )

    runner_name = action_db.runner_type['name']
    if runner_name not in action_constants.WORKFLOW_RUNNER_TYPES:
        raise runner_exc.InvalidActionRunnerOperationError(
            'Unable to resume liveaction "%s" because it is not supported by the '
            '"%s" runner.' % (liveaction.id, runner_name)
        )

    # Only a paused liveaction can transition to resuming.
    if liveaction.status != action_constants.LIVEACTION_STATUS_RUNNING:
        if liveaction.status != action_constants.LIVEACTION_STATUS_PAUSED:
            raise runner_exc.UnexpectedActionExecutionStatusError(
                'Unable to resume liveaction "%s" because it is not in a paused state.'
                % liveaction.id
            )

        liveaction = update_status(liveaction, action_constants.LIVEACTION_STATUS_RESUMING)

    execution = ActionExecution.get(liveaction__id=str(liveaction.id))

    return (liveaction, execution)
306
307
308
def _cleanup_liveaction(liveaction):
    """
    Delete the given liveaction on a best-effort basis.

    A failure of the delete is logged together with the traceback and is not
    re-raised, so that the caller can continue with its own error handling.

    :param liveaction: LiveAction object to delete.
    """
    try:
        LiveAction.delete(liveaction)
    except Exception:
        # Catch Exception rather than using a bare except so that KeyboardInterrupt
        # and SystemExit are not swallowed by the cleanup.
        LOG.exception('Failed cleaning up LiveAction: %s.', liveaction)
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
314
315
316
def _is_notify_empty(notify_db):
317
    """
318
    notify_db is considered to be empty if notify_db is None and neither
319
    of on_complete, on_success and on_failure have values.
320
    """
321
    if not notify_db:
322
        return True
323
    return not (notify_db.on_complete or notify_db.on_success or notify_db.on_failure)
324