Passed
Pull Request — master (#3507)
by W
07:37 queued 01:55
created

RunnerContainer._do_resume()   C

Complexity

Conditions 7

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 64
rs 6.0815
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.util import date as date_utils
24
from st2common.constants import action as action_constants
25
from st2common.exceptions import actionrunner
26
from st2common.exceptions.param import ParamException
27
from st2common.models.db.executionstate import ActionExecutionStateDB
28
from st2common.models.system.action import ResolvedActionParameters
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.executionstate import ActionExecutionState
31
from st2common.services import access, executions
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
36
from st2actions.container.service import RunnerContainerService
37
from st2common.runners.base import get_runner
38
from st2common.runners.base import AsyncActionRunner
39
40
# Names exported by this module.
__all__ = [
    'RunnerContainer',
    'get_runner_container',
]

# Logger shared by everything defined in this module.
LOG = logging.getLogger(__name__)
46
47
48
class RunnerContainer(object):
49
50
    def dispatch(self, liveaction_db):
        """
        Hand a live action over to the handler matching its current status.

        The action and its runner type are looked up, a runner instance is prepared and
        the request is passed to the run / cancel / pause / resume handler.

        :param liveaction_db: Live action to dispatch.

        :return: ``result`` attribute of the live action returned by the handler.
        """
        action_db = get_action_by_ref(liveaction_db.action)
        if not action_db:
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))

        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])

        LOG.info('Dispatching Action to a runner',
                 extra={'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db})

        # The runner is created before the handler lookup.
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)

        # Status -> handler table; the three "run" statuses share a single handler.
        run_statuses = (
            action_constants.LIVEACTION_STATUS_REQUESTED,
            action_constants.LIVEACTION_STATUS_SCHEDULED,
            action_constants.LIVEACTION_STATUS_RUNNING,
        )
        handlers = dict.fromkeys(run_statuses, self._do_run)
        handlers[action_constants.LIVEACTION_STATUS_CANCELING] = self._do_cancel
        handlers[action_constants.LIVEACTION_STATUS_PAUSING] = self._do_pause
        handlers[action_constants.LIVEACTION_STATUS_RESUMING] = self._do_resume

        # A status without a handler raises KeyError here.
        handler = handlers[liveaction_db.status]
        handled_liveaction_db = handler(runner=runner,
                                        runnertype_db=runnertype_db,
                                        action_db=action_db,
                                        liveaction_db=liveaction_db)

        return handled_liveaction_db.result
82
83
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Run the action with the provided runner and record the outcome.

        A temporary auth token is attached to the runner, the final parameters are
        rendered, ``pre_run`` and ``run`` are invoked and the resulting status, result and
        context are written to the live action and its execution. Any failure while
        preparing or running is recorded as a failed execution instead of propagating.

        :param runner: Runner instance used to execute the action.
        :param runnertype_db: Runner type the action uses.
        :param action_db: Action being executed.
        :param liveaction_db: Live action being executed.

        :return: The updated live action object.

        :raises Exception: If the live action or the execution cannot be updated.
        """
        # Create a temporary auth token which will be available
        # for the duration of the action execution.
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
                                                    liveaction_db=liveaction_db)

        updated_liveaction_db = None
        try:
            # Finalized parameters are resolved and then rendered. This process could
            # fail. Handle the exception and report the error correctly.
            try:
                runner_params, action_params = param_utils.render_final_params(
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
                    liveaction_db.context)
                runner.runner_parameters = runner_params
            except ParamException as e:
                raise actionrunner.ActionRunnerException(str(e))

            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
            runner.pre_run()

            # Mask secret parameters in the log context
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
                                                              runner_type_db=runnertype_db,
                                                              runner_parameters=runner_params,
                                                              action_parameters=action_params)
            extra = {'runner': runner, 'parameters': resolved_action_params}
            LOG.debug('Performing run for runner: %s', runner.runner_id, extra=extra)
            (status, result, context) = runner.run(action_params)

            try:
                result = json.loads(result)
            except Exception:
                # Best effort: the result is not a JSON document (or not a string at all),
                # so it is kept as returned by the runner.
                pass

            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
            if isinstance(runner, AsyncActionRunner) and not action_completed:
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
        except:
            # Deliberately broad: whatever went wrong, the execution is recorded as failed.
            LOG.exception('Failed to run action.')
            _, ex, tb = sys.exc_info()
            # mark execution as failed.
            status = action_constants.LIVEACTION_STATUS_FAILED
            # include the error message and traceback to try and provide some hints.
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            context = None
        finally:
            try:
                # Log action completion
                extra = {'result': result, 'status': status}
                LOG.debug('Action "%s" completed.', action_db.name, extra=extra)

                try:
                    LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
                    updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
                                                                        result, context)
                except Exception:
                    msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
                        liveaction_db.id, status, result)
                    LOG.exception(msg)
                    # Bare raise keeps the original traceback.
                    raise

                try:
                    executions.update_execution(updated_liveaction_db)
                    extra = {'liveaction_db': updated_liveaction_db}
                    LOG.debug('Updated liveaction after run', extra=extra)
                except Exception:
                    msg = ('Cannot update ActionExecution object for id: %s, status: %s, '
                           'result: %s.' % (updated_liveaction_db.id, status, result))
                    LOG.exception(msg)
                    raise
            finally:
                # Always clean-up the auth_token, even when one of the updates above raised.
                # Note: self._clean_up_auth_token should never throw to ensure post_run is
                # always called.
                self._clean_up_auth_token(runner=runner, status=status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)
        runner.container_service = None

        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})

        return updated_liveaction_db
167
168
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Cancel the action through its runner and mark the live action as canceled.

        Failures are logged and not re-raised; in that case the live action is returned in
        whatever state it had when the failure happened.

        :param runner: Runner instance handling the action.
        :param runnertype_db: Runner type the action uses (not used by this method).
        :param action_db: Action being canceled (not used by this method).
        :param liveaction_db: Live action to cancel.

        :return: The live action object, updated if the cancellation succeeded.
        """
        try:
            LOG.debug('Performing cancel for runner: %s', runner.runner_id,
                      extra={'runner': runner})

            runner.cancel()

            liveaction_db = update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_CANCELED,
                end_timestamp=date_utils.get_datetime_utc_now(),
                liveaction_db=liveaction_db)
            executions.update_execution(liveaction_db)

            LOG.debug('Performing post_run for runner: %s', runner.runner_id)
            cancel_result = {'error': 'Execution canceled by user.'}
            runner.post_run(status=liveaction_db.status, result=cancel_result)
            runner.container_service = None
        except:
            error, trace = sys.exc_info()[1:]
            # The error message and a truncated traceback are attached to the log record
            # to provide some hints.
            failure = {
                'error': str(error),
                'traceback': ''.join(traceback.format_tb(trace, 20))
            }
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=failure)
        finally:
            # The auth token is cleaned up whether or not the cancellation succeeded.
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)

        return liveaction_db
