Test Setup Failed
Pull Request — master (#4154)
by W
03:24
created

RunnerContainer._get_rerun_reference()   A

Complexity

Conditions 2

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import sys
19
import traceback
20
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util import date as date_utils
25
from st2common.constants import action as action_constants
26
from st2common.content import utils as content_utils
27
from st2common.exceptions import actionrunner
28
from st2common.exceptions.param import ParamException
29
from st2common.models.system.action import ResolvedActionParameters
30
from st2common.persistence.execution import ActionExecution
31
from st2common.services import access, executions, queries
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
from st2common.util.config_loader import ContentPackConfigLoader
36
from st2common.metrics.base import CounterWithTimer, format_metrics_key
37
from st2common.util import jsonify
38
39
from st2common.runners.base import get_runner
40
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
41
42
LOG = logging.getLogger(__name__)
43
44
__all__ = [
45
    'RunnerContainer',
46
    'get_runner_container'
47
]
48
49
50
class RunnerContainer(object):
51
52
    def dispatch(self, liveaction_db):
53
        action_db = get_action_by_ref(liveaction_db.action)
54
        if not action_db:
55
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
56
57
        liveaction_db.context['pack'] = action_db.pack
58
59
        runner_type_db = get_runnertype_by_name(action_db.runner_type['name'])
60
61
        extra = {'liveaction_db': liveaction_db, 'runner_type_db': runner_type_db}
62
        LOG.info('Dispatching Action to a runner', extra=extra)
63
64
        # Get runner instance.
65
        runner = self._get_runner(runner_type_db, action_db, liveaction_db)
66
67
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runner_type_db.name, runner)
68
69
        # Process the request.
70
        funcs = {
71
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
72
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
73
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
74
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
75
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
76
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
77
        }
78
79
        if liveaction_db.status not in funcs:
80
            raise actionrunner.ActionRunnerDispatchError(
81
                'Action runner is unable to dispatch the liveaction because it is '
82
                'in an unsupported status of "%s".' % liveaction_db.status
83
            )
84
85
        with CounterWithTimer(key="st2.action.executions"):
86
            liveaction_db = funcs[liveaction_db.status](runner)
87
88
        return liveaction_db.result
89
90
    def _do_run(self, runner):
91
        # Create a temporary auth token which will be available
92
        # for the duration of the action execution.
93
        runner.auth_token = self._create_auth_token(
94
            context=runner.context,
95
            action_db=runner.action,
96
            liveaction_db=runner.liveaction)
97
98
        try:
99
            # Finalized parameters are resolved and then rendered. This process could
100
            # fail. Handle the exception and report the error correctly.
101
            try:
102
                runner_params, action_params = param_utils.render_final_params(
103
                    runner.runner_type.runner_parameters,
104
                    runner.action.parameters,
105
                    runner.liveaction.parameters,
106
                    runner.liveaction.context)
107
108
                runner.runner_parameters = runner_params
109
            except ParamException as e:
110
                raise actionrunner.ActionRunnerException(str(e))
111
112
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
113
            runner.pre_run()
114
115
            # Mask secret parameters in the log context
116
            resolved_action_params = ResolvedActionParameters(
117
                action_db=runner.action,
118
                runner_type_db=runner.runner_type,
119
                runner_parameters=runner_params,
120
                action_parameters=action_params)
121
122
            extra = {'runner': runner, 'parameters': resolved_action_params}
123
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
124
125
            with CounterWithTimer(key=format_metrics_key(action_db=runner.action, key='action')):
126
                (status, result, context) = runner.run(action_params)
127
                result = jsonify.try_loads(result)
128
129
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
130
131
            if (isinstance(runner, PollingAsyncActionRunner) and
132
                    runner.is_polling_enabled() and not action_completed):
133
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
134
        except:
135
            LOG.exception('Failed to run action.')
136
            _, ex, tb = sys.exc_info()
137
            # mark execution as failed.
138
            status = action_constants.LIVEACTION_STATUS_FAILED
139
            # include the error message and traceback to try and provide some hints.
140
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
141
            context = None
142
        finally:
