Completed
Pull Request — master (#2334)
by Edward
06:02
created

st2actions.container.RunnerContainer   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 211
Duplicated Lines 0 %
Metric Value
wmc 31
dl 0
loc 211
rs 9.8

11 Methods

Rating   Name   Duplication   Size   Complexity  
A _get_entry_point_abs_path() 0 3 1
A _do_cancel() 0 20 2
A _create_auth_token() 0 7 3
A _delete_auth_token() 0 3 2
A _get_action_libs_abs_path() 0 3 1
A _get_runner() 0 20 1
A dispatch() 0 20 3
A _setup_async_query() 0 10 3
A _update_live_action_db() 0 16 2
F _do_run() 0 86 11
A _create_execution_state() 0 11 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from st2common import log as logging
21
from st2common.util import date as date_utils
22
from st2common.constants import action as action_constants
23
from st2common.exceptions import actionrunner
24
from st2common.exceptions.param import ParamException
25
from st2common.models.db.executionstate import ActionExecutionStateDB
26
from st2common.models.system.action import ResolvedActionParameters
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.executionstate import ActionExecutionState
29
from st2common.services import access, executions
30
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
31
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
32
from st2common.util import param as param_utils
33
34
from st2actions.container.service import RunnerContainerService
35
from st2actions.runners import get_runner, AsyncActionRunner
36
37
LOG = logging.getLogger(__name__)
38
39
__all__ = [
40
    'RunnerContainer',
41
    'get_runner_container'
42
]
43
44
45
class RunnerContainer(object):
46
47
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
48
        runner = get_runner(runnertype_db.runner_module)
49
50
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
51
                                                              action_db.entry_point)
52
53
        runner.container_service = RunnerContainerService()
54
        runner.action = action_db
55
        runner.action_name = action_db.name
56
        runner.liveaction = liveaction_db
57
        runner.liveaction_id = str(liveaction_db.id)
58
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
59
        runner.execution_id = str(runner.execution.id)
60
        runner.entry_point = resolved_entry_point
61
        runner.context = getattr(liveaction_db, 'context', dict())
62
        runner.callback = getattr(liveaction_db, 'callback', dict())
63
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
64
                                                              action_db.entry_point)
65
66
        return runner
67
68
    def dispatch(self, liveaction_db):
69
        action_db = get_action_by_ref(liveaction_db.action)
70
        if not action_db:
71
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
72
73
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
74
75
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
76
        LOG.info('Dispatching Action to a runner', extra=extra)
77
78
        # Get runner instance.
79
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
80
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
81
82
        # Process the request.
83
        liveaction_db = (self._do_cancel(runner, runnertype_db, action_db, liveaction_db)
84
                         if liveaction_db.status == action_constants.LIVEACTION_STATUS_CANCELING
85
                         else self._do_run(runner, runnertype_db, action_db, liveaction_db))
86
87
        return liveaction_db.result
88
89
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
90
        # Create a temporary auth token which will be available
91
        # for the duration of the action execution.
92
        runner.auth_token = self._create_auth_token(runner.context)
93
94
        updated_liveaction_db = None
95
        try:
96
            # Finalized parameters are resolved and then rendered. This process could
97
            # fail. Handle the exception and report the error correctly.
98
            try:
99
                runner_params, action_params = param_utils.render_final_params(
100
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
101
                    liveaction_db.context)
102
                runner.runner_parameters = runner_params
103
            except ParamException as e:
104
                raise actionrunner.ActionRunnerException(str(e))
105
106
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
107
            runner.pre_run()
108
109
            # Mask secret parameters in the log context
110
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
111
                                                              runner_type_db=runnertype_db,
112
                                                              runner_parameters=runner_params,
113
                                                              action_parameters=action_params)
114
            extra = {'runner': runner, 'parameters': resolved_action_params}
115
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
116
            (status, result, context) = runner.run(action_params)
117
118
            try:
119
                result = json.loads(result)
120
            except:
121
                pass
122
123
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
124
            if isinstance(runner, AsyncActionRunner) and not action_completed:
0 ignored issues
show
Bug introduced by
The variable action_completed was used before it was assigned.
Loading history...
125
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
126
        except:
