Issues (8)

st2actions/st2actions/container/base.py (1 issue)

Labels
Severity
1
# Copyright 2019 Extreme Networks, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from __future__ import absolute_import
16
17
import sys
18
import traceback
19
20
import six
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util import date as date_utils
25
from st2common.constants import action as action_constants
26
from st2common.content import utils as content_utils
27
from st2common.exceptions import actionrunner
28
from st2common.exceptions.param import ParamException
29
from st2common.models.system.action import ResolvedActionParameters
30
from st2common.persistence.execution import ActionExecution
31
from st2common.services import access, executions, queries
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
from st2common.util.config_loader import ContentPackConfigLoader
36
from st2common.metrics.base import CounterWithTimer
37
from st2common.util import jsonify
38
39
from st2common.runners.base import get_runner
40
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
41
42
LOG = logging.getLogger(__name__)
43
44
__all__ = [
45
    'RunnerContainer',
46
    'get_runner_container'
47
]
48
49
50
class RunnerContainer(object):
51
52
    def dispatch(self, liveaction_db):
53
        action_db = get_action_by_ref(liveaction_db.action)
54
        if not action_db:
55
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
56
57
        liveaction_db.context['pack'] = action_db.pack
58
59
        runner_type_db = get_runnertype_by_name(action_db.runner_type['name'])
60
61
        extra = {'liveaction_db': liveaction_db, 'runner_type_db': runner_type_db}
62
        LOG.info('Dispatching Action to a runner', extra=extra)
63
64
        # Get runner instance.
65
        runner = self._get_runner(runner_type_db, action_db, liveaction_db)
66
67
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runner_type_db.name, runner)
68
69
        # Process the request.
70
        funcs = {
71
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
72
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
73
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
74
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
75
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
76
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
77
        }
78
79
        if liveaction_db.status not in funcs:
80
            raise actionrunner.ActionRunnerDispatchError(
81
                'Action runner is unable to dispatch the liveaction because it is '
82
                'in an unsupported status of "%s".' % liveaction_db.status
83
            )
84
85
        with CounterWithTimer(key="action.executions"):
86
            liveaction_db = funcs[liveaction_db.status](runner)
87
88
        return liveaction_db.result
89
90
    def _do_run(self, runner):
91
        # Create a temporary auth token which will be available
92
        # for the duration of the action execution.
93
        runner.auth_token = self._create_auth_token(
94
            context=runner.context,
95
            action_db=runner.action,
96
            liveaction_db=runner.liveaction)
97
98
        try:
99
            # Finalized parameters are resolved and then rendered. This process could
100
            # fail. Handle the exception and report the error correctly.
101
            try:
102
                runner_params, action_params = param_utils.render_final_params(
103
                    runner.runner_type.runner_parameters,
104
                    runner.action.parameters,
105
                    runner.liveaction.parameters,
106
                    runner.liveaction.context)
107
108
                runner.runner_parameters = runner_params
109
            except ParamException as e:
110
                raise actionrunner.ActionRunnerException(six.text_type(e))
111
112
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
113
            runner.pre_run()
114
115
            # Mask secret parameters in the log context
116
            resolved_action_params = ResolvedActionParameters(
117
                action_db=runner.action,
118
                runner_type_db=runner.runner_type,
119
                runner_parameters=runner_params,
120
                action_parameters=action_params)
121
122
            extra = {'runner': runner, 'parameters': resolved_action_params}
123
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
The variable extra was used before it was assigned.
Loading history...
124
125
            with CounterWithTimer(key='action.executions'):
126
                with CounterWithTimer(key='action.%s.executions' % (runner.action.ref)):
127
                    (status, result, context) = runner.run(action_params)
128
                    result = jsonify.try_loads(result)
129
130
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
131
132
            if (isinstance(runner, PollingAsyncActionRunner) and
133
                    runner.is_polling_enabled() and not action_completed):
134
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
135
        except:
136
            LOG.exception('Failed to run action.')
137
            _, ex, tb = sys.exc_info()
138
            # mark execution as failed.
139
            status = action_constants.LIVEACTION_STATUS_FAILED
