Completed
Pull Request — master (#2380)
by W
06:39
created

_get_rerun_reference()   A

Complexity

Conditions 2

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 3
rs 10
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from st2common import log as logging
21
from st2common.util import date as date_utils
22
from st2common.constants import action as action_constants
23
from st2common.exceptions import actionrunner
24
from st2common.exceptions.param import ParamException
25
from st2common.models.db.executionstate import ActionExecutionStateDB
26
from st2common.models.system.action import ResolvedActionParameters
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.executionstate import ActionExecutionState
29
from st2common.services import access, executions
30
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
31
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
32
from st2common.util import param as param_utils
33
34
from st2actions.container.service import RunnerContainerService
35
from st2actions.runners import get_runner, AsyncActionRunner
36
37
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Names exported by this module.
__all__ = ['RunnerContainer', 'get_runner_container']
43
44
45
class RunnerContainer(object):
    """
    Dispatches a LiveAction to a runner instance.

    ``dispatch`` looks up the action and its runner type, builds a configured runner and
    then either runs or cancels the action, persisting the resulting status and result.
    """

    @staticmethod
    def _get_rerun_ref_id(context):
        """
        Extract the id stored under ``context['re-run']['ref']``.

        A missing (``None``) context or a missing / ``None`` 're-run' entry yields ``None``
        instead of raising ``AttributeError``.
        """
        return ((context or {}).get('re-run') or {}).get('ref')

    def _get_rerun_reference(self, context):
        """
        Return the execution referenced by the re-run entry of ``context``.

        :return: Result of ``ActionExecution.get_by_id`` for the referenced id, or ``None``
                 when ``context`` carries no re-run reference.
        """
        execution_id = self._get_rerun_ref_id(context)
        return ActionExecution.get_by_id(execution_id) if execution_id else None

    def _get_runner(self, runnertype_db, action_db, liveaction_db):
        """
        Instantiate the runner for ``runnertype_db`` and populate it with the action,
        liveaction, execution, entry point, context and callback information.
        """
        runner = get_runner(runnertype_db.runner_module)

        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
                                                              action_db.entry_point)

        runner.container_service = RunnerContainerService()
        runner.action = action_db
        runner.action_name = action_db.name
        runner.liveaction = liveaction_db
        runner.liveaction_id = str(liveaction_db.id)
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
        runner.execution_id = str(runner.execution.id)
        runner.entry_point = resolved_entry_point
        runner.context = getattr(liveaction_db, 'context', dict())
        runner.callback = getattr(liveaction_db, 'callback', dict())
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                              action_db.entry_point)

        # For re-run, get the ActionExecutionDB in which the re-run is based on.
        # The lookup tolerates a context attribute which is present but set to None.
        rerun_ref_id = self._get_rerun_ref_id(runner.context)
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None

        return runner

    def dispatch(self, liveaction_db):
        """
        Run or cancel the action referenced by ``liveaction_db``.

        The action is canceled when the liveaction status is
        ``LIVEACTION_STATUS_CANCELING`` and run otherwise.

        :return: The ``result`` attribute of the liveaction returned by the run / cancel step.
        :raises Exception: If the referenced action is not found.
        """
        action_db = get_action_by_ref(liveaction_db.action)
        if not action_db:
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))

        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])

        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
        LOG.info('Dispatching Action to a runner', extra=extra)

        # Get runner instance.
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)

        # Process the request.
        liveaction_db = (self._do_cancel(runner, runnertype_db, action_db, liveaction_db)
                         if liveaction_db.status == action_constants.LIVEACTION_STATUS_CANCELING
                         else self._do_run(runner, runnertype_db, action_db, liveaction_db))

        return liveaction_db.result

    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Render the parameters, run the action through ``runner`` and persist the outcome.

        Any failure while rendering parameters or running the action is recorded as a
        failed liveaction (error message and traceback stored in the result) instead of
        being propagated. A failure to persist the outcome is logged and re-raised.

        :return: The updated liveaction object.
        """
        # Create a temporary auth token which will be available
        # for the duration of the action execution.
        runner.auth_token = self._create_auth_token(runner.context)

        updated_liveaction_db = None
        try:
            # Finalized parameters are resolved and then rendered. This process could
            # fail. Handle the exception and report the error correctly.
            try:
                runner_params, action_params = param_utils.render_final_params(
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
                    liveaction_db.context)
                runner.runner_parameters = runner_params
            except ParamException as e:
                raise actionrunner.ActionRunnerException(str(e))

            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
            runner.pre_run()

            # Mask secret parameters in the log context
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
                                                              runner_type_db=runnertype_db,
                                                              runner_parameters=runner_params,
                                                              action_parameters=action_params)
            extra = {'runner': runner, 'parameters': resolved_action_params}
            LOG.debug('Performing run for runner: %s', runner.runner_id, extra=extra)
            (status, result, context) = runner.run(action_params)

            # Best effort: a result which is a JSON string is stored in its parsed form,
            # anything else is kept as returned by the runner.
            try:
                result = json.loads(result)
            except Exception:
                pass

            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
            if isinstance(runner, AsyncActionRunner) and not action_completed:
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
        except:
            # Deliberately catches everything: status, result and context must be bound
            # for the finally block below no matter how the run failed.
            LOG.exception('Failed to run action.')
            _, ex, tb = sys.exc_info()
            # mark execution as failed.
            status = action_constants.LIVEACTION_STATUS_FAILED
            # include the error message and traceback to try and provide some hints.
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            context = None
        finally:
            # Log action completion
            extra = {'result': result, 'status': status}
            LOG.debug('Action "%s" completed.', action_db.name, extra=extra)

            try:
                # Persist the final status of the liveaction and the matching execution.
                try:
                    LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
                    updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
                                                                        result, context)
                except:
                    error = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
                        liveaction_db.id, status, result)
                    LOG.exception(error)
                    raise

                executions.update_execution(updated_liveaction_db)
                extra = {'liveaction_db': updated_liveaction_db}
                LOG.debug('Updated liveaction after run', extra=extra)
            finally:
                # Always clean-up the auth_token, even when persisting the outcome failed.
                self._clean_up_auth_token(runner, status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status, result)
        runner.container_service = None

        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})

        return updated_liveaction_db

    def _clean_up_auth_token(self, runner, status):
        """
        Delete the auth token created for ``runner`` unless it may still be needed.

        Failures to delete the token are logged and not propagated.
        """
        # Deletion of the runner generated auth token is delayed until the token expires.
        # Async actions such as Mistral workflows uses the auth token to launch other
        # actions in the workflow. If the auth token is deleted here, then the actions
        # in the workflow will fail with unauthorized exception.
        is_async_runner = isinstance(runner, AsyncActionRunner)
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

        if not is_async_runner or action_completed:
            try:
                self._delete_auth_token(runner.auth_token)
            except Exception:
                LOG.exception('Unable to clean-up auth_token.')

    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Cancel the action through ``runner`` and mark the liveaction as canceled.

        A failure is logged (with error message and traceback) and not propagated; in that
        case the liveaction object which was passed in is returned.
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)

            runner.cancel()

            liveaction_db = update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_CANCELED,
                end_timestamp=date_utils.get_datetime_utc_now(),
                liveaction_db=liveaction_db)

            executions.update_execution(liveaction_db)
        except Exception:
            _, ex, tb = sys.exc_info()
            # include the error message and traceback to try and provide some hints.
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            LOG.exception('Failed to cancel action %s.', liveaction_db.id, extra=result)

        return liveaction_db

    def _update_live_action_db(self, liveaction_id, status, result, context):
        """
        Update LiveActionDB object for the provided liveaction id.

        The end timestamp is set to the current UTC time when ``status`` is one of the
        completed states and to ``None`` otherwise.
        """
        liveaction_db = get_liveaction_by_id(liveaction_id)
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
            end_timestamp = date_utils.get_datetime_utc_now()
        else:
            end_timestamp = None

        liveaction_db = update_liveaction_status(status=status,
                                                 result=result,
                                                 context=context,
                                                 end_timestamp=end_timestamp,
                                                 liveaction_db=liveaction_db)
        return liveaction_db

    def _get_entry_point_abs_path(self, pack, entry_point):
        """
        Delegate to ``RunnerContainerService.get_entry_point_abs_path``.
        """
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
                                                               entry_point=entry_point)

    def _get_action_libs_abs_path(self, pack, entry_point):
        """
        Delegate to ``RunnerContainerService.get_action_libs_abs_path``.
        """
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
                                                               entry_point=entry_point)

    def _create_auth_token(self, context):
        """
        Create an auth token for the user named in ``context``.

        :return: ``None`` when ``context`` is empty or names no user, otherwise the result
                 of ``access.create_token``.
        """
        if not context:
            return None
        user = context.get('user', None)
        if not user:
            return None
        return access.create_token(user)

    def _delete_auth_token(self, auth_token):
        """
        Delete the provided auth token; a falsy ``auth_token`` is ignored.
        """
        if auth_token:
            access.delete_token(auth_token.token)

    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
        """
        Store the execution state for a liveaction of a runner type which declares a
        query module. Errors are logged and not propagated.
        """
        query_module = getattr(runnertype_db, 'query_module', None)
        if not query_module:
            LOG.error('No query module specified for runner %s.', runnertype_db)
            return
        try:
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
        except Exception:
            LOG.exception('Unable to create action execution state db model ' +
                          'for liveaction_id %s', liveaction_id)

    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
        """
        Create or update the ActionExecutionStateDB entry for the liveaction.

        :return: Result of ``ActionExecutionState.add_or_update`` or ``None`` on failure.
        """
        state_db = ActionExecutionStateDB(
            execution_id=liveaction_id,
            query_module=runnertype_db.query_module,
            query_context=query_context)
        try:
            return ActionExecutionState.add_or_update(state_db)
        except Exception:
            LOG.exception('Unable to create execution state db for liveaction_id %s.',
                          liveaction_id)
            return None
264
265
266
def get_runner_container():
    """
    Build and return a new RunnerContainer instance.
    """
    container = RunnerContainer()
    return container
268