Test Setup Failed
Pull Request — master (#4154)
by W
03:36
created

RunnerContainer._update_live_action_db()   B

Complexity

Conditions 3

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 25
rs 8.8571
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import json
18
import sys
19
import traceback
20
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util import date as date_utils
25
from st2common.constants import action as action_constants
26
from st2common.content import utils as content_utils
27
from st2common.exceptions import actionrunner
28
from st2common.exceptions.param import ParamException
29
from st2common.models.system.action import ResolvedActionParameters
30
from st2common.persistence.execution import ActionExecution
31
from st2common.services import access, executions, queries
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
from st2common.util.config_loader import ContentPackConfigLoader
36
from st2common.metrics.base import CounterWithTimer, format_metrics_key
37
38
from st2common.runners.base import get_runner
39
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
40
41
LOG = logging.getLogger(__name__)
42
43
__all__ = [
44
    'RunnerContainer',
45
    'get_runner_container'
46
]
47
48
49
class RunnerContainer(object):
50
51
    def dispatch(self, liveaction_db):
52
        action_db = get_action_by_ref(liveaction_db.action)
53
        if not action_db:
54
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
55
56
        liveaction_db.context['pack'] = action_db.pack
57
58
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
59
60
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
61
        LOG.info('Dispatching Action to a runner', extra=extra)
62
63
        # Get runner instance.
64
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
65
66
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
67
68
        # Process the request.
69
        funcs = {
70
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
71
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
72
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
73
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
74
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
75
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
76
        }
77
78
        if liveaction_db.status not in funcs:
79
            raise actionrunner.ActionRunnerDispatchError(
80
                'Action runner is unable to dispatch the liveaction because it is '
81
                'in an unsupported status of "%s".' % liveaction_db.status
82
            )
83
84
        with CounterWithTimer(key="st2.action.executions"):
85
            liveaction_db = funcs[liveaction_db.status](
86
                runner=runner,
87
                runnertype_db=runnertype_db,
88
                action_db=action_db,
89
                liveaction_db=liveaction_db
90
            )
91
92
        return liveaction_db.result
93
94
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
95
        # Create a temporary auth token which will be available
96
        # for the duration of the action execution.
97
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
98
                                                    liveaction_db=liveaction_db)
99
100
        try:
101
            # Finalized parameters are resolved and then rendered. This process could
102
            # fail. Handle the exception and report the error correctly.
103
            try:
104
                runner_params, action_params = param_utils.render_final_params(
105
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
106
                    liveaction_db.context)
107
                runner.runner_parameters = runner_params
108
            except ParamException as e:
109
                raise actionrunner.ActionRunnerException(str(e))
110
111
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
112
            runner.pre_run()
113
114
            # Mask secret parameters in the log context
115
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
116
                                                              runner_type_db=runnertype_db,
117
                                                              runner_parameters=runner_params,
118
                                                              action_parameters=action_params)
119
            extra = {'runner': runner, 'parameters': resolved_action_params}
120
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
121
            with CounterWithTimer(key=format_metrics_key(
122
                                  action_db=action_db, key='action')):
123
                (status, result, context) = runner.run(action_params)
124
125
            try:
126
                result = json.loads(result)
127
            except:
128
                pass
129
130
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
131
            if (isinstance(runner, PollingAsyncActionRunner) and
132
                    runner.is_polling_enabled() and not action_completed):
133
                queries.setup_query(liveaction_db.id, runnertype_db, context)
134
        except:
135
            LOG.exception('Failed to run action.')
136
            _, ex, tb = sys.exc_info()
137
            # mark execution as failed.
138
            status = action_constants.LIVEACTION_STATUS_FAILED
139
            # include the error message and traceback to try and provide some hints.
140
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
141
            context = None
142
        finally:
143
            # Log action completion
144
            extra = {'result': result, 'status': status}
145
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
146
147
            # Update the final status of liveaction and corresponding action execution.
