Passed
Pull Request — master (#4000)
by W
05:39
created

RunnerContainer._do_run()   C

Complexity

Conditions 7

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 65
rs 6.0105
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import json
18
import sys
19
import traceback
20
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util import date as date_utils
25
from st2common.constants import action as action_constants
26
from st2common.content import utils as content_utils
27
from st2common.exceptions import actionrunner
28
from st2common.exceptions.param import ParamException
29
from st2common.models.system.action import ResolvedActionParameters
30
from st2common.persistence.execution import ActionExecution
31
from st2common.services import access, executions, queries
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
from st2common.util.config_loader import ContentPackConfigLoader
36
37
from st2common.runners.base import get_runner
38
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
39
40
# Logger for this module, named after the module itself.
LOG = logging.getLogger(__name__)

# Public names exported by this module.
__all__ = [
    'RunnerContainer',
    'get_runner_container',
]
46
47
48
class RunnerContainer(object):
    """
    Resolve the runner for a liveaction and drive it through run / cancel / pause / resume,
    persisting the resulting status and result on the liveaction and its action execution.
    """

    def dispatch(self, liveaction_db):
        """
        Route a liveaction to the handler that matches its current status.

        :param liveaction_db: Liveaction to process. Its ``action``, ``context`` and ``status``
                              attributes are read; ``context['pack']`` is set.

        :return: ``result`` attribute of the liveaction returned by the handler.

        :raises Exception: If the action referenced by the liveaction is not found.
        :raises actionrunner.ActionRunnerDispatchError: If no handler exists for the status.
        """
        action_db = get_action_by_ref(liveaction_db.action)
        if not action_db:
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))

        liveaction_db.context['pack'] = action_db.pack

        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])

        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
        LOG.info('Dispatching Action to a runner', extra=extra)

        # Get runner instance.
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)

        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)

        # Process the request. Every status which starts or continues an execution is
        # handled by _do_run; the transitional statuses have dedicated handlers.
        funcs = {
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
        }

        if liveaction_db.status not in funcs:
            raise actionrunner.ActionRunnerDispatchError(
                'Action runner is unable to dispatch the liveaction because it is '
                'in an unsupported status of "%s".' % liveaction_db.status
            )

        liveaction_db = funcs[liveaction_db.status](
            runner=runner,
            runnertype_db=runnertype_db,
            action_db=action_db,
            liveaction_db=liveaction_db
        )

        return liveaction_db.result

    @staticmethod
    def _deserialize_result(result):
        """
        Best-effort conversion of a runner result from a JSON string to a Python object.

        :param result: Value returned by the runner.

        :return: Parsed object if ``result`` is valid JSON, otherwise ``result`` unchanged.
        """
        try:
            return json.loads(result)
        except Exception:
            # Not a JSON document (or not a string at all); keep the value as-is.
            return result

    @staticmethod
    def _get_failure_result():
        """
        Build a result dict describing the exception which is currently being handled.

        Note: This method must be called from inside an ``except`` block since it relies on
        ``sys.exc_info()``.

        :return: Dict with ``error`` (exception message) and ``traceback`` (formatted
                 traceback, limited to 20 entries) keys.
        """
        _, ex, tb = sys.exc_info()
        return {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}

    def _setup_query_if_polling(self, runner, runnertype_db, liveaction_db, status, context):
        """
        Register a results query when a polling runner returned a non-completed status.
        """
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

        if (isinstance(runner, PollingAsyncActionRunner) and
                runner.is_polling_enabled() and not action_completed):
            queries.setup_query(liveaction_db.id, runnertype_db, context)

    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Render the parameters, run the action and persist the outcome.

        Any failure while rendering parameters or running the action is recorded as a failed
        status with the error message and traceback as the result.

        :return: Liveaction object returned by the status update.
        """
        # Create a temporary auth token which will be available
        # for the duration of the action execution.
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
                                                    liveaction_db=liveaction_db)

        try:
            # Finalized parameters are resolved and then rendered. This process could
            # fail. Handle the exception and report the error correctly.
            try:
                runner_params, action_params = param_utils.render_final_params(
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
                    liveaction_db.context)
                runner.runner_parameters = runner_params
            except ParamException as e:
                raise actionrunner.ActionRunnerException(str(e))

            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
            runner.pre_run()

            # Mask secret parameters in the log context
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
                                                              runner_type_db=runnertype_db,
                                                              runner_parameters=runner_params,
                                                              action_parameters=action_params)
            extra = {'runner': runner, 'parameters': resolved_action_params}
            LOG.debug('Performing run for runner: %s', runner.runner_id, extra=extra)
            (status, result, context) = runner.run(action_params)

            result = self._deserialize_result(result)

            self._setup_query_if_polling(runner=runner, runnertype_db=runnertype_db,
                                         liveaction_db=liveaction_db, status=status,
                                         context=context)
        except:
            # Deliberately catch everything: the finally block below requires status,
            # result and context to be bound no matter how the try block was left.
            LOG.exception('Failed to run action.')
            # mark execution as failed.
            status = action_constants.LIVEACTION_STATUS_FAILED
            # include the error message and traceback to try and provide some hints.
            result = self._get_failure_result()
            context = None
        finally:
            # Log action completion
            extra = {'result': result, 'status': status}
            LOG.debug('Action "%s" completed.', action_db.name, extra=extra)

            try:
                # Update the final status of liveaction and corresponding action execution.
                liveaction_db = self._update_status(liveaction_db.id, status, result, context)
            finally:
                # Always clean-up the auth_token, even if the status update above failed.
                # This method should be called in the finally block to ensure post_run is
                # not impacted.
                self._clean_up_auth_token(runner=runner, status=status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)

        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})

        return liveaction_db

    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Ask the runner to cancel the execution and persist the status it reports.

        If the cancel operation fails the error is logged and the liveaction status is left
        untouched.

        :return: Updated liveaction on success, the liveaction passed in otherwise.
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.cancel()

            # Update the final status of liveaction and corresponding action execution.
            # The status is updated here because we want to keep the workflow running
            # as is if the cancel operation failed.
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
        except:
            # include the error message and traceback to try and provide some hints.
            result = self._get_failure_result()
            LOG.exception('Failed to cancel action %s.', liveaction_db.id, extra=result)
        finally:
            # Always clean-up the auth_token
            # This method should be called in the finally block to ensure post_run is not impacted.
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        result = {'error': 'Execution canceled by user.'}
        runner.post_run(status=liveaction_db.status, result=result)

        return liveaction_db

    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Ask the runner to pause the execution and persist the outcome.

        A failure of the pause operation is recorded as a failed status with the error
        message and traceback as the result.

        :return: Liveaction object returned by the status update.
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.pause()
        except:
            # Deliberately catch everything so status, result and context are always
            # bound when the finally block runs.
            # include the error message and traceback to try and provide some hints.
            status = action_constants.LIVEACTION_STATUS_FAILED
            result = self._get_failure_result()
            context = liveaction_db.context
            LOG.exception('Failed to pause action %s.', liveaction_db.id, extra=result)
        finally:
            try:
                # Update the final status of liveaction and corresponding action execution.
                liveaction_db = self._update_status(liveaction_db.id, status, result, context)
            finally:
                # Always clean-up the auth_token, even if the status update above failed
                # (in which case the status of the not-updated liveaction is used).
                self._clean_up_auth_token(runner=runner, status=liveaction_db.status)

        return liveaction_db

    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Ask the runner to resume the execution and persist the outcome.

        A failure of the resume operation is recorded as a failed status with the error
        message and traceback as the result.

        :return: Liveaction object returned by the status update.
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)

            (status, result, context) = runner.resume()

            result = self._deserialize_result(result)

            self._setup_query_if_polling(runner=runner, runnertype_db=runnertype_db,
                                         liveaction_db=liveaction_db, status=status,
                                         context=context)
        except:
            # Deliberately catch everything so status, result and context are always
            # bound when the finally block runs.
            # include the error message and traceback to try and provide some hints.
            status = action_constants.LIVEACTION_STATUS_FAILED
            result = self._get_failure_result()
            context = liveaction_db.context
            LOG.exception('Failed to resume action %s.', liveaction_db.id, extra=result)
        finally:
            try:
                # Update the final status of liveaction and corresponding action execution.
                liveaction_db = self._update_status(liveaction_db.id, status, result, context)
            finally:
                # Always clean-up the auth_token, even if the status update above failed
                # (in which case the status of the not-updated liveaction is used).
                # This method should be called in the finally block to ensure post_run is
                # not impacted.
                self._clean_up_auth_token(runner=runner, status=liveaction_db.status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)

        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})

        return liveaction_db

    def _clean_up_auth_token(self, runner, status):
        """
        Clean up the temporary auth token for the current action.

        Note: This method should never throw since it's called inside finally block which assumes
        it doesn't throw.

        :return: ``True`` if token deletion was attempted, ``False`` if it was skipped.
        """
        # Deletion of the runner generated auth token is delayed until the token expires.
        # Async actions such as Mistral workflows uses the auth token to launch other
        # actions in the workflow. If the auth token is deleted here, then the actions
        # in the workflow will fail with unauthorized exception.
        is_async_runner = isinstance(runner, AsyncActionRunner)
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

        if not is_async_runner or action_completed:
            try:
                self._delete_auth_token(runner.auth_token)
            except:
                # Swallow everything on purpose, see the note in the docstring.
                LOG.exception('Unable to clean-up auth_token.')

            return True

        return False

    def _update_live_action_db(self, liveaction_id, status, result, context):
        """
        Update LiveActionDB object for the provided liveaction id.

        :return: ``(liveaction_db, state_changed)`` tuple where ``state_changed`` tells whether
                 the new status differs from the previously stored one.
        """
        liveaction_db = get_liveaction_by_id(liveaction_id)

        state_changed = (liveaction_db.status != status)

        # End timestamp is only recorded once the liveaction reaches a completed state.
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
            end_timestamp = date_utils.get_datetime_utc_now()
        else:
            end_timestamp = None

        liveaction_db = update_liveaction_status(status=status,
                                                 result=result,
                                                 context=context,
                                                 end_timestamp=end_timestamp,
                                                 liveaction_db=liveaction_db)
        return (liveaction_db, state_changed)

    def _update_status(self, liveaction_id, status, result, context):
        """
        Persist status, result and context on the liveaction and update the corresponding
        action execution.

        :return: Updated liveaction object.

        :raises Exception: Any error raised by either update is logged and re-raised
                           unchanged.
        """
        try:
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
            liveaction_db, state_changed = self._update_live_action_db(
                liveaction_id, status, result, context)
        except Exception:
            LOG.exception(
                'Cannot update liveaction '
                '(id: %s, status: %s, result: %s).',
                liveaction_id, status, result
            )
            # Bare raise keeps the original traceback intact.
            raise

        try:
            executions.update_execution(liveaction_db, publish=state_changed)
            extra = {'liveaction_db': liveaction_db}
            LOG.debug('Updated liveaction after run', extra=extra)
        except Exception:
            LOG.exception(
                'Cannot update action execution for liveaction '
                '(id: %s, status: %s, result: %s).',
                liveaction_id, status, result
            )
            # Bare raise keeps the original traceback intact.
            raise

        return liveaction_db

    def _get_entry_point_abs_path(self, pack, entry_point):
        """
        Thin wrapper around ``content_utils.get_entry_point_abs_path``.
        """
        return content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)

    def _get_action_libs_abs_path(self, pack, entry_point):
        """
        Thin wrapper around ``content_utils.get_action_libs_abs_path``.
        """
        return content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)

    def _get_rerun_reference(self, context):
        """
        Retrieve the execution referenced by ``context['re-run']['ref']``.

        :return: Execution object, or ``None`` if the context carries no re-run reference.
        """
        execution_id = context.get('re-run', {}).get('ref')
        return ActionExecution.get_by_id(execution_id) if execution_id else None

    def _get_runner(self, runnertype_db, action_db, liveaction_db):
        """
        Instantiate the runner for the provided runner type and populate it with the action,
        liveaction and execution details.

        :return: Runner instance.
        """
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
                                                              action_db.entry_point)
        context = getattr(liveaction_db, 'context', dict())
        user = context.get('user', cfg.CONF.system_user.user)

        # Note: Right now configs are only supported by the Python runner actions
        if runnertype_db.runner_module == 'python_runner':
            LOG.debug('Loading config for pack')

            config_loader = ContentPackConfigLoader(pack_name=action_db.pack, user=user)
            config = config_loader.get_config()
        else:
            config = None

        runner = get_runner(package_name=runnertype_db.runner_package,
                            module_name=runnertype_db.runner_module,
                            config=config)

        # TODO: Pass those arguments to the constructor instead of late
        # assignment, late assignment is awful
        runner.runner_type_db = runnertype_db
        runner.action = action_db
        runner.action_name = action_db.name
        runner.liveaction = liveaction_db
        runner.liveaction_id = str(liveaction_db.id)
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
        runner.execution_id = str(runner.execution.id)
        runner.entry_point = resolved_entry_point
        runner.context = context
        runner.callback = getattr(liveaction_db, 'callback', dict())
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                              action_db.entry_point)

        # For re-run, get the ActionExecutionDB in which the re-run is based on.
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None

        return runner

    def _create_auth_token(self, context, action_db, liveaction_db):
        """
        Create a service auth token for the user found in the provided context.

        :return: Token object, or ``None`` if the context is empty or contains no user.
        """
        if not context:
            return None

        user = context.get('user', None)
        if not user:
            return None

        metadata = {
            'service': 'actions_container',
            'action_name': action_db.name,
            'live_action_id': str(liveaction_db.id)
        }

        ttl = cfg.CONF.auth.service_token_ttl
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
        return token_db

    def _delete_auth_token(self, auth_token):
        """
        Delete the provided auth token; do nothing if no token was provided.
        """
        if auth_token:
            access.delete_token(auth_token.token)
388
389
390
def get_runner_container():
    """
    Create and return a new RunnerContainer instance.
    """
    container = RunnerContainer()
    return container
392