Completed
Pull Request — master (#2383)
by W
06:17
created

st2actions.container.RunnerContainer   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 215
Duplicated Lines 0 %
Metric Value
wmc 31
dl 0
loc 215
rs 9.8

11 Methods

Rating   Name   Duplication   Size   Complexity  
A _get_runner() 0 20 1
A dispatch() 0 20 3
F _do_run() 0 86 11
A _get_entry_point_abs_path() 0 3 1
B _do_cancel() 0 24 2
A _create_auth_token() 0 7 3
A _delete_auth_token() 0 3 2
A _get_action_libs_abs_path() 0 3 1
A _setup_async_query() 0 10 3
A _update_live_action_db() 0 16 2
A _create_execution_state() 0 11 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from st2common import log as logging
21
from st2common.util import date as date_utils
22
from st2common.constants import action as action_constants
23
from st2common.exceptions import actionrunner
24
from st2common.exceptions.param import ParamException
25
from st2common.models.db.executionstate import ActionExecutionStateDB
26
from st2common.models.system.action import ResolvedActionParameters
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.executionstate import ActionExecutionState
29
from st2common.services import access, executions
30
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
31
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
32
from st2common.util import param as param_utils
33
34
from st2actions.container.service import RunnerContainerService
35
from st2actions.runners import get_runner, AsyncActionRunner
36
37
LOG = logging.getLogger(__name__)
38
39
__all__ = [
40
    'RunnerContainer',
41
    'get_runner_container'
42
]
43
44
45
class RunnerContainer(object):
46
47
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
48
        runner = get_runner(runnertype_db.runner_module)
49
50
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
51
                                                              action_db.entry_point)
52
53
        runner.container_service = RunnerContainerService()
54
        runner.action = action_db
55
        runner.action_name = action_db.name
56
        runner.liveaction = liveaction_db
57
        runner.liveaction_id = str(liveaction_db.id)
58
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
59
        runner.execution_id = str(runner.execution.id)
60
        runner.entry_point = resolved_entry_point
61
        runner.context = getattr(liveaction_db, 'context', dict())
62
        runner.callback = getattr(liveaction_db, 'callback', dict())
63
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
64
                                                              action_db.entry_point)
65
66
        return runner
67
68
    def dispatch(self, liveaction_db):
69
        action_db = get_action_by_ref(liveaction_db.action)
70
        if not action_db:
71
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
72
73
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
74
75
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
76
        LOG.info('Dispatching Action to a runner', extra=extra)
77
78
        # Get runner instance.
79
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
80
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
81
82
        # Process the request.
83
        liveaction_db = (self._do_cancel(runner, runnertype_db, action_db, liveaction_db)
84
                         if liveaction_db.status == action_constants.LIVEACTION_STATUS_CANCELING
85
                         else self._do_run(runner, runnertype_db, action_db, liveaction_db))
86
87
        return liveaction_db.result
88
89
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
90
        # Create a temporary auth token which will be available
91
        # for the duration of the action execution.
92
        runner.auth_token = self._create_auth_token(runner.context)
93
94
        updated_liveaction_db = None
95
        try:
96
            # Finalized parameters are resolved and then rendered. This process could
97
            # fail. Handle the exception and report the error correctly.
98
            try:
99
                runner_params, action_params = param_utils.render_final_params(
100
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
101
                    liveaction_db.context)
102
                runner.runner_parameters = runner_params
103
            except ParamException as e:
104
                raise actionrunner.ActionRunnerException(str(e))
105
106
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
107
            runner.pre_run()
108
109
            # Mask secret parameters in the log context
110
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
111
                                                              runner_type_db=runnertype_db,
112
                                                              runner_parameters=runner_params,
113
                                                              action_parameters=action_params)
114
            extra = {'runner': runner, 'parameters': resolved_action_params}
115
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
116
            (status, result, context) = runner.run(action_params)
117
118
            try:
119
                result = json.loads(result)
120
            except:
121
                pass
122
123
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
124
            if isinstance(runner, AsyncActionRunner) and not action_completed:
0 ignored issues
show
Bug introduced by
The variable action_completed was used before it was assigned.
Loading history...
125
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
126
        except:
127
            LOG.exception('Failed to run action.')