148
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
149
150
            # Always clean-up the auth_token
151
            # This method should be called in the finally block to ensure post_run is not impacted.
152
            self._clean_up_auth_token(runner=runner, status=status)
153
154
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
155
        runner.post_run(status=status, result=result)
156
157
        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
158
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})
159
160
        return liveaction_db
161
162
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
163
        try:
164
            extra = {'runner': runner}
165
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
166
            (status, result, context) = runner.cancel()
167
168
            # Update the final status of liveaction and corresponding action execution.
169
            # The status is updated here because we want to keep the workflow running
170
            # as is if the cancel operation failed.
171
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
172
        except:
173
            _, ex, tb = sys.exc_info()
174
            # include the error message and traceback to try and provide some hints.
175
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
176
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
177
        finally:
178
            # Always clean-up the auth_token
179
            # This method should be called in the finally block to ensure post_run is not impacted.
180
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
181
182
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
183
        result = {'error': 'Execution canceled by user.'}
184
        runner.post_run(status=liveaction_db.status, result=result)
185
186
        return liveaction_db
187
188
    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
189
        try:
190
            extra = {'runner': runner}
191
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
192
            (status, result, context) = runner.pause()
193
        except:
194
            _, ex, tb = sys.exc_info()
195
            # include the error message and traceback to try and provide some hints.
196
            status = action_constants.LIVEACTION_STATUS_FAILED
197
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
198
            context = liveaction_db.context
199
            LOG.exception('Failed to pause action %s.' % (liveaction_db.id), extra=result)
200
        finally:
201
            # Update the final status of liveaction and corresponding action execution.
202
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
203
204
            # Always clean-up the auth_token
205
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
206
207
        return liveaction_db
208
209
    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
210
        try:
211
            extra = {'runner': runner}
212
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
213
214
            (status, result, context) = runner.resume()
215
216
            try:
217
                result = json.loads(result)
218
            except:
219
                pass
220
221
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
222
223
            if (isinstance(runner, PollingAsyncActionRunner) and
224
                    runner.is_polling_enabled() and not action_completed):
225
                queries.setup_query(liveaction_db.id, runnertype_db, context)
226
        except:
227
            _, ex, tb = sys.exc_info()
228
            # include the error message and traceback to try and provide some hints.
229
            status = action_constants.LIVEACTION_STATUS_FAILED
230
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
231
            context = liveaction_db.context
232
            LOG.exception('Failed to resume action %s.' % (liveaction_db.id), extra=result)
233
        finally:
234
            # Update the final status of liveaction and corresponding action execution.
235
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
236
237
            # Always clean-up the auth_token
238
            # This method should be called in the finally block to ensure post_run is not impacted.
239
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
240
241
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
242
        runner.post_run(status=status, result=result)
243
244
        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
245
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})
246
247
        return liveaction_db
248
249
    def _clean_up_auth_token(self, runner, status):
250
        """
251
        Clean up the temporary auth token for the current action.
252
253
        Note: This method should never throw since it's called inside finally block which assumes
254
        it doesn't throw.
255
        """
256
        # Deletion of the runner generated auth token is delayed until the token expires.
257
        # Async actions such as Mistral workflows uses the auth token to launch other
258
        # actions in the workflow. If the auth token is deleted here, then the actions
259
        # in the workflow will fail with unauthorized exception.
260
        is_async_runner = isinstance(runner, AsyncActionRunner)
261
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
262
263
        if not is_async_runner or (is_async_runner and action_completed):
264
            try:
265
                self._delete_auth_token(runner.auth_token)
266
            except:
267
                LOG.exception('Unable to clean-up auth_token.')
268
269
            return True
270
271
        return False
272
273
    def _update_live_action_db(self, liveaction_id, status, result, context):
274
        """
275
        Update LiveActionDB object for the provided liveaction id.
276
        """
277
        liveaction_db = get_liveaction_by_id(liveaction_id)
