Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

RunnerContainer._do_run()   B

Complexity

Conditions 6

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 65
rs 7.6697
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.util import date as date_utils
24
from st2common.constants import action as action_constants
25
from st2common.exceptions import actionrunner
26
from st2common.exceptions.param import ParamException
27
from st2common.models.db.executionstate import ActionExecutionStateDB
28
from st2common.models.system.action import ResolvedActionParameters
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.executionstate import ActionExecutionState
31
from st2common.services import access, executions
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
36
from st2actions.container.service import RunnerContainerService
37
from st2common.runners.base import get_runner
38
from st2common.runners.base import AsyncActionRunner
39
40
LOG = logging.getLogger(__name__)
41
42
__all__ = [
43
    'RunnerContainer',
44
    'get_runner_container'
45
]
46
47
48
class RunnerContainer(object):
49
50
    def dispatch(self, liveaction_db):
51
        action_db = get_action_by_ref(liveaction_db.action)
52
        if not action_db:
53
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
54
55
        liveaction_db.context['pack'] = action_db.pack
56
57
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
58
59
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
60
        LOG.info('Dispatching Action to a runner', extra=extra)
61
62
        # Get runner instance.
63
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
64
65
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
66
67
        # Process the request.
68
        funcs = {
69
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
70
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
71
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
72
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
73
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
74
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
75
        }
76
77
        if liveaction_db.status not in funcs:
78
            raise actionrunner.ActionRunnerDispatchError(
79
                'Action runner is unable to dispatch the liveaction because it is '
80
                'in an unsupported status of "%s".' % liveaction_db.status
81
            )
82
83
        liveaction_db = funcs[liveaction_db.status](
84
            runner=runner,
85
            runnertype_db=runnertype_db,
86
            action_db=action_db,
87
            liveaction_db=liveaction_db
88
        )
89
90
        return liveaction_db.result
91
92
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
93
        # Create a temporary auth token which will be available
94
        # for the duration of the action execution.
95
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
96
                                                    liveaction_db=liveaction_db)
97
98
        try:
99
            # Finalized parameters are resolved and then rendered. This process could
100
            # fail. Handle the exception and report the error correctly.
101
            try:
102
                runner_params, action_params = param_utils.render_final_params(
103
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
104
                    liveaction_db.context)
105
                runner.runner_parameters = runner_params
106
            except ParamException as e:
107
                raise actionrunner.ActionRunnerException(str(e))
108
109
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
110
            runner.pre_run()
111
112
            # Mask secret parameters in the log context
113
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
114
                                                              runner_type_db=runnertype_db,
115
                                                              runner_parameters=runner_params,
116
                                                              action_parameters=action_params)
117
            extra = {'runner': runner, 'parameters': resolved_action_params}
118
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
119
            (status, result, context) = runner.run(action_params)
120
121
            try:
122
                result = json.loads(result)
123
            except:
124
                pass
125
126
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
127
            if isinstance(runner, AsyncActionRunner) and not action_completed:
128
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
129
        except:
130
            LOG.exception('Failed to run action.')
131
            _, ex, tb = sys.exc_info()
132
            # mark execution as failed.
133
            status = action_constants.LIVEACTION_STATUS_FAILED
134
            # include the error message and traceback to try and provide some hints.
135
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
136
            context = None
137
        finally:
138
            # Log action completion
139
            extra = {'result': result, 'status': status}
140
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
141
142
            # Update the final status of liveaction and corresponding action execution.
143
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
144
145
            # Always clean-up the auth_token
146
            # This method should be called in the finally block to ensure post_run is not impacted.
147
            self._clean_up_auth_token(runner=runner, status=status)
148
149
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
150
        runner.post_run(status=status, result=result)
151
        runner.container_service = None
152
153
        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
154
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})
155
156
        return liveaction_db
157
158
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
159
        try:
160
            extra = {'runner': runner}
161
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
162
            (status, result, context) = runner.cancel()
163
164
            # Update the final status of liveaction and corresponding action execution.
165
            # The status is updated here because we want to keep the workflow running
166
            # as is if the cancel operation failed.
167
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
168
        except:
169
            _, ex, tb = sys.exc_info()
170
            # include the error message and traceback to try and provide some hints.
171
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
172
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
173
        finally:
174
            # Always clean-up the auth_token
175
            # This method should be called in the finally block to ensure post_run is not impacted.
176
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
177
178
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
179
        result = {'error': 'Execution canceled by user.'}
180
        runner.post_run(status=liveaction_db.status, result=result)
181
        runner.container_service = None
182
183
        return liveaction_db
184
185
    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
186
        try:
187
            extra = {'runner': runner}
188
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
189
            (status, result, context) = runner.pause()
190
        except:
191
            _, ex, tb = sys.exc_info()
192
            # include the error message and traceback to try and provide some hints.
193
            status = action_constants.LIVEACTION_STATUS_FAILED
194
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
195
            context = liveaction_db.context
196
            LOG.exception('Failed to pause action %s.' % (liveaction_db.id), extra=result)
197
        finally:
198
            # Update the final status of liveaction and corresponding action execution.
199
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
200
201
            # Always clean-up the auth_token
202
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
203
204
        runner.container_service = None
205
206
        return liveaction_db
207
208
    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
209
        try:
210
            extra = {'runner': runner}
211
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
212
213
            (status, result, context) = runner.resume()
214
215
            try:
216
                result = json.loads(result)
217
            except:
218
                pass
219
220
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
221
222
            if isinstance(runner, AsyncActionRunner) and not action_completed:
223
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
224
        except:
225
            _, ex, tb = sys.exc_info()
226
            # include the error message and traceback to try and provide some hints.
227
            status = action_constants.LIVEACTION_STATUS_FAILED