140
            # include the error message and traceback to try and provide some hints.
141
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
142
            context = None
143
        finally:
144
            # Log action completion
145
            extra = {'result': result, 'status': status}
146
            LOG.debug('Action "%s" completed.' % (runner.action.name), extra=extra)
147
148
            # Update the final status of liveaction and corresponding action execution.
149
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
150
151
            # Always clean-up the auth_token
152
            # This method should be called in the finally block to ensure post_run is not impacted.
153
            self._clean_up_auth_token(runner=runner, status=status)
154
155
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
156
        runner.post_run(status=status, result=result)
157
158
        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
159
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})
160
161
        return runner.liveaction
162
163
    def _do_cancel(self, runner):
164
        try:
165
            extra = {'runner': runner}
166
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
167
            (status, result, context) = runner.cancel()
168
169
            # Update the final status of liveaction and corresponding action execution.
170
            # The status is updated here because we want to keep the workflow running
171
            # as is if the cancel operation failed.
172
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
173
        except:
174
            _, ex, tb = sys.exc_info()
175
            # include the error message and traceback to try and provide some hints.
176
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
177
            LOG.exception('Failed to cancel action %s.' % (runner.liveaction.id), extra=result)
178
        finally:
179
            # Always clean-up the auth_token
180
            # This method should be called in the finally block to ensure post_run is not impacted.
181
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)
182
183
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
184
        result = {'error': 'Execution canceled by user.'}
185
        runner.post_run(status=runner.liveaction.status, result=result)
186
187
        return runner.liveaction
188
189
    def _do_pause(self, runner):
190
        try:
191
            extra = {'runner': runner}
192
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
193
            (status, result, context) = runner.pause()
194
        except:
195
            _, ex, tb = sys.exc_info()
196
            # include the error message and traceback to try and provide some hints.
197
            status = action_constants.LIVEACTION_STATUS_FAILED
198
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
199
            context = runner.liveaction.context
200
            LOG.exception('Failed to pause action %s.' % (runner.liveaction.id), extra=result)
201
        finally:
202
            # Update the final status of liveaction and corresponding action execution.
203
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
204
205
            # Always clean-up the auth_token
206
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)
207
208
        return runner.liveaction
209
210
    def _do_resume(self, runner):
211
        try:
212
            extra = {'runner': runner}
213
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
214
            (status, result, context) = runner.resume()
215
            result = jsonify.try_loads(result)
216
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
217
218
            if (isinstance(runner, PollingAsyncActionRunner) and
219
                    runner.is_polling_enabled() and not action_completed):
220
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
221
        except:
222
            _, ex, tb = sys.exc_info()
223
            # include the error message and traceback to try and provide some hints.
224
            status = action_constants.LIVEACTION_STATUS_FAILED
225
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
226
            context = runner.liveaction.context
227
            LOG.exception('Failed to resume action %s.' % (runner.liveaction.id), extra=result)
228
        finally:
229
            # Update the final status of liveaction and corresponding action execution.
230
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
231
232
            # Always clean-up the auth_token
233
            # This method should be called in the finally block to ensure post_run is not impacted.
234
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)
235
236
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
237
        runner.post_run(status=status, result=result)
238
239
        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
240
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})
241
242
        return runner.liveaction
243
244
    def _clean_up_auth_token(self, runner, status):
245
        """
246
        Clean up the temporary auth token for the current action.
247
248
        Note: This method should never throw since it's called inside finally block which assumes
249
        it doesn't throw.
250
        """
251
        # Deletion of the runner generated auth token is delayed until the token expires.
252
        # Async actions such as Mistral workflows uses the auth token to launch other
253
        # actions in the workflow. If the auth token is deleted here, then the actions
254
        # in the workflow will fail with unauthorized exception.
255
        is_async_runner = isinstance(runner, AsyncActionRunner)
256
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
257
258
        if not is_async_runner or (is_async_runner and action_completed):
259
            try:
260
                self._delete_auth_token(runner.auth_token)
261
            except:
262
                LOG.exception('Unable to clean-up auth_token.')
263
264
            return True
265
266
        return False
267
268
    def _update_live_action_db(self, liveaction_id, status, result, context):
