Passed
Pull Request — master (#3507)
by W
04:36
created

RunnerContainer._do_resume()   C

Complexity

Conditions 7

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 64
rs 6.0815
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.util import date as date_utils
24
from st2common.constants import action as action_constants
25
from st2common.exceptions import actionrunner
26
from st2common.exceptions.param import ParamException
27
from st2common.models.db.executionstate import ActionExecutionStateDB
28
from st2common.models.system.action import ResolvedActionParameters
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.executionstate import ActionExecutionState
31
from st2common.services import access, executions
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
36
from st2actions.container.service import RunnerContainerService
37
from st2common.runners.base import get_runner
38
from st2common.runners.base import AsyncActionRunner
39
40
LOG = logging.getLogger(__name__)
41
42
__all__ = [
43
    'RunnerContainer',
44
    'get_runner_container'
45
]
46
47
48
class RunnerContainer(object):
49
50
    def dispatch(self, liveaction_db):
51
        action_db = get_action_by_ref(liveaction_db.action)
52
        if not action_db:
53
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
54
55
        liveaction_db.context['pack'] = action_db.pack
56
57
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
58
59
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
60
        LOG.info('Dispatching Action to a runner', extra=extra)
61
62
        # Get runner instance.
63
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
64
65
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
66
67
        # Process the request.
68
        funcs = {
69
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
70
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
71
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
72
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
73
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
74
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
75
        }
76
77
        liveaction_db = funcs[liveaction_db.status](
78
            runner=runner,
79
            runnertype_db=runnertype_db,
80
            action_db=action_db,
81
            liveaction_db=liveaction_db
82
        )
83
84
        return liveaction_db.result
85
86
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
87
        # Create a temporary auth token which will be available
88
        # for the duration of the action execution.
89
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
90
                                                    liveaction_db=liveaction_db)
91
92
        updated_liveaction_db = None
93
        try:
94
            # Finalized parameters are resolved and then rendered. This process could
95
            # fail. Handle the exception and report the error correctly.
96
            try:
97
                runner_params, action_params = param_utils.render_final_params(
98
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
99
                    liveaction_db.context)
100
                runner.runner_parameters = runner_params
101
            except ParamException as e:
102
                raise actionrunner.ActionRunnerException(str(e))
103
104
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
105
            runner.pre_run()
106
107
            # Mask secret parameters in the log context
108
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
109
                                                              runner_type_db=runnertype_db,
110
                                                              runner_parameters=runner_params,
111
                                                              action_parameters=action_params)
112
            extra = {'runner': runner, 'parameters': resolved_action_params}
113
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
114
            (status, result, context) = runner.run(action_params)
115
116
            try:
117
                result = json.loads(result)
118
            except:
119
                pass
120
121
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
122
            if isinstance(runner, AsyncActionRunner) and not action_completed:
123
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
124
        except:
125
            LOG.exception('Failed to run action.')
126
            _, ex, tb = sys.exc_info()
127
            # mark execution as failed.
128
            status = action_constants.LIVEACTION_STATUS_FAILED
129
            # include the error message and traceback to try and provide some hints.
130
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
131
            context = None
132
        finally:
133
            # Log action completion
134
            extra = {'result': result, 'status': status}
135
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
136
137
            try:
138
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
139
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
140
                                                                    result, context)
141
            except Exception as e:
142
                msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
143
                    liveaction_db.id, status, result)
144
                LOG.exception(msg)
145
                raise e
146
147
            try:
148
                executions.update_execution(updated_liveaction_db)
149
                extra = {'liveaction_db': updated_liveaction_db}
150
                LOG.debug('Updated liveaction after run', extra=extra)
151
            except Exception as e:
152
                msg = 'Cannot update ActionExecution object for id: %s, status: %s, result: %s.' % (
153
                    updated_liveaction_db.id, status, result)
154
                LOG.exception(msg)
155
                raise e
156
157
            # Always clean-up the auth_token
158
            # Note: self._clean_up_auth_token should never throw to ensure post_run is always
159
            # called.
160
            self._clean_up_auth_token(runner=runner, status=status)
161
162
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
163
        runner.post_run(status=status, result=result)
164
        runner.container_service = None