228
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
229
            context = liveaction_db.context
230
            LOG.exception('Failed to resume action %s.' % (liveaction_db.id), extra=result)
231
        finally:
232
            # Update the final status of liveaction and corresponding action execution.
233
            liveaction_db = self._update_status(liveaction_db.id, status, result, context)
234
235
            # Always clean-up the auth_token
236
            # This method should be called in the finally block to ensure post_run is not impacted.
237
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)
238
239
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
240
        runner.post_run(status=status, result=result)
241
        runner.container_service = None
242
243
        LOG.debug('Runner do_run result', extra={'result': liveaction_db.result})
244
        LOG.audit('Liveaction completed', extra={'liveaction_db': liveaction_db})
245
246
        return liveaction_db
247
248
    def _clean_up_auth_token(self, runner, status):
249
        """
250
        Clean up the temporary auth token for the current action.
251
252
        Note: This method should never throw since it's called inside finally block which assumes
253
        it doesn't throw.
254
        """
255
        # Deletion of the runner generated auth token is delayed until the token expires.
256
        # Async actions such as Mistral workflows uses the auth token to launch other
257
        # actions in the workflow. If the auth token is deleted here, then the actions
258
        # in the workflow will fail with unauthorized exception.
259
        is_async_runner = isinstance(runner, AsyncActionRunner)
260
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
261
262
        if not is_async_runner or (is_async_runner and action_completed):
263
            try:
264
                self._delete_auth_token(runner.auth_token)
265
            except:
266
                LOG.exception('Unable to clean-up auth_token.')
267
268
            return True
269
270
        return False
271
272
    def _update_live_action_db(self, liveaction_id, status, result, context):
273
        """
274
        Update LiveActionDB object for the provided liveaction id.
275
        """
276
        liveaction_db = get_liveaction_by_id(liveaction_id)
277
278
        state_changed = (liveaction_db.status != status)
279
280
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
281
            end_timestamp = date_utils.get_datetime_utc_now()
282
        else:
283
            end_timestamp = None
284
285
        liveaction_db = update_liveaction_status(status=status,
286
                                                 result=result,
287
                                                 context=context,
288
                                                 end_timestamp=end_timestamp,
289
                                                 liveaction_db=liveaction_db)
290
        return (liveaction_db, state_changed)
291
292
    def _update_status(self, liveaction_id, status, result, context):
293
        try:
294
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
295
            liveaction_db, state_changed = self._update_live_action_db(
296
                liveaction_id, status, result, context)
297
        except Exception as e:
298
            LOG.exception(
299
                'Cannot update liveaction '
300
                '(id: %s, status: %s, result: %s).' % (
301
                    liveaction_id, status, result)
302
            )
303
            raise e
304
305
        try:
306
            executions.update_execution(liveaction_db, publish=state_changed)
307
            extra = {'liveaction_db': liveaction_db}
308
            LOG.debug('Updated liveaction after run', extra=extra)
309
        except Exception as e:
310
            LOG.exception(
311
                'Cannot update action execution for liveaction '
312
                '(id: %s, status: %s, result: %s).' % (
313
                    liveaction_id, status, result)
314
            )
315
            raise e
316
317
        return liveaction_db
318
319
    def _get_entry_point_abs_path(self, pack, entry_point):
320
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
321
                                                               entry_point=entry_point)
322
323
    def _get_action_libs_abs_path(self, pack, entry_point):
324
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
325
                                                               entry_point=entry_point)
326
327
    def _get_rerun_reference(self, context):
328
        execution_id = context.get('re-run', {}).get('ref')
329
        return ActionExecution.get_by_id(execution_id) if execution_id else None
330
331
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
332
        runner = get_runner(runnertype_db.runner_module)
333
334
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
335
                                                              action_db.entry_point)
336
337
        runner.runner_type_db = runnertype_db
338
        runner.container_service = RunnerContainerService()
339
        runner.action = action_db
340
        runner.action_name = action_db.name
341
        runner.liveaction = liveaction_db
342
        runner.liveaction_id = str(liveaction_db.id)
343
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
344
        runner.execution_id = str(runner.execution.id)
345
        runner.entry_point = resolved_entry_point
346
        runner.context = getattr(liveaction_db, 'context', dict())
347
        runner.callback = getattr(liveaction_db, 'callback', dict())
348
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
349
                                                              action_db.entry_point)
350
351
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
352
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
353
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
354
355
        return runner
356
357
    def _create_auth_token(self, context, action_db, liveaction_db):
358
        if not context:
359
            return None
360
361
        user = context.get('user', None)
362
        if not user:
363
            return None
364
365
        metadata = {
366
            'service': 'actions_container',
367
            'action_name': action_db.name,
368
            'live_action_id': str(liveaction_db.id)
369
370
        }
371
372
        ttl = cfg.CONF.auth.service_token_ttl
373
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
374
        return token_db
375
376
    def _delete_auth_token(self, auth_token):
377
        if auth_token:
378
            access.delete_token(auth_token.token)
379
380
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
381
        query_module = getattr(runnertype_db, 'query_module', None)
382
        if not query_module:
383
            LOG.error('No query module specified for runner %s.', runnertype_db)
384
            return
385
        try:
386
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
387
        except:
388
            LOG.exception('Unable to create action execution state db model ' +
389
                          'for liveaction_id %s', liveaction_id)
390
391
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
392
        state_db = ActionExecutionStateDB(
393
            execution_id=liveaction_id,
394
            query_module=runnertype_db.query_module,
395
            query_context=query_context)
396
        try:
397
            return ActionExecutionState.add_or_update(state_db)
398
        except:
399
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
400
                          % liveaction_id)
401
            return None
402
403
404
def get_runner_container():
405
    return RunnerContainer()
406