197
198
    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Pause the action through its runner and reload the live action afterwards.

        Failures are logged and not re-raised; in that case the live action is returned in
        whatever state it had when the failure happened.

        :param runner: Runner instance handling the action.
        :param runnertype_db: Runner type the action uses (not used by this method).
        :param action_db: Action being paused (not used by this method).
        :param liveaction_db: Live action to pause.

        :return: The live action object, reloaded if the pause succeeded.
        """
        try:
            LOG.debug('Performing pause for runner: %s', runner.runner_id,
                      extra={'runner': runner})

            runner.pause()

            # Re-read the live action after the runner has processed the pause.
            liveaction_db = get_liveaction_by_id(liveaction_db.id)

            runner.container_service = None
        except:
            error, trace = sys.exc_info()[1:]
            # The error message and a truncated traceback are attached to the log record
            # to provide some hints.
            failure = {
                'error': str(error),
                'traceback': ''.join(traceback.format_tb(trace, 20))
            }
            LOG.exception('Failed to pause action %s.' % (liveaction_db.id), extra=failure)
        finally:
            # The auth token is cleaned up whether or not the pause succeeded.
            self._clean_up_auth_token(runner=runner, status=liveaction_db.status)

        return liveaction_db
219
220
    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
        """
        Resume the action with the provided runner and record the outcome.

        ``runner.resume()`` is invoked and the resulting status, result and context are
        written to the live action and its execution. Any failure while resuming is
        recorded as a failed execution instead of propagating.

        :param runner: Runner instance used to resume the action.
        :param runnertype_db: Runner type the action uses.
        :param action_db: Action being resumed.
        :param liveaction_db: Live action being resumed.

        :return: The updated live action object.

        :raises Exception: If the live action or the execution cannot be updated.
        """
        updated_liveaction_db = None

        try:
            extra = {'runner': runner}
            LOG.debug('Performing resume for runner: %s', runner.runner_id, extra=extra)

            (status, result, context) = runner.resume()

            try:
                result = json.loads(result)
            except Exception:
                # Best effort: the result is not a JSON document (or not a string at all),
                # so it is kept as returned by the runner.
                pass

            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

            if isinstance(runner, AsyncActionRunner) and not action_completed:
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
        except:
            # Deliberately broad: whatever went wrong, the execution is recorded as failed.
            LOG.exception('Failed to resume action.')
            _, ex, tb = sys.exc_info()
            # mark execution as failed.
            status = action_constants.LIVEACTION_STATUS_FAILED
            # include the error message and traceback to try and provide some hints.
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            context = None
        finally:
            try:
                # Log action completion
                extra = {'result': result, 'status': status}
                LOG.debug('Action "%s" completed.', action_db.name, extra=extra)

                try:
                    LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
                    updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
                                                                        result, context)
                except Exception:
                    msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
                        liveaction_db.id, status, result)
                    LOG.exception(msg)
                    # Bare raise keeps the original traceback.
                    raise

                try:
                    executions.update_execution(updated_liveaction_db)
                    extra = {'liveaction_db': updated_liveaction_db}
                    LOG.debug('Updated liveaction after resume', extra=extra)
                except Exception:
                    msg = ('Cannot update ActionExecution object for id: %s, status: %s, '
                           'result: %s.' % (updated_liveaction_db.id, status, result))
                    LOG.exception(msg)
                    raise
            finally:
                # Always clean-up the auth_token, even when one of the updates above raised.
                # Note: self._clean_up_auth_token should never throw to ensure post_run is
                # always called.
                self._clean_up_auth_token(runner=runner, status=status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)
        runner.container_service = None

        LOG.debug('Runner do_resume result', extra={'result': updated_liveaction_db.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})

        return updated_liveaction_db
284
285
    def _clean_up_auth_token(self, runner, status):
        """
        Remove the temporary auth token created for the current action.

        Removal is skipped for an async runner whose action has not completed yet. Async
        actions such as Mistral workflows use the auth token to launch other actions in the
        workflow; deleting it here would make those actions fail with an unauthorized
        exception, so deletion is delayed until the token expires.

        Note: This method should never throw since it's called inside finally block which
        assumes it doesn't throw.

        :param runner: Runner whose ``auth_token`` should be removed.
        :param status: Current status of the action.

        :return: ``True`` if a deletion was attempted, ``False`` if it was skipped.
        """
        runs_async = isinstance(runner, AsyncActionRunner)
        has_finished = status in action_constants.LIVEACTION_COMPLETED_STATES

        if runs_async and not has_finished:
            return False

        try:
            self._delete_auth_token(runner.auth_token)
        except:
            LOG.exception('Unable to clean-up auth_token.')

        return True
308
309
    def _update_live_action_db(self, liveaction_id, status, result, context):
        """
        Store a new status, result and context on the live action with the provided id.

        An end timestamp is only set when the status is one of the completed states.

        :return: The updated live action object.
        """
        current_liveaction_db = get_liveaction_by_id(liveaction_id)

        has_finished = status in action_constants.LIVEACTION_COMPLETED_STATES
        finished_at = date_utils.get_datetime_utc_now() if has_finished else None

        return update_liveaction_status(status=status,
                                        result=result,
                                        context=context,
                                        end_timestamp=finished_at,
                                        liveaction_db=current_liveaction_db)
325
326
    def _get_entry_point_abs_path(self, pack, entry_point):
        """
        Delegate to ``RunnerContainerService.get_entry_point_abs_path``.
        """
        abs_path = RunnerContainerService.get_entry_point_abs_path(pack=pack,
                                                                   entry_point=entry_point)
        return abs_path
329
330
    def _get_action_libs_abs_path(self, pack, entry_point):
        """
        Delegate to ``RunnerContainerService.get_action_libs_abs_path``.
        """
        libs_path = RunnerContainerService.get_action_libs_abs_path(pack=pack,
                                                                    entry_point=entry_point)
        return libs_path
333
334
    def _get_rerun_reference(self, context):
335
        execution_id = context.get('re-run', {}).get('ref')
336
        return ActionExecution.get_by_id(execution_id) if execution_id else None
337
338
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
        """
        Create a runner for the runner type and populate it for the given live action.

        :param runnertype_db: Runner type whose ``runner_module`` provides the runner.
        :param action_db: Action the runner will handle.
        :param liveaction_db: Live action the runner will handle.

        :return: The populated runner instance.
        """
        runner = get_runner(runnertype_db.runner_module)

        entry_point_path = self._get_entry_point_abs_path(action_db.pack,
                                                          action_db.entry_point)

        # Runner type, action and live action details.
        runner.runner_type_db = runnertype_db
        runner.container_service = RunnerContainerService()
        runner.action = action_db
        runner.action_name = action_db.name
        runner.liveaction = liveaction_db
        runner.liveaction_id = str(liveaction_db.id)

        # The execution is looked up by the live action id stored just above.
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
        runner.execution_id = str(runner.execution.id)
        runner.entry_point = entry_point_path

        # Context and callback fall back to an empty dict when the attribute is missing.
        runner.context = getattr(liveaction_db, 'context', {})
        runner.callback = getattr(liveaction_db, 'callback', {})
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                              action_db.entry_point)

        # For re-run, get the ActionExecutionDB in which the re-run is based on.
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
        if rerun_ref_id:
            runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id)
        else:
            runner.rerun_ex_ref = None

        return runner
363
364
    def _create_auth_token(self, context, action_db, liveaction_db):
365
        if not context:
366
            return None
367
368
        user = context.get('user', None)
369
        if not user:
370
            return None
371
372
        metadata = {
373
            'service': 'actions_container',
374
            'action_name': action_db.name,
375
            'live_action_id': str(liveaction_db.id)
376
377
        }
378
379
        ttl = cfg.CONF.auth.service_token_ttl
380
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
381
        return token_db
382
383
    def _delete_auth_token(self, auth_token):
384
        if auth_token:
385
            access.delete_token(auth_token.token)
386
387
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
        """
        Create the execution state entry for a live action whose runner type has a
        query module.

        A runner type without a query module is logged as an error and nothing is created.
        Failures while creating the entry are logged and not re-raised.
        """
        if not getattr(runnertype_db, 'query_module', None):
            LOG.error('No query module specified for runner %s.', runnertype_db)
            return

        try:
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
        except:
            LOG.exception('Unable to create action execution state db model '
                          'for liveaction_id %s', liveaction_id)
397
398
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
        """
        Build an execution state object for the live action and store it.

        :return: Whatever ``ActionExecutionState.add_or_update`` returns, or ``None`` when
                 storing fails (the failure is logged).
        """
        state_fields = {
            'execution_id': liveaction_id,
            'query_module': runnertype_db.query_module,
            'query_context': query_context
        }
        state_db = ActionExecutionStateDB(**state_fields)

        try:
            stored_state = ActionExecutionState.add_or_update(state_db)
        except:
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
                          % liveaction_id)
            stored_state = None

        return stored_state
409
410
411
def get_runner_container():
    """
    Create and return a new ``RunnerContainer`` instance.
    """
    container = RunnerContainer()
    return container
413