Passed
Pull Request — master (#4000)
by W
05:55
created

RunnerContainer._setup_async_query()   A

Complexity

Conditions 3

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import json
18
import sys
19
import traceback
20
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util import date as date_utils
25
from st2common.constants import action as action_constants
26
from st2common.content import utils as content_utils
27
from st2common.exceptions import actionrunner
28
from st2common.exceptions.param import ParamException
29
from st2common.models.system.action import ResolvedActionParameters
30
from st2common.persistence.execution import ActionExecution
31
from st2common.services import access, executions, queries
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
from st2common.util.config_loader import ContentPackConfigLoader
36
37
from st2common.runners.base import get_runner
38
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
39
40
LOG = logging.getLogger(__name__)
41
42
__all__ = [
43
    'RunnerContainer',
44
    'get_runner_container'
45
]
46
47
48
class RunnerContainer(object):
49
50
    def dispatch(self, liveaction_db):
51
        action_db = get_action_by_ref(liveaction_db.action)
52
        if not action_db:
53
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
54
55
        liveaction_db.context['pack'] = action_db.pack
56
57
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
58
59
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
60
        LOG.info('Dispatching Action to a runner', extra=extra)
61
62
        # Get runner instance.
63
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
64
65
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
66
67
        # Process the request.
68
        funcs = {
69
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
70
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
71
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
72
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
73
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
74
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
75
        }
76
77
        if liveaction_db.status not in funcs:
78
            raise actionrunner.ActionRunnerDispatchError(
79
                'Action runner is unable to dispatch the liveaction because it is '
80
                'in an unsupported status of "%s".' % liveaction_db.status
81
            )
82
83
        liveaction_db = funcs[liveaction_db.status](
84
            runner=runner,
85
            runnertype_db=runnertype_db,
86
            action_db=action_db,
87
            liveaction_db=liveaction_db
88
        )
89
90
        return liveaction_db.result
91
92
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
93
        # Create a temporary auth token which will be available
94
        # for the duration of the action execution.
95
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
96
                                                    liveaction_db=liveaction_db)
97
98
        try:
99
            # Finalized parameters are resolved and then rendered. This process could
100
            # fail. Handle the exception and report the error correctly.
101
            try:
102
                runner_params, action_params = param_utils.render_final_params(
103
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
104
                    liveaction_db.context)
105
                runner.runner_parameters = runner_params
106
            except ParamException as e:
107
                raise actionrunner.ActionRunnerException(str(e))
108
109
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
110
            runner.pre_run()
111
112
            # Mask secret parameters in the log context
113
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
114
                                                              runner_type_db=runnertype_db,
115
                                                              runner_parameters=runner_params,
116
                                                              action_parameters=action_params)
117
            extra = {'runner': runner, 'parameters': resolved_action_params}
118
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
119
            (status, result, context) = runner.run(action_params)
120
121
            try:
122
                result = json.loads(result)
123
            except:
124
                pass
125
126
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
127
            if isinstance(runner, PollingAsyncActionRunner) and not action_completed:
128
                queries.setup_query(liveaction_db.id, runnertype_db, context)
129
        except:
130
            LOG.exception('Failed to run action.')
131
            _, ex, tb = sys.exc_info()
132
            # mark execution as failed.
133
            status = action_constants.LIVEACTION_STATUS_FAILED
134
            # include the error message and traceback to try and provide some hints.
135
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
136
            context = None
137
        finally:
138
            # Log action completion
139
            extra = {'result': result, 'status': status}
140
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
141
142
            # Update the final status of liveaction and corresponding action execution.
143
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
144
145
            # Always clean-up the auth_token
146
            # This method should be called in the finally block to ensure post_run is not impacted.
147
            self._clean_up_auth_token(runner=runner, status=status)
148
149
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
150
        runner.post_run(status=status, result=result)
151
152
        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
153
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})
154
155
        return liveaction_db
156
157
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
158
        try:
159
            extra = {'runner': runner}
160
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
161
            (status, result, context) = runner.cancel()
162
163
            # Update the final status of liveaction and corresponding action execution.
164
            # The status is updated here because we want to keep the workflow running
165
            # as is if the cancel operation failed.
166
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
167
        except:
168
            _, ex, tb = sys.exc_info()
169
            # include the error message and traceback to try and provide some hints.
170
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
171
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
172
        finally:
173
            # Always clean-up the auth_token
174
            # This method should be called in the finally block to ensure post_run is not impacted.
175
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
176
177
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
178
        result = {'error': 'Execution canceled by user.'}
179
        runner.post_run(status=liveaction_db.status, result=result)
180
181
        return liveaction_db
182
183
    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
184
        try:
185
            extra = {'runner': runner}
186
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
187
            (status, result, context) = runner.pause()
188
        except:
189
            _, ex, tb = sys.exc_info()
190
            # include the error message and traceback to try and provide some hints.
191
            status = action_constants.LIVEACTION_STATUS_FAILED
192
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
193
            context = liveaction_db.context
194
            LOG.exception('Failed to pause action %s.' % (liveaction_db.id), extra=result)
195
        finally:
196
            # Update the final status of liveaction and corresponding action execution.
197
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
198
199
            # Always clean-up the auth_token
200
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
201
202
        return liveaction_db
203
204
    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
205
        try:
206
            extra = {'runner': runner}
207
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
208
209
            (status, result, context) = runner.resume()
210
211
            try:
212
                result = json.loads(result)
213
            except:
214
                pass
215
216
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
217
218
            if isinstance(runner, PollingAsyncActionRunner) and not action_completed:
