Test Failed
Push — master ( f06717...a2792c )
by Tomaz
01:48 queued 10s
created

st2actions/st2actions/container/base.py (1 issue)

Labels
Severity
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import sys
19
import traceback
20
21
import six
22
from oslo_config import cfg
23
24
from st2common import log as logging
25
from st2common.util import date as date_utils
26
from st2common.constants import action as action_constants
27
from st2common.content import utils as content_utils
28
from st2common.exceptions import actionrunner
29
from st2common.exceptions.param import ParamException
30
from st2common.models.system.action import ResolvedActionParameters
31
from st2common.persistence.execution import ActionExecution
32
from st2common.services import access, executions, queries
33
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
34
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
35
from st2common.util import param as param_utils
36
from st2common.util.config_loader import ContentPackConfigLoader
37
from st2common.metrics.base import CounterWithTimer
38
from st2common.util import jsonify
39
40
from st2common.runners.base import get_runner
41
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
42
43
# Logger for this module, obtained through the st2common log wrapper imported above.
LOG = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = ['RunnerContainer', 'get_runner_container']
49
50
51
class RunnerContainer(object):
52
53
    def dispatch(self, liveaction_db):
        """
        Route a liveaction to the runner operation which matches its current status.

        The action and runner type are looked up, a runner instance is prepared and the
        handler registered for the liveaction status is invoked with that runner.

        :param liveaction_db: Liveaction to dispatch. Its ``context`` is updated in place
                              with the pack of the resolved action.

        :return: ``result`` attribute of the liveaction returned by the selected handler.

        :raises Exception: If the action referenced by the liveaction can't be found.
        :raises ActionRunnerDispatchError: If no handler exists for the liveaction status.
        """
        action_db = get_action_by_ref(liveaction_db.action)
        if not action_db:
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))

        liveaction_db.context['pack'] = action_db.pack

        runner_type_db = get_runnertype_by_name(action_db.runner_type['name'])

        LOG.info('Dispatching Action to a runner',
                 extra={'liveaction_db': liveaction_db, 'runner_type_db': runner_type_db})

        # Get runner instance.
        runner = self._get_runner(runner_type_db, action_db, liveaction_db)

        LOG.debug('Runner instance for RunnerType "%s" is: %s', runner_type_db.name, runner)

        # Map every supported status to the method which processes it. The three
        # "run-like" statuses all share the same handler.
        handlers = {}
        for run_status in (action_constants.LIVEACTION_STATUS_REQUESTED,
                           action_constants.LIVEACTION_STATUS_SCHEDULED,
                           action_constants.LIVEACTION_STATUS_RUNNING):
            handlers[run_status] = self._do_run
        handlers[action_constants.LIVEACTION_STATUS_CANCELING] = self._do_cancel
        handlers[action_constants.LIVEACTION_STATUS_PAUSING] = self._do_pause
        handlers[action_constants.LIVEACTION_STATUS_RESUMING] = self._do_resume

        handler = handlers.get(liveaction_db.status)
        if handler is None:
            raise actionrunner.ActionRunnerDispatchError(
                'Action runner is unable to dispatch the liveaction because it is '
                'in an unsupported status of "%s".' % liveaction_db.status
            )

        with CounterWithTimer(key="action.executions"):
            liveaction_db = handler(runner)

        return liveaction_db.result