128
            _, ex, tb = sys.exc_info()
129
            # mark execution as failed.
130
            status = action_constants.LIVEACTION_STATUS_FAILED
131
            # include the error message and traceback to try and provide some hints.
132
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
133
            context = None
134
        finally:
135
            # Log action completion
136
            extra = {'result': result, 'status': status}
137
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
138
139
            # Always clean-up the auth_token
140
            try:
141
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
142
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
143
                                                                    result, context)
144
            except:
145
                error = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
146
                    liveaction_db.id, status, result)
147
                LOG.exception(error)
148
                raise
149
150
            executions.update_execution(updated_liveaction_db)
151
            extra = {'liveaction_db': updated_liveaction_db}
152
            LOG.debug('Updated liveaction after run', extra=extra)
153
154
            # Deletion of the runner generated auth token is delayed until the token expires.
155
            # Async actions such as Mistral workflows uses the auth token to launch other
156
            # actions in the workflow. If the auth token is deleted here, then the actions
157
            # in the workflow will fail with unauthorized exception.
158
            is_async_runner = isinstance(runner, AsyncActionRunner)
159
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
160
161
            if not is_async_runner or (is_async_runner and action_completed):
162
                try:
163
                    self._delete_auth_token(runner.auth_token)
164
                except:
165
                    LOG.exception('Unable to clean-up auth_token.')
166
167
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
168
        runner.post_run(status, result)
169
        runner.container_service = None
170
171
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
172
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
173
174
        return updated_liveaction_db
175
176
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
177
        try:
178
            extra = {'runner': runner}
179
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
180
181
            runner.cancel()
182
183
            liveaction_db = update_liveaction_status(
184
                status=action_constants.LIVEACTION_STATUS_CANCELED,
185
                end_timestamp=date_utils.get_datetime_utc_now(),
186
                liveaction_db=liveaction_db)
187
188
            executions.update_execution(liveaction_db)
189
190
            LOG.debug('Performing post_run for runner: %s', runner.runner_id)
191
            runner.post_run(liveaction_db.status, {'error': 'Execution canceled by user.'})
192
            runner.container_service = None
193
        except:
194
            _, ex, tb = sys.exc_info()
195
            # include the error message and traceback to try and provide some hints.
196
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
197
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
198
199
        return liveaction_db
200
201
    def _update_live_action_db(self, liveaction_id, status, result, context):
202
        """
203
        Update LiveActionDB object for the provided liveaction id.
204
        """
205
        liveaction_db = get_liveaction_by_id(liveaction_id)
206
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
207
            end_timestamp = date_utils.get_datetime_utc_now()
208
        else:
209
            end_timestamp = None
210
211
        liveaction_db = update_liveaction_status(status=status,
212
                                                 result=result,
213
                                                 context=context,
214
                                                 end_timestamp=end_timestamp,
215
                                                 liveaction_db=liveaction_db)
216
        return liveaction_db
217
218
    def _get_entry_point_abs_path(self, pack, entry_point):
219
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
220
                                                               entry_point=entry_point)
221
222
    def _get_action_libs_abs_path(self, pack, entry_point):
223
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
224
                                                               entry_point=entry_point)
225
226
    def _create_auth_token(self, context):
227
        if not context:
228
            return None
229
        user = context.get('user', None)
230
        if not user:
231
            return None
232
        return access.create_token(user)
233
234
    def _delete_auth_token(self, auth_token):
235
        if auth_token:
236
            access.delete_token(auth_token.token)
237
238
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
239
        query_module = getattr(runnertype_db, 'query_module', None)
240
        if not query_module:
241
            LOG.error('No query module specified for runner %s.', runnertype_db)
242
            return
243
        try:
244
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
245
        except:
246
            LOG.exception('Unable to create action execution state db model ' +
247
                          'for liveaction_id %s', liveaction_id)
248
249
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
250
        state_db = ActionExecutionStateDB(
251
            execution_id=liveaction_id,
252
            query_module=runnertype_db.query_module,
253
            query_context=query_context)
254
        try:
255
            return ActionExecutionState.add_or_update(state_db)
256
        except:
257
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
258
                          % liveaction_id)
259
            return None
260
261
262
def get_runner_container():
263
    return RunnerContainer()
264