165
166
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
167
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
168
169
        return updated_liveaction_db
170
171
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
172
        try:
173
            extra = {'runner': runner}
174
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
175
176
            runner.cancel()
177
178
            liveaction_db = update_liveaction_status(
179
                status=action_constants.LIVEACTION_STATUS_CANCELED,
180
                end_timestamp=date_utils.get_datetime_utc_now(),
181
                liveaction_db=liveaction_db)
182
183
            executions.update_execution(liveaction_db)
184
185
            LOG.debug('Performing post_run for runner: %s', runner.runner_id)
186
            result = {'error': 'Execution canceled by user.'}
187
            runner.post_run(status=liveaction_db.status, result=result)
188
            runner.container_service = None
189
        except:
190
            _, ex, tb = sys.exc_info()
191
            # include the error message and traceback to try and provide some hints.
192
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
193
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
194
        finally:
195
            # Always clean-up the auth_token
196
            status = liveaction_db.status
197
            self._clean_up_auth_token(runner=runner, status=status)
198
199
        return liveaction_db
200
201
    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
202
        try:
203
            extra = {'runner': runner}
204
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
205
206
            runner.pause()
207
208
            liveaction_db = get_liveaction_by_id(liveaction_db.id)
209
210
            runner.container_service = None
211
        except:
212
            _, ex, tb = sys.exc_info()
213
            # include the error message and traceback to try and provide some hints.
214
            status = action_constants.LIVEACTION_STATUS_FAILED
215
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
216
            LOG.exception('Failed to pause action %s.' % (liveaction_db.id), extra=result)
217
218
            try:
219
                updated_liveaction_db = self._update_live_action_db(
220
                    liveaction_db.id, status, result, liveaction_db.context)
0 ignored issues
show
Bug introduced by
The variable status was used before it was assigned.
Loading history...
221
            except Exception as e:
222
                msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
223
                    liveaction_db.id, status, result)
224
                LOG.exception(msg)
225
                raise e
226
227
            try:
228
                executions.update_execution(updated_liveaction_db)
229
                extra = {'liveaction_db': updated_liveaction_db}
230
                LOG.debug('Updated liveaction after run', extra=extra)
231
            except Exception as e:
232
                msg = 'Cannot update ActionExecution object for id: %s, status: %s, result: %s.' % (
233
                    updated_liveaction_db.id, status, result)
234
                LOG.exception(msg)
235
                raise e
236
        finally:
237
            # Always clean-up the auth_token
238
            status = liveaction_db.status
239
            self._clean_up_auth_token(runner=runner, status=status)
240
241
        return liveaction_db
242
243
    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
244
        updated_liveaction_db = None
245
246
        try:
247
            extra = {'runner': runner}
248
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
249
250
            (status, result, context) = runner.resume()
251
252
            try:
253
                result = json.loads(result)
254
            except:
255
                pass
256
257
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
258
259
            if isinstance(runner, AsyncActionRunner) and not action_completed:
260
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
261
        except:
262
            LOG.exception('Failed to run action.')
263
            _, ex, tb = sys.exc_info()
264
            # mark execution as failed.
265
            status = action_constants.LIVEACTION_STATUS_FAILED
266
            # include the error message and traceback to try and provide some hints.
267
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
268
            context = None
269
        finally:
270
            # Log action completion
271
            extra = {'result': result, 'status': status}
272
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
273
274
            try:
275
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
276
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
277
                                                                    result, context)
278
            except Exception as e:
279
                msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
280
                    liveaction_db.id, status, result)
281
                LOG.exception(msg)
282
                raise e
283
284
            try:
285
                executions.update_execution(updated_liveaction_db)
286
                extra = {'liveaction_db': updated_liveaction_db}
287
                LOG.debug('Updated liveaction after run', extra=extra)
288
            except Exception as e:
289
                msg = 'Cannot update ActionExecution object for id: %s, status: %s, result: %s.' % (
290
                    updated_liveaction_db.id, status, result)
291
                LOG.exception(msg)
292
                raise e
293
294
            # Always clean-up the auth_token
295
            # Note: self._clean_up_auth_token should never throw to ensure post_run is always
296
            # called.
297
            self._clean_up_auth_token(runner=runner, status=status)