143
            # Log action completion
144
            extra = {'result': result, 'status': status}
145
            LOG.debug('Action "%s" completed.' % (runner.action.name), extra=extra)
146
147
            # Update the final status of liveaction and corresponding action execution.
148
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
149
150
            # Always clean-up the auth_token
151
            # This method should be called in the finally block to ensure post_run is not impacted.
152
            self._clean_up_auth_token(runner=runner, status=status)
153
154
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
155
        runner.post_run(status=status, result=result)
156
157
        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
158
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})
159
160
        return runner.liveaction
161
162
    def _do_cancel(self, runner):
163
        try:
164
            extra = {'runner': runner}
165
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
166
            (status, result, context) = runner.cancel()
167
168
            # Update the final status of liveaction and corresponding action execution.
169
            # The status is updated here because we want to keep the workflow running
170
            # as is if the cancel operation failed.
171
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
172
        except:
173
            _, ex, tb = sys.exc_info()
174
            # include the error message and traceback to try and provide some hints.
175
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
176
            LOG.exception('Failed to cancel action %s.' % (runner.liveaction.id), extra=result)
177
        finally:
178
            # Always clean-up the auth_token
179
            # This method should be called in the finally block to ensure post_run is not impacted.
180
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)
181
182
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
183
        result = {'error': 'Execution canceled by user.'}
184
        runner.post_run(status=runner.liveaction.status, result=result)
185
186
        return runner.liveaction
187
188
    def _do_pause(self, runner):
189
        try:
190
            extra = {'runner': runner}
191
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
192
            (status, result, context) = runner.pause()
193
        except:
194
            _, ex, tb = sys.exc_info()
195
            # include the error message and traceback to try and provide some hints.
196
            status = action_constants.LIVEACTION_STATUS_FAILED
197
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
198
            context = runner.liveaction.context
199
            LOG.exception('Failed to pause action %s.' % (runner.liveaction.id), extra=result)
200
        finally:
201
            # Update the final status of liveaction and corresponding action execution.
202
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
203
204
            # Always clean-up the auth_token
205
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)
206
207
        return runner.liveaction
208
209
    def _do_resume(self, runner):
210
        try:
211
            extra = {'runner': runner}
212
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
213
            (status, result, context) = runner.resume()
214
            result = jsonify.try_loads(result)
215
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
216
217
            if (isinstance(runner, PollingAsyncActionRunner) and
218
                    runner.is_polling_enabled() and not action_completed):
219
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
220
        except:
221
            _, ex, tb = sys.exc_info()
222
            # include the error message and traceback to try and provide some hints.
223
            status = action_constants.LIVEACTION_STATUS_FAILED
224
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
225
            context = runner.liveaction.context
226
            LOG.exception('Failed to resume action %s.' % (runner.liveaction.id), extra=result)
227
        finally:
228
            # Update the final status of liveaction and corresponding action execution.
229
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
230
231
            # Always clean-up the auth_token
232
            # This method should be called in the finally block to ensure post_run is not impacted.
233
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)
234
235
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
236
        runner.post_run(status=status, result=result)
237
238
        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
239
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})
240
241
        return runner.liveaction
242
243
    def _clean_up_auth_token(self, runner, status):
244
        """
245
        Clean up the temporary auth token for the current action.
246
247
        Note: This method should never throw since it's called inside finally block which assumes
248
        it doesn't throw.
249
        """
250
        # Deletion of the runner generated auth token is delayed until the token expires.
251
        # Async actions such as Mistral workflows uses the auth token to launch other
252
        # actions in the workflow. If the auth token is deleted here, then the actions
253
        # in the workflow will fail with unauthorized exception.
254
        is_async_runner = isinstance(runner, AsyncActionRunner)
255
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
256
257
        if not is_async_runner or (is_async_runner and action_completed):
258
            try:
259
                self._delete_auth_token(runner.auth_token)
260
            except:
261
                LOG.exception('Unable to clean-up auth_token.')
262
263
            return True
264
265
        return False
266
267
    def _update_live_action_db(self, liveaction_id, status, result, context):
