Passed
Push — master ( 6d5581...e733e5 )
by
unknown
03:17
created

RunnerContainer.dispatch()   B

Complexity

Conditions 3

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 26
rs 8.8571
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.util import date as date_utils
24
from st2common.constants import action as action_constants
25
from st2common.exceptions import actionrunner
26
from st2common.exceptions.param import ParamException
27
from st2common.models.db.executionstate import ActionExecutionStateDB
28
from st2common.models.system.action import ResolvedActionParameters
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.executionstate import ActionExecutionState
31
from st2common.services import access, executions
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
36
from st2actions.container.service import RunnerContainerService
37
from st2common.runners.base import get_runner
38
from st2common.runners.base import AsyncActionRunner
39
40
LOG = logging.getLogger(__name__)
41
42
__all__ = [
43
    'RunnerContainer',
44
    'get_runner_container'
45
]
46
47
48
class RunnerContainer(object):
49
50
    def dispatch(self, liveaction_db):
51
        action_db = get_action_by_ref(liveaction_db.action)
52
        if not action_db:
53
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
54
55
        liveaction_db.context['pack'] = action_db.pack
56
57
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
58
59
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
60
        LOG.info('Dispatching Action to a runner', extra=extra)
61
62
        # Get runner instance.
63
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
64
65
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
66
67
        # Process the request.
68
        if liveaction_db.status == action_constants.LIVEACTION_STATUS_CANCELING:
69
            liveaction_db = self._do_cancel(runner=runner, runnertype_db=runnertype_db,
70
                                            action_db=action_db, liveaction_db=liveaction_db)
71
        else:
72
            liveaction_db = self._do_run(runner=runner, runnertype_db=runnertype_db,
73
                                         action_db=action_db, liveaction_db=liveaction_db)
74
75
        return liveaction_db.result
76
77
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
78
        # Create a temporary auth token which will be available
79
        # for the duration of the action execution.
80
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
81
                                                    liveaction_db=liveaction_db)
82
83
        updated_liveaction_db = None
84
        try:
85
            # Finalized parameters are resolved and then rendered. This process could
86
            # fail. Handle the exception and report the error correctly.
87
            try:
88
                runner_params, action_params = param_utils.render_final_params(
89
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
90
                    liveaction_db.context)
91
                runner.runner_parameters = runner_params
92
            except ParamException as e:
93
                raise actionrunner.ActionRunnerException(str(e))
94
95
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
96
            runner.pre_run()
97
98
            # Mask secret parameters in the log context
99
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
100
                                                              runner_type_db=runnertype_db,
101
                                                              runner_parameters=runner_params,
102
                                                              action_parameters=action_params)
103
            extra = {'runner': runner, 'parameters': resolved_action_params}
104
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
105
            (status, result, context) = runner.run(action_params)
106
107
            try:
108
                result = json.loads(result)
109
            except:
110
                pass
111
112
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
113
            if isinstance(runner, AsyncActionRunner) and not action_completed:
114
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
115
        except:
116
            LOG.exception('Failed to run action.')
117
            _, ex, tb = sys.exc_info()
118
            # mark execution as failed.
119
            status = action_constants.LIVEACTION_STATUS_FAILED
120
            # include the error message and traceback to try and provide some hints.
121
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
122
            context = None
123
        finally:
124
            # Log action completion
125
            extra = {'result': result, 'status': status}
126
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
127
128
            try:
129
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
130
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
131
                                                                    result, context)
132
            except Exception as e:
133
                msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
134
                    liveaction_db.id, status, result)
135
                LOG.exception(msg)
136
                raise e
137
138
            try:
139
                executions.update_execution(updated_liveaction_db)
140
                extra = {'liveaction_db': updated_liveaction_db}
141
                LOG.debug('Updated liveaction after run', extra=extra)
142
            except Exception as e:
143
                msg = 'Cannot update ActionExecution object for id: %s, status: %s, result: %s.' % (
144
                    updated_liveaction_db.id, status, result)
145
                LOG.exception(msg)
146
                raise e
147
148
            # Always clean-up the auth_token
149
            # Note: self._clean_up_auth_token should never throw to ensure post_run is always
150
            # called.
151
            self._clean_up_auth_token(runner=runner, status=status)
152
153
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
154
        runner.post_run(status=status, result=result)
155
        runner.container_service = None
156
157
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
158
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
159
160
        return updated_liveaction_db
161
162
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
163
        try:
164
            extra = {'runner': runner}
165
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
166
167
            runner.cancel()
168
169
            liveaction_db = update_liveaction_status(
170
                status=action_constants.LIVEACTION_STATUS_CANCELED,
171
                end_timestamp=date_utils.get_datetime_utc_now(),
172
                liveaction_db=liveaction_db)