298
299
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
300
        runner.post_run(status=status, result=result)
301
        runner.container_service = None
302
303
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
304
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
305
306
        return updated_liveaction_db
307
308
    def _clean_up_auth_token(self, runner, status):
309
        """
310
        Clean up the temporary auth token for the current action.
311
312
        Note: This method should never throw since it's called inside finally block which assumes
313
        it doesn't throw.
314
        """
315
        # Deletion of the runner generated auth token is delayed until the token expires.
316
        # Async actions such as Mistral workflows uses the auth token to launch other
317
        # actions in the workflow. If the auth token is deleted here, then the actions
318
        # in the workflow will fail with unauthorized exception.
319
        is_async_runner = isinstance(runner, AsyncActionRunner)
320
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
321
322
        if not is_async_runner or (is_async_runner and action_completed):
323
            try:
324
                self._delete_auth_token(runner.auth_token)
325
            except:
326
                LOG.exception('Unable to clean-up auth_token.')
327
328
            return True
329
330
        return False
331
332
    def _update_live_action_db(self, liveaction_id, status, result, context):
333
        """
334
        Update LiveActionDB object for the provided liveaction id.
335
        """
336
        liveaction_db = get_liveaction_by_id(liveaction_id)
337
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
338
            end_timestamp = date_utils.get_datetime_utc_now()
339
        else:
340
            end_timestamp = None
341
342
        liveaction_db = update_liveaction_status(status=status,
343
                                                 result=result,
344
                                                 context=context,
345
                                                 end_timestamp=end_timestamp,
346
                                                 liveaction_db=liveaction_db)
347
        return liveaction_db
348
349
    def _get_entry_point_abs_path(self, pack, entry_point):
350
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
351
                                                               entry_point=entry_point)
352
353
    def _get_action_libs_abs_path(self, pack, entry_point):
354
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
355
                                                               entry_point=entry_point)
356
357
    def _get_rerun_reference(self, context):
358
        execution_id = context.get('re-run', {}).get('ref')
359
        return ActionExecution.get_by_id(execution_id) if execution_id else None
360
361
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
362
        runner = get_runner(runnertype_db.runner_module)
363
364
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
365
                                                              action_db.entry_point)
366
367
        runner.runner_type_db = runnertype_db
368
        runner.container_service = RunnerContainerService()
369
        runner.action = action_db
370
        runner.action_name = action_db.name
371
        runner.liveaction = liveaction_db
372
        runner.liveaction_id = str(liveaction_db.id)
373
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
374
        runner.execution_id = str(runner.execution.id)
375
        runner.entry_point = resolved_entry_point
376
        runner.context = getattr(liveaction_db, 'context', dict())
377
        runner.callback = getattr(liveaction_db, 'callback', dict())
378
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
379
                                                              action_db.entry_point)
380
381
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
382
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
383
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
384
385
        return runner
386
387
    def _create_auth_token(self, context, action_db, liveaction_db):
388
        if not context:
389
            return None
390
391
        user = context.get('user', None)
392
        if not user:
393
            return None
394
395
        metadata = {
396
            'service': 'actions_container',
397
            'action_name': action_db.name,
398
            'live_action_id': str(liveaction_db.id)
399
400
        }
401
402
        ttl = cfg.CONF.auth.service_token_ttl
403
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
404
        return token_db
405
406
    def _delete_auth_token(self, auth_token):
407
        if auth_token:
408
            access.delete_token(auth_token.token)
409
410
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
411
        query_module = getattr(runnertype_db, 'query_module', None)
412
        if not query_module:
413
            LOG.error('No query module specified for runner %s.', runnertype_db)
414
            return
415
        try:
416
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
417
        except:
418
            LOG.exception('Unable to create action execution state db model ' +
419
                          'for liveaction_id %s', liveaction_id)
420
421
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
422
        state_db = ActionExecutionStateDB(
423
            execution_id=liveaction_id,
424
            query_module=runnertype_db.query_module,
425
            query_context=query_context)
426
        try:
427
            return ActionExecutionState.add_or_update(state_db)
428
        except:
429
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
430
                          % liveaction_id)
431
            return None
432
433
434
def get_runner_container():
435
    return RunnerContainer()
436