90
91
    def _do_run(self, runner):
        """
        Execute the action on the provided runner and persist the outcome.

        A temporary auth token is created for the execution, the final parameters are
        rendered, ``pre_run`` and ``run`` are invoked and the resulting status / result is
        stored through ``_update_status``. Any error raised along the way marks the
        execution as failed, with the error message and traceback used as the result.

        :param runner: Runner instance with ``runner_type``, ``action``, ``liveaction`` and
                       ``context`` already assigned.

        :return: Updated liveaction object (``runner.liveaction``).
        """
        # Create a temporary auth token which will be available
        # for the duration of the action execution.
        runner.auth_token = self._create_auth_token(
            context=runner.context,
            action_db=runner.action,
            liveaction_db=runner.liveaction)

        try:
            # Finalized parameters are resolved and then rendered. This process could
            # fail. Handle the exception and report the error correctly.
            try:
                runner_params, action_params = param_utils.render_final_params(
                    runner.runner_type.runner_parameters,
                    runner.action.parameters,
                    runner.liveaction.parameters,
                    runner.liveaction.context)

                runner.runner_parameters = runner_params
            except ParamException as e:
                raise actionrunner.ActionRunnerException(six.text_type(e))

            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
            runner.pre_run()

            # Mask secret parameters in the log context
            resolved_action_params = ResolvedActionParameters(
                action_db=runner.action,
                runner_type_db=runner.runner_type,
                runner_parameters=runner_params,
                action_parameters=action_params)

            run_extra = {'runner': runner, 'parameters': resolved_action_params}
            LOG.debug('Performing run for runner: %s', runner.runner_id, extra=run_extra)

            with CounterWithTimer(key='action.executions'):
                with CounterWithTimer(key='action.%s.executions' % (runner.action.ref)):
                    (status, result, context) = runner.run(action_params)
                    result = jsonify.try_loads(result)

            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

            if (isinstance(runner, PollingAsyncActionRunner) and
                    runner.is_polling_enabled() and not action_completed):
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
        except:
            LOG.exception('Failed to run action.')
            _, ex, tb = sys.exc_info()
            # Mark execution as failed. status and context are bound before the error
            # message is built so the finally block sees them even if that step fails.
            status = action_constants.LIVEACTION_STATUS_FAILED
            context = None
            # Include the error message and traceback to try and provide some hints.
            # six.text_type is used (as for ParamException above) because str() raises
            # UnicodeEncodeError on Python 2 when the message contains non-ASCII text.
            result = {
                'error': six.text_type(ex),
                'traceback': ''.join(traceback.format_tb(tb, 20))
            }
        finally:
            # Log action completion
            completion_extra = {'result': result, 'status': status}
            LOG.debug('Action "%s" completed.', runner.action.name, extra=completion_extra)

            # Update the final status of liveaction and corresponding action execution.
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)

            # Always clean-up the auth_token
            # This method should be called in the finally block to ensure post_run is not impacted.
            self._clean_up_auth_token(runner=runner, status=status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)

        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})

        return runner.liveaction
163
164
    def _do_cancel(self, runner):
        """
        Cancel the execution handled by the provided runner.

        The liveaction status is only updated when ``runner.cancel()`` succeeds. If it
        fails, the error is logged and the liveaction is left as is. ``post_run`` is
        invoked in both cases with the current liveaction status.

        :param runner: Runner instance with ``liveaction`` already assigned.

        :return: Liveaction object (``runner.liveaction``).
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.cancel()

            # Update the final status of liveaction and corresponding action execution.
            # The status is updated here because we want to keep the workflow running
            # as is if the cancel operation failed.
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
        except:
            _, ex, tb = sys.exc_info()
            # Include the error message and traceback to try and provide some hints.
            # six.text_type avoids the UnicodeEncodeError which str() raises on Python 2
            # for messages containing non-ASCII text.
            result = {
                'error': six.text_type(ex),
                'traceback': ''.join(traceback.format_tb(tb, 20))
            }
            LOG.exception('Failed to cancel action %s.', runner.liveaction.id, extra=result)
        finally:
            # Always clean-up the auth_token
            # This method should be called in the finally block to ensure post_run is not impacted.
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        # post_run always receives the generic cancellation message, not the runner result.
        result = {'error': 'Execution canceled by user.'}
        runner.post_run(status=runner.liveaction.status, result=result)

        return runner.liveaction
189
190
    def _do_pause(self, runner):
        """
        Pause the execution handled by the provided runner.

        The status, result and context returned by ``runner.pause()`` are persisted. If
        pausing raises, the execution is marked as failed and the error message and
        traceback are stored as the result.

        :param runner: Runner instance with ``liveaction`` already assigned.

        :return: Updated liveaction object (``runner.liveaction``).
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.pause()
        except:
            _, ex, tb = sys.exc_info()
            # status and context are bound first so the finally block sees them even if
            # building the error message fails.
            status = action_constants.LIVEACTION_STATUS_FAILED
            context = runner.liveaction.context
            # Include the error message and traceback to try and provide some hints.
            # six.text_type avoids the UnicodeEncodeError which str() raises on Python 2
            # for messages containing non-ASCII text.
            result = {
                'error': six.text_type(ex),
                'traceback': ''.join(traceback.format_tb(tb, 20))
            }
            LOG.exception('Failed to pause action %s.', runner.liveaction.id, extra=result)
        finally:
            # Update the final status of liveaction and corresponding action execution.
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)

            # Always clean-up the auth_token
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)

        return runner.liveaction