173
174
            executions.update_execution(liveaction_db)
175
176
            LOG.debug('Performing post_run for runner: %s', runner.runner_id)
177
            result = {'error': 'Execution canceled by user.'}
178
            runner.post_run(status=liveaction_db.status, result=result)
179
            runner.container_service = None
180
        except:
181
            _, ex, tb = sys.exc_info()
182
            # include the error message and traceback to try and provide some hints.
183
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
184
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
185
        finally:
186
            # Always clean-up the auth_token
187
            status = liveaction_db.status
188
            self._clean_up_auth_token(runner=runner, status=status)
189
190
        return liveaction_db
191
192
    def _clean_up_auth_token(self, runner, status):
193
        """
194
        Clean up the temporary auth token for the current action.
195
196
        Note: This method should never throw since it's called inside finally block which assumes
197
        it doesn't throw.
198
        """
199
        # Deletion of the runner generated auth token is delayed until the token expires.
200
        # Async actions such as Mistral workflows uses the auth token to launch other
201
        # actions in the workflow. If the auth token is deleted here, then the actions
202
        # in the workflow will fail with unauthorized exception.
203
        is_async_runner = isinstance(runner, AsyncActionRunner)
204
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
205
206
        if not is_async_runner or (is_async_runner and action_completed):
207
            try:
208
                self._delete_auth_token(runner.auth_token)
209
            except:
210
                LOG.exception('Unable to clean-up auth_token.')
211
212
            return True
213
214
        return False
215
216
    def _update_live_action_db(self, liveaction_id, status, result, context):
217
        """
218
        Update LiveActionDB object for the provided liveaction id.
219
        """
220
        liveaction_db = get_liveaction_by_id(liveaction_id)
221
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
222
            end_timestamp = date_utils.get_datetime_utc_now()
223
        else:
224
            end_timestamp = None
225
226
        liveaction_db = update_liveaction_status(status=status,
227
                                                 result=result,
228
                                                 context=context,
229
                                                 end_timestamp=end_timestamp,
230
                                                 liveaction_db=liveaction_db)
231
        return liveaction_db
232
233
    def _get_entry_point_abs_path(self, pack, entry_point):
234
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
235
                                                               entry_point=entry_point)
236
237
    def _get_action_libs_abs_path(self, pack, entry_point):
238
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
239
                                                               entry_point=entry_point)
240
241
    def _get_rerun_reference(self, context):
242
        execution_id = context.get('re-run', {}).get('ref')
243
        return ActionExecution.get_by_id(execution_id) if execution_id else None
244
245
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
246
        runner = get_runner(runnertype_db.runner_module)
247
248
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
249
                                                              action_db.entry_point)
250
251
        runner.runner_type_db = runnertype_db
252
        runner.container_service = RunnerContainerService()
253
        runner.action = action_db
254
        runner.action_name = action_db.name
255
        runner.liveaction = liveaction_db
256
        runner.liveaction_id = str(liveaction_db.id)
257
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
258
        runner.execution_id = str(runner.execution.id)
259
        runner.entry_point = resolved_entry_point
260
        runner.context = getattr(liveaction_db, 'context', dict())
261
        runner.callback = getattr(liveaction_db, 'callback', dict())
262
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
263
                                                              action_db.entry_point)
264
265
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
266
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
267
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
268
269
        return runner
270
271
    def _create_auth_token(self, context, action_db, liveaction_db):
272
        if not context:
273
            return None
274
275
        user = context.get('user', None)
276
        if not user:
277
            return None
278
279
        metadata = {
280
            'service': 'actions_container',
281
            'action_name': action_db.name,
282
            'live_action_id': str(liveaction_db.id)
283
284
        }
285
286
        ttl = cfg.CONF.auth.service_token_ttl
287
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
288
        return token_db
289
290
    def _delete_auth_token(self, auth_token):
291
        if auth_token:
292
            access.delete_token(auth_token.token)
293
294
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
295
        query_module = getattr(runnertype_db, 'query_module', None)
296
        if not query_module:
297
            LOG.error('No query module specified for runner %s.', runnertype_db)
298
            return
299
        try:
300
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
301
        except:
302
            LOG.exception('Unable to create action execution state db model ' +
303
                          'for liveaction_id %s', liveaction_id)
304
305
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
306
        state_db = ActionExecutionStateDB(
307
            execution_id=liveaction_id,
308
            query_module=runnertype_db.query_module,
309
            query_context=query_context)
310
        try:
311
            return ActionExecutionState.add_or_update(state_db)
312
        except:
313
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
314
                          % liveaction_id)
315
            return None
316
317
318
def get_runner_container():
319
    return RunnerContainer()
320