278
279
        state_changed = (
280
            liveaction_db.status != status and
281
            liveaction_db.status not in action_constants.LIVEACTION_COMPLETED_STATES
282
        )
283
284
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
285
            end_timestamp = date_utils.get_datetime_utc_now()
286
        else:
287
            end_timestamp = None
288
289
        liveaction_db = update_liveaction_status(
290
            status=status if state_changed else liveaction_db.status,
291
            result=result,
292
            context=context,
293
            end_timestamp=end_timestamp,
294
            liveaction_db=liveaction_db
295
        )
296
297
        return (liveaction_db, state_changed)
298
299
    def _update_status(self, liveaction_id, status, result, context):
300
        try:
301
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
302
            liveaction_db, state_changed = self._update_live_action_db(
303
                liveaction_id, status, result, context)
304
        except Exception as e:
305
            LOG.exception(
306
                'Cannot update liveaction '
307
                '(id: %s, status: %s, result: %s).' % (
308
                    liveaction_id, status, result)
309
            )
310
            raise e
311
312
        try:
313
            executions.update_execution(liveaction_db, publish=state_changed)
314
            extra = {'liveaction_db': liveaction_db}
315
            LOG.debug('Updated liveaction after run', extra=extra)
316
        except Exception as e:
317
            LOG.exception(
318
                'Cannot update action execution for liveaction '
319
                '(id: %s, status: %s, result: %s).' % (
320
                    liveaction_id, status, result)
321
            )
322
            raise e
323
324
        return liveaction_db
325
326
    def _get_entry_point_abs_path(self, pack, entry_point):
327
        return content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)
328
329
    def _get_action_libs_abs_path(self, pack, entry_point):
330
        return content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)
331
332
    def _get_rerun_reference(self, context):
333
        execution_id = context.get('re-run', {}).get('ref')
334
        return ActionExecution.get_by_id(execution_id) if execution_id else None
335
336
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
337
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
338
                                                              action_db.entry_point)
339
        context = getattr(liveaction_db, 'context', dict())
340
        user = context.get('user', cfg.CONF.system_user.user)
341
342
        # Note: Right now configs are only supported by the Python runner actions
343
        if runnertype_db.runner_module == 'python_runner':
344
            LOG.debug('Loading config for pack')
345
346
            config_loader = ContentPackConfigLoader(pack_name=action_db.pack, user=user)
347
            config = config_loader.get_config()
348
        else:
349
            config = None
350
351
        runner = get_runner(package_name=runnertype_db.runner_package,
352
                            module_name=runnertype_db.runner_module,
353
                            config=config)
354
355
        # TODO: Pass those arguments to the constructor instead of late
356
        # assignment, late assignment is awful
357
        runner.runner_type_db = runnertype_db
358
        runner.action = action_db
359
        runner.action_name = action_db.name
360
        runner.liveaction = liveaction_db
361
        runner.liveaction_id = str(liveaction_db.id)
362
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
363
        runner.execution_id = str(runner.execution.id)
364
        runner.entry_point = resolved_entry_point
365
        runner.context = context
366
        runner.callback = getattr(liveaction_db, 'callback', dict())
367
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
368
                                                              action_db.entry_point)
369
370
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
371
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
372
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
373
374
        return runner
375
376
    def _create_auth_token(self, context, action_db, liveaction_db):
377
        if not context:
378
            return None
379
380
        user = context.get('user', None)
381
        if not user:
382
            return None
383
384
        metadata = {
385
            'service': 'actions_container',
386
            'action_name': action_db.name,
387
            'live_action_id': str(liveaction_db.id)
388
389
        }
390
391
        ttl = cfg.CONF.auth.service_token_ttl
392
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
393
        return token_db
394
395
    def _delete_auth_token(self, auth_token):
396
        if auth_token:
397
            access.delete_token(auth_token.token)
398
399
400
def get_runner_container():
401
    return RunnerContainer()
402