219
                queries.setup_query(liveaction_db.id, runnertype_db, context)
220
        except:
221
            _, ex, tb = sys.exc_info()
222
            # include the error message and traceback to try and provide some hints.
223
            status = action_constants.LIVEACTION_STATUS_FAILED
224
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
225
            context = liveaction_db.context
226
            LOG.exception('Failed to resume action %s.' % (liveaction_db.id), extra=result)
227
        finally:
228
            # Update the final status of liveaction and corresponding action execution.
229
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
230
231
            # Always clean-up the auth_token
232
            # This method should be called in the finally block to ensure post_run is not impacted.
233
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
234
235
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
236
        runner.post_run(status=status, result=result)
237
238
        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
239
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})
240
241
        return liveaction_db
242
243
    def _clean_up_auth_token(self, runner, status):
244
        """
245
        Clean up the temporary auth token for the current action.
246
247
        Note: This method should never throw since it's called inside finally block which assumes
248
        it doesn't throw.
249
        """
250
        # Deletion of the runner generated auth token is delayed until the token expires.
251
        # Async actions such as Mistral workflows uses the auth token to launch other
252
        # actions in the workflow. If the auth token is deleted here, then the actions
253
        # in the workflow will fail with unauthorized exception.
254
        is_async_runner = isinstance(runner, AsyncActionRunner)
255
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
256
257
        if not is_async_runner or (is_async_runner and action_completed):
258
            try:
259
                self._delete_auth_token(runner.auth_token)
260
            except:
261
                LOG.exception('Unable to clean-up auth_token.')
262
263
            return True
264
265
        return False
266
267
    def _update_live_action_db(self, liveaction_id, status, result, context):
268
        """
269
        Update LiveActionDB object for the provided liveaction id.
270
        """
271
        liveaction_db = get_liveaction_by_id(liveaction_id)
272
273
        state_changed = (liveaction_db.status != status)
274
275
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
276
            end_timestamp = date_utils.get_datetime_utc_now()
277
        else:
278
            end_timestamp = None
279
280
        liveaction_db = update_liveaction_status(status=status,
281
                                                 result=result,
282
                                                 context=context,
283
                                                 end_timestamp=end_timestamp,
284
                                                 liveaction_db=liveaction_db)
285
        return (liveaction_db, state_changed)
286
287
    def _update_status(self, liveaction_id, status, result, context):
288
        try:
289
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
290
            liveaction_db, state_changed = self._update_live_action_db(
291
                liveaction_id, status, result, context)
292
        except Exception as e:
293
            LOG.exception(
294
                'Cannot update liveaction '
295
                '(id: %s, status: %s, result: %s).' % (
296
                    liveaction_id, status, result)
297
            )
298
            raise e
299
300
        try:
301
            executions.update_execution(liveaction_db, publish=state_changed)
302
            extra = {'liveaction_db': liveaction_db}
303
            LOG.debug('Updated liveaction after run', extra=extra)
304
        except Exception as e:
305
            LOG.exception(
306
                'Cannot update action execution for liveaction '
307
                '(id: %s, status: %s, result: %s).' % (
308
                    liveaction_id, status, result)
309
            )
310
            raise e
311
312
        return liveaction_db
313
314
    def _get_entry_point_abs_path(self, pack, entry_point):
315
        return content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)
316
317
    def _get_action_libs_abs_path(self, pack, entry_point):
318
        return content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)
319
320
    def _get_rerun_reference(self, context):
321
        execution_id = context.get('re-run', {}).get('ref')
322
        return ActionExecution.get_by_id(execution_id) if execution_id else None
323
324
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
325
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
326
                                                              action_db.entry_point)
327
        context = getattr(liveaction_db, 'context', dict())
328
        user = context.get('user', cfg.CONF.system_user.user)
329
330
        # Note: Right now configs are only supported by the Python runner actions
331
        if runnertype_db.runner_module == 'python_runner':
332
            LOG.debug('Loading config for pack')
333
334
            config_loader = ContentPackConfigLoader(pack_name=action_db.pack, user=user)
335
            config = config_loader.get_config()
336
        else:
337
            config = None
338
339
        runner = get_runner(module_name=runnertype_db.runner_module, config=config)
340
341
        # TODO: Pass those arguments to the constructor instead of late
342
        # assignment, late assignment is awful
343
        runner.runner_type_db = runnertype_db
344
        runner.action = action_db
345
        runner.action_name = action_db.name
346
        runner.liveaction = liveaction_db
347
        runner.liveaction_id = str(liveaction_db.id)
348
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
349
        runner.execution_id = str(runner.execution.id)
350
        runner.entry_point = resolved_entry_point
351
        runner.context = context
352
        runner.callback = getattr(liveaction_db, 'callback', dict())
353
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
354
                                                              action_db.entry_point)
355
356
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
357
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
358
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
359
360
        return runner
361
362
    def _create_auth_token(self, context, action_db, liveaction_db):
363
        if not context:
364
            return None
365
366
        user = context.get('user', None)
367
        if not user:
368
            return None
369
370
        metadata = {
371
            'service': 'actions_container',
372
            'action_name': action_db.name,
373
            'live_action_id': str(liveaction_db.id)
374
375
        }
376
377
        ttl = cfg.CONF.auth.service_token_ttl
378
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
379
        return token_db
380
381
    def _delete_auth_token(self, auth_token):
382
        if auth_token:
383
            access.delete_token(auth_token.token)
384
385
386
def get_runner_container():
387
    return RunnerContainer()
388