210
211
    def _do_resume(self, runner):
        """
        Resume the execution handled by the provided runner.

        The status, result and context returned by ``runner.resume()`` are persisted and,
        for polling async runners with polling enabled, a query is set up when the action
        has not completed yet. If resuming raises, the execution is marked as failed and
        the error message and traceback are stored as the result.

        :param runner: Runner instance with ``liveaction`` and ``runner_type`` assigned.

        :return: Updated liveaction object (``runner.liveaction``).
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.resume()
            result = jsonify.try_loads(result)
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

            if (isinstance(runner, PollingAsyncActionRunner) and
                    runner.is_polling_enabled() and not action_completed):
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
        except:
            _, ex, tb = sys.exc_info()
            # status and context are bound first so the finally block sees them even if
            # building the error message fails.
            status = action_constants.LIVEACTION_STATUS_FAILED
            context = runner.liveaction.context
            # Include the error message and traceback to try and provide some hints.
            # six.text_type avoids the UnicodeEncodeError which str() raises on Python 2
            # for messages containing non-ASCII text.
            result = {
                'error': six.text_type(ex),
                'traceback': ''.join(traceback.format_tb(tb, 20))
            }
            LOG.exception('Failed to resume action %s.', runner.liveaction.id, extra=result)
        finally:
            # Update the final status of liveaction and corresponding action execution.
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)

            # Always clean-up the auth_token
            # This method should be called in the finally block to ensure post_run is not impacted.
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)

        # NOTE(review): the message below says "do_run" although this is the resume path;
        # it is kept unchanged in case log consumers match on it.
        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})

        return runner.liveaction
244
245
    def _clean_up_auth_token(self, runner, status):
        """
        Remove the temporary auth token which was created for the current action.

        Note: This method must never throw. It is called from finally blocks which assume
        that it doesn't.

        :param runner: Runner whose ``auth_token`` should be removed.
        :param status: Status used to decide whether the action has completed.

        :return: ``True`` if a deletion was attempted, ``False`` if the token was kept.
        """
        runs_async = isinstance(runner, AsyncActionRunner)
        has_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

        # For async runners the deletion is delayed until the action completes (or the
        # token expires). Async actions such as Mistral workflows use the auth token to
        # launch other actions in the workflow; deleting it early would make those
        # actions fail with an unauthorized exception.
        if runs_async and not has_completed:
            return False

        try:
            self._delete_auth_token(runner.auth_token)
        except:
            LOG.exception('Unable to clean-up auth_token.')

        return True
268
269
    def _update_live_action_db(self, liveaction_id, status, result, context):
        """
        Update the LiveActionDB object which belongs to the provided liveaction id.

        The stored status is only replaced when it differs from ``status`` and the stored
        status is not already a completed one. An end timestamp is set whenever ``status``
        is a completed status.

        :return: ``(liveaction_db, state_changed)`` tuple.
        """
        completed_states = action_constants.LIVEACTION_COMPLETED_STATES
        liveaction_db = get_liveaction_by_id(liveaction_id)

        state_changed = (liveaction_db.status != status and
                         liveaction_db.status not in completed_states)

        end_timestamp = date_utils.get_datetime_utc_now() if status in completed_states else None

        if state_changed:
            new_status = status
        else:
            new_status = liveaction_db.status

        liveaction_db = update_liveaction_status(
            status=new_status,
            result=result,
            context=context,
            end_timestamp=end_timestamp,
            liveaction_db=liveaction_db
        )

        return (liveaction_db, state_changed)
294
295
    def _update_status(self, liveaction_id, status, result, context):
        """
        Persist the new status / result on the liveaction and its action execution.

        :param liveaction_id: ID of the liveaction to update.
        :param status: New liveaction status.
        :param result: Result to store on the liveaction.
        :param context: Context to store on the liveaction.

        :return: Updated liveaction object.

        :raises Exception: Any error raised while updating the liveaction or the action
                           execution is logged and re-raised unchanged.
        """
        try:
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
            liveaction_db, state_changed = self._update_live_action_db(
                liveaction_id, status, result, context)
        except Exception:
            # Arguments are passed to the logger rather than pre-formatted so a failure
            # while rendering them can't mask the original error.
            LOG.exception(
                'Cannot update liveaction '
                '(id: %s, status: %s, result: %s).',
                liveaction_id, status, result
            )
            # Bare raise keeps the original traceback ("raise e" resets it on Python 2).
            raise

        try:
            executions.update_execution(liveaction_db, publish=state_changed)
            extra = {'liveaction_db': liveaction_db}
            LOG.debug('Updated liveaction after run', extra=extra)
        except Exception:
            LOG.exception(
                'Cannot update action execution for liveaction '
                '(id: %s, status: %s, result: %s).',
                liveaction_id, status, result
            )
            raise

        return liveaction_db
321
322
    def _get_entry_point_abs_path(self, pack, entry_point):
        """
        Return the value of ``content_utils.get_entry_point_abs_path`` for the provided
        pack and entry point.
        """
        abs_path = content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)
        return abs_path
324
325
    def _get_action_libs_abs_path(self, pack, entry_point):
        """
        Return the value of ``content_utils.get_action_libs_abs_path`` for the provided
        pack and entry point.
        """
        libs_path = content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)
        return libs_path
327
328
    def _get_rerun_reference(self, context):
329
        execution_id = context.get('re-run', {}).get('ref')
330
        return ActionExecution.get_by_id(execution_id) if execution_id else None
331
332
    def _get_runner(self, runner_type_db, action_db, liveaction_db):
        """
        Create a runner for the provided runner type and attach the execution state to it.

        :param runner_type_db: Runner type used to select the runner.
        :param action_db: Action which will be handled by the runner.
        :param liveaction_db: Liveaction which will be handled by the runner.

        :return: Runner instance with action, liveaction, execution, entry point, context,
                 callback, libs directory and re-run reference assigned.
        """
        entry_point_path = self._get_entry_point_abs_path(action_db.pack, action_db.entry_point)
        liveaction_context = getattr(liveaction_db, 'context', {})
        username = liveaction_context.get('user', cfg.CONF.system_user.user)
        pack_config = None

        # Note: Right now configs are only supported by the Python runner actions
        is_python_runner = (runner_type_db.name == 'python-script' or
                            runner_type_db.runner_module == 'python_runner')
        if is_python_runner:
            LOG.debug('Loading config from pack for python runner.')
            loader = ContentPackConfigLoader(pack_name=action_db.pack, user=username)
            pack_config = loader.get_config()

        runner = get_runner(name=runner_type_db.name, config=pack_config)

        # TODO: Pass those arguments to the constructor instead of late
        # assignment, late assignment is awful
        runner.runner_type = runner_type_db
        runner.action = action_db
        runner.action_name = action_db.name
        runner.liveaction = liveaction_db
        runner.liveaction_id = str(liveaction_db.id)
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
        runner.execution_id = str(runner.execution.id)
        runner.entry_point = entry_point_path
        runner.context = liveaction_context
        runner.callback = getattr(liveaction_db, 'callback', {})
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                              action_db.entry_point)

        # For re-run, get the ActionExecutionDB in which the re-run is based on.
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
        if rerun_ref_id:
            runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id)
        else:
            runner.rerun_ex_ref = None

        return runner
369
370
    def _create_auth_token(self, context, action_db, liveaction_db):
371
        if not context:
372
            return None
373
374
        user = context.get('user', None)
375
        if not user:
376
            return None
377
378
        metadata = {
379
            'service': 'actions_container',
380
            'action_name': action_db.name,
381
            'live_action_id': str(liveaction_db.id)
382
383
        }
384
385
        ttl = cfg.CONF.auth.service_token_ttl
386
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
387
        return token_db
388
389
    def _delete_auth_token(self, auth_token):
390
        if auth_token:
391
            access.delete_token(auth_token.token)
392
393
394
def get_runner_container():
    """
    Return a new ``RunnerContainer`` instance.
    """
    container = RunnerContainer()
    return container
396