269
        """
270
        Update LiveActionDB object for the provided liveaction id.
271
        """
272
        liveaction_db = get_liveaction_by_id(liveaction_id)
273
274
        state_changed = (
275
            liveaction_db.status != status and
276
            liveaction_db.status not in action_constants.LIVEACTION_COMPLETED_STATES
277
        )
278
279
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
280
            end_timestamp = date_utils.get_datetime_utc_now()
281
        else:
282
            end_timestamp = None
283
284
        liveaction_db = update_liveaction_status(
285
            status=status if state_changed else liveaction_db.status,
286
            result=result,
287
            context=context,
288
            end_timestamp=end_timestamp,
289
            liveaction_db=liveaction_db
290
        )
291
292
        return (liveaction_db, state_changed)
293
294
    def _update_status(self, liveaction_id, status, result, context):
295
        try:
296
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
297
            liveaction_db, state_changed = self._update_live_action_db(
298
                liveaction_id, status, result, context)
299
        except Exception as e:
300
            LOG.exception(
301
                'Cannot update liveaction '
302
                '(id: %s, status: %s, result: %s).' % (
303
                    liveaction_id, status, result)
304
            )
305
            raise e
306
307
        try:
308
            executions.update_execution(liveaction_db, publish=state_changed)
309
            extra = {'liveaction_db': liveaction_db}
310
            LOG.debug('Updated liveaction after run', extra=extra)
311
        except Exception as e:
312
            LOG.exception(
313
                'Cannot update action execution for liveaction '
314
                '(id: %s, status: %s, result: %s).' % (
315
                    liveaction_id, status, result)
316
            )
317
            raise e
318
319
        return liveaction_db
320
321
    def _get_entry_point_abs_path(self, pack, entry_point):
322
        return content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)
323
324
    def _get_action_libs_abs_path(self, pack, entry_point):
325
        return content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)
326
327
    def _get_rerun_reference(self, context):
328
        execution_id = context.get('re-run', {}).get('ref')
329
        return ActionExecution.get_by_id(execution_id) if execution_id else None
330
331
    def _get_runner(self, runner_type_db, action_db, liveaction_db):
332
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack, action_db.entry_point)
333
        context = getattr(liveaction_db, 'context', dict())
334
        user = context.get('user', cfg.CONF.system_user.user)
335
        config = None
336
337
        # Note: Right now configs are only supported by the Python runner actions
338
        if (runner_type_db.name == 'python-script' or
339
                runner_type_db.runner_module == 'python_runner'):
340
            LOG.debug('Loading config from pack for python runner.')
341
            config_loader = ContentPackConfigLoader(pack_name=action_db.pack, user=user)
342
            config = config_loader.get_config()
343
344
        runner = get_runner(
345
            name=runner_type_db.name,
346
            config=config)
347
348
        # TODO: Pass those arguments to the constructor instead of late
349
        # assignment, late assignment is awful
350
        runner.runner_type = runner_type_db
351
        runner.action = action_db
352
        runner.action_name = action_db.name
353
        runner.liveaction = liveaction_db
354
        runner.liveaction_id = str(liveaction_db.id)
355
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
356
        runner.execution_id = str(runner.execution.id)
357
        runner.entry_point = resolved_entry_point
358
        runner.context = context
359
        runner.callback = getattr(liveaction_db, 'callback', dict())
360
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
361
                                                              action_db.entry_point)
362
363
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
364
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
365
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
366
367
        return runner
368
369
    def _create_auth_token(self, context, action_db, liveaction_db):
370
        if not context:
371
            return None
372
373
        user = context.get('user', None)
374
        if not user:
375
            return None
376
377
        metadata = {
378
            'service': 'actions_container',
379
            'action_name': action_db.name,
380
            'live_action_id': str(liveaction_db.id)
381
382
        }
383
384
        ttl = cfg.CONF.auth.service_token_ttl
385
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
386
        return token_db
387
388
    def _delete_auth_token(self, auth_token):
389
        if auth_token:
390
            access.delete_token(auth_token.token)
391
392
393
def get_runner_container():
394
    return RunnerContainer()
395