127
            LOG.exception('Failed to run action.')
128
            _, ex, tb = sys.exc_info()
129
            # mark execution as failed.
130
            status = action_constants.LIVEACTION_STATUS_FAILED
131
            # include the error message and traceback to try and provide some hints.
132
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
133
            context = None
134
        finally:
135
            # Log action completion
136
            extra = {'result': result, 'status': status}
137
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
138
139
            # Always clean-up the auth_token
140
            try:
141
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
142
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
143
                                                                    result, context)
144
            except:
145
                error = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
146
                    liveaction_db.id, status, result)
147
                LOG.exception(error)
148
                raise
149
150
            executions.update_execution(updated_liveaction_db)
151
            extra = {'liveaction_db': updated_liveaction_db}
152
            LOG.debug('Updated liveaction after run', extra=extra)
153
154
            # Deletion of the runner generated auth token is delayed until the token expires.
155
            # Async actions such as Mistral workflows uses the auth token to launch other
156
            # actions in the workflow. If the auth token is deleted here, then the actions
157
            # in the workflow will fail with unauthorized exception.
158
            is_async_runner = isinstance(runner, AsyncActionRunner)
159
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
160
161
            if not is_async_runner or (is_async_runner and action_completed):
162
                try:
163
                    self._delete_auth_token(runner.auth_token)
164
                except:
165
                    LOG.exception('Unable to clean-up auth_token.')
166
167
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
168
        runner.post_run(status, result)
169
        runner.container_service = None
170
171
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
172
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
173
174
        return updated_liveaction_db
175
176
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
177
        try:
178
            extra = {'runner': runner}
179
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
180
181
            runner.cancel()
182
183
            liveaction_db = update_liveaction_status(
184
                status=action_constants.LIVEACTION_STATUS_CANCELED,
185
                end_timestamp=date_utils.get_datetime_utc_now(),
186
                liveaction_db=liveaction_db)
187
188
            executions.update_execution(liveaction_db)
189
        except:
190
            _, ex, tb = sys.exc_info()
191
            # include the error message and traceback to try and provide some hints.
192
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
193
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
194
195
        return liveaction_db
196
197
    def _update_live_action_db(self, liveaction_id, status, result, context):
198
        """
199
        Update LiveActionDB object for the provided liveaction id.
200
        """
201
        liveaction_db = get_liveaction_by_id(liveaction_id)
202
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
203
            end_timestamp = date_utils.get_datetime_utc_now()
204
        else:
205
            end_timestamp = None
206
207
        liveaction_db = update_liveaction_status(status=status,
208
                                                 result=result,
209
                                                 context=context,
210
                                                 end_timestamp=end_timestamp,
211
                                                 liveaction_db=liveaction_db)
212
        return liveaction_db
213
214
    def _get_entry_point_abs_path(self, pack, entry_point):
215
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
216
                                                               entry_point=entry_point)
217
218
    def _get_action_libs_abs_path(self, pack, entry_point):
219
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
220
                                                               entry_point=entry_point)
221
222
    def _create_auth_token(self, context):
223
        if not context:
224
            return None
225
        user = context.get('user', None)
226
        if not user:
227
            return None
228
        return access.create_token(user)
229
230
    def _delete_auth_token(self, auth_token):
231
        if auth_token:
232
            access.delete_token(auth_token.token)
233
234
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
235
        query_module = getattr(runnertype_db, 'query_module', None)
236
        if not query_module:
237
            LOG.error('No query module specified for runner %s.', runnertype_db)
238
            return
239
        try:
240
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
241
        except:
242
            LOG.exception('Unable to create action execution state db model ' +
243
                          'for liveaction_id %s', liveaction_id)
244
245
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
246
        state_db = ActionExecutionStateDB(
247
            execution_id=liveaction_id,
248
            query_module=runnertype_db.query_module,
249
            query_context=query_context)
250
        try:
251
            return ActionExecutionState.add_or_update(state_db)
252
        except:
253
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
254
                          % liveaction_id)
255
            return None
256
257
258
def get_runner_container():
259
    return RunnerContainer()
260