268
        """
269
        Update LiveActionDB object for the provided liveaction id.
270
        """
271
        liveaction_db = get_liveaction_by_id(liveaction_id)
272
273
        state_changed = (
274
            liveaction_db.status != status and
275
            liveaction_db.status not in action_constants.LIVEACTION_COMPLETED_STATES
276
        )
277
278
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
279
            end_timestamp = date_utils.get_datetime_utc_now()
280
        else:
281
            end_timestamp = None
282
283
        liveaction_db = update_liveaction_status(
284
            status=status if state_changed else liveaction_db.status,
285
            result=result,
286
            context=context,
287
            end_timestamp=end_timestamp,
288
            liveaction_db=liveaction_db
289
        )
290
291
        return (liveaction_db, state_changed)
292
293
    def _update_status(self, liveaction_id, status, result, context):
294
        try:
295
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
296
            liveaction_db, state_changed = self._update_live_action_db(
297
                liveaction_id, status, result, context)
298
        except Exception as e:
299
            LOG.exception(
300
                'Cannot update liveaction '
301
                '(id: %s, status: %s, result: %s).' % (
302
                    liveaction_id, status, result)
303
            )
304
            raise e
305
306
        try:
307
            executions.update_execution(liveaction_db, publish=state_changed)
308
            extra = {'liveaction_db': liveaction_db}
309
            LOG.debug('Updated liveaction after run', extra=extra)
310
        except Exception as e:
311
            LOG.exception(
312
                'Cannot update action execution for liveaction '
313
                '(id: %s, status: %s, result: %s).' % (
314
                    liveaction_id, status, result)
315
            )
316
            raise e
317
318
        return liveaction_db
319
320
    def _get_entry_point_abs_path(self, pack, entry_point):
321
        return content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)
322
323
    def _get_action_libs_abs_path(self, pack, entry_point):
324
        return content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)
325
326
    def _get_rerun_reference(self, context):
327
        execution_id = context.get('re-run', {}).get('ref')
328
        return ActionExecution.get_by_id(execution_id) if execution_id else None
329
330
    def _get_runner(self, runner_type_db, action_db, liveaction_db):
331
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack, action_db.entry_point)
332
        context = getattr(liveaction_db, 'context', dict())
333
        user = context.get('user', cfg.CONF.system_user.user)
334
        config = None
335
336
        # Note: Right now configs are only supported by the Python runner actions
337
        if runner_type_db.runner_module == 'python_runner':
338
            LOG.debug('Loading config from pack for python runner.')
339
            config_loader = ContentPackConfigLoader(pack_name=action_db.pack, user=user)
340
            config = config_loader.get_config()
341
342
        runner = get_runner(
343
            package_name=runner_type_db.runner_package,
344
            module_name=runner_type_db.runner_module,
345
            config=config)
346
347
        # TODO: Pass those arguments to the constructor instead of late
348
        # assignment, late assignment is awful
349
        runner.runner_type = runner_type_db
350
        runner.action = action_db
351
        runner.action_name = action_db.name
352
        runner.liveaction = liveaction_db
353
        runner.liveaction_id = str(liveaction_db.id)
354
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
355
        runner.execution_id = str(runner.execution.id)
356
        runner.entry_point = resolved_entry_point
357
        runner.context = context
358
        runner.callback = getattr(liveaction_db, 'callback', dict())
359
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
360
                                                              action_db.entry_point)
361
362
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
363
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
364
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
365
366
        return runner
367
368
    def _create_auth_token(self, context, action_db, liveaction_db):
369
        if not context:
370
            return None
371
372
        user = context.get('user', None)
373
        if not user:
374
            return None
375
376
        metadata = {
377
            'service': 'actions_container',
378
            'action_name': action_db.name,
379
            'live_action_id': str(liveaction_db.id)
380
381
        }
382
383
        ttl = cfg.CONF.auth.service_token_ttl
384
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
385
        return token_db
386
387
    def _delete_auth_token(self, auth_token):
388
        if auth_token:
389
            access.delete_token(auth_token.token)
390
391
392
def get_runner_container():
393
    return RunnerContainer()
394