Passed
Pull Request — master (#3507)
by W
04:31
created

RunnerContainer._do_resume()   C

Complexity

Conditions 7

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 64
rs 6.0815
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import sys
18
import traceback
19
20
from st2common import log as logging
21
from st2common.util import date as date_utils
22
from st2common.constants import action as action_constants
23
from st2common.exceptions import actionrunner
24
from st2common.exceptions.param import ParamException
25
from st2common.models.db.executionstate import ActionExecutionStateDB
26
from st2common.models.system.action import ResolvedActionParameters
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.executionstate import ActionExecutionState
29
from st2common.services import access, executions
30
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
31
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
32
from st2common.util import param as param_utils
33
34
from st2actions.container.service import RunnerContainerService
35
from st2common.runners.base import get_runner
36
from st2common.runners.base import AsyncActionRunner
37
38
LOG = logging.getLogger(__name__)
39
40
__all__ = [
41
    'RunnerContainer',
42
    'get_runner_container'
43
]
44
45
46
class RunnerContainer(object):
47
48
    def dispatch(self, liveaction_db):
49
        action_db = get_action_by_ref(liveaction_db.action)
50
        if not action_db:
51
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))
52
53
        runnertype_db = get_runnertype_by_name(action_db.runner_type['name'])
54
55
        extra = {'liveaction_db': liveaction_db, 'runnertype_db': runnertype_db}
56
        LOG.info('Dispatching Action to a runner', extra=extra)
57
58
        # Get runner instance.
59
        runner = self._get_runner(runnertype_db, action_db, liveaction_db)
60
        LOG.debug('Runner instance for RunnerType "%s" is: %s', runnertype_db.name, runner)
61
62
        # Process the request.
63
        funcs = {
64
            action_constants.LIVEACTION_STATUS_REQUESTED: self._do_run,
65
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._do_run,
66
            action_constants.LIVEACTION_STATUS_RUNNING: self._do_run,
67
            action_constants.LIVEACTION_STATUS_CANCELING: self._do_cancel,
68
            action_constants.LIVEACTION_STATUS_PAUSING: self._do_pause,
69
            action_constants.LIVEACTION_STATUS_RESUMING: self._do_resume
70
        }
71
72
        liveaction_db = funcs[liveaction_db.status](
73
            runner=runner,
74
            runnertype_db=runnertype_db,
75
            action_db=action_db,
76
            liveaction_db=liveaction_db
77
        )
78
79
        return liveaction_db.result
80
81
    def _do_run(self, runner, runnertype_db, action_db, liveaction_db):
82
        # Create a temporary auth token which will be available
83
        # for the duration of the action execution.
84
        runner.auth_token = self._create_auth_token(context=runner.context, action_db=action_db,
85
                                                    liveaction_db=liveaction_db)
86
87
        updated_liveaction_db = None
88
        try:
89
            # Finalized parameters are resolved and then rendered. This process could
90
            # fail. Handle the exception and report the error correctly.
91
            try:
92
                runner_params, action_params = param_utils.render_final_params(
93
                    runnertype_db.runner_parameters, action_db.parameters, liveaction_db.parameters,
94
                    liveaction_db.context)
95
                runner.runner_parameters = runner_params
96
            except ParamException as e:
97
                raise actionrunner.ActionRunnerException(str(e))
98
99
            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
100
            runner.pre_run()
101
102
            # Mask secret parameters in the log context
103
            resolved_action_params = ResolvedActionParameters(action_db=action_db,
104
                                                              runner_type_db=runnertype_db,
105
                                                              runner_parameters=runner_params,
106
                                                              action_parameters=action_params)
107
            extra = {'runner': runner, 'parameters': resolved_action_params}
108
            LOG.debug('Performing run for runner: %s' % (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
109
            (status, result, context) = runner.run(action_params)
110
111
            try:
112
                result = json.loads(result)
113
            except:
114
                pass
115
116
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
117
            if isinstance(runner, AsyncActionRunner) and not action_completed:
118
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
119
        except:
120
            LOG.exception('Failed to run action.')
121
            _, ex, tb = sys.exc_info()
122
            # mark execution as failed.
123
            status = action_constants.LIVEACTION_STATUS_FAILED
124
            # include the error message and traceback to try and provide some hints.
125
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
126
            context = None
127
        finally:
128
            # Log action completion
129
            extra = {'result': result, 'status': status}
130
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
131
132
            try:
133
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
134
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
135
                                                                    result, context)
136
            except Exception as e:
137
                msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
138
                    liveaction_db.id, status, result)
139
                LOG.exception(msg)
140
                raise e
141
142
            try:
143
                executions.update_execution(updated_liveaction_db)
144
                extra = {'liveaction_db': updated_liveaction_db}
145
                LOG.debug('Updated liveaction after run', extra=extra)
146
            except Exception as e:
147
                msg = 'Cannot update ActionExecution object for id: %s, status: %s, result: %s.' % (
148
                    updated_liveaction_db.id, status, result)
149
                LOG.exception(msg)
150
                raise e
151
152
            # Always clean-up the auth_token
153
            # Note: self._clean_up_auth_token should never throw to ensure post_run is always
154
            # called.
155
            self._clean_up_auth_token(runner=runner, status=status)
156
157
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
158
        runner.post_run(status=status, result=result)
159
        runner.container_service = None
160
161
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
162
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
163
164
        return updated_liveaction_db
165
166
    def _do_cancel(self, runner, runnertype_db, action_db, liveaction_db):
167
        try:
168
            extra = {'runner': runner}
169
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=extra)
170
171
            runner.cancel()
172
173
            liveaction_db = update_liveaction_status(
174
                status=action_constants.LIVEACTION_STATUS_CANCELED,
175
                end_timestamp=date_utils.get_datetime_utc_now(),
176
                liveaction_db=liveaction_db)
177
178
            executions.update_execution(liveaction_db)
179
180
            LOG.debug('Performing post_run for runner: %s', runner.runner_id)
181
            result = {'error': 'Execution canceled by user.'}
182
            runner.post_run(status=liveaction_db.status, result=result)
183
            runner.container_service = None
184
        except:
185
            _, ex, tb = sys.exc_info()
186
            # include the error message and traceback to try and provide some hints.
187
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
188
            LOG.exception('Failed to cancel action %s.' % (liveaction_db.id), extra=result)
189
        finally:
190
            # Always clean-up the auth_token
191
            status = liveaction_db.status
192
            self._clean_up_auth_token(runner=runner, status=status)
193
194
        return liveaction_db
195
196
    def _do_pause(self, runner, runnertype_db, action_db, liveaction_db):
197
        try:
198
            extra = {'runner': runner}
199
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
200
201
            runner.pause()
202
203
            liveaction_db = get_liveaction_by_id(liveaction_db.id)
204
205
            runner.container_service = None
206
        except:
207
            _, ex, tb = sys.exc_info()
208
            # include the error message and traceback to try and provide some hints.
209
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
210
            LOG.exception('Failed to pause action %s.' % (liveaction_db.id), extra=result)
211
        finally:
212
            # Always clean-up the auth_token
213
            status = liveaction_db.status
214
            self._clean_up_auth_token(runner=runner, status=status)
215
216
        return liveaction_db
217
218
    def _do_resume(self, runner, runnertype_db, action_db, liveaction_db):
219
        updated_liveaction_db = None
220
221
        try:
222
            extra = {'runner': runner}
223
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
0 ignored issues
show
Bug introduced by
The variable extra was used before it was assigned.
Loading history...
224
225
            (status, result, context) = runner.resume()
226
227
            try:
228
                result = json.loads(result)
229
            except:
230
                pass
231
232
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
233
234
            if isinstance(runner, AsyncActionRunner) and not action_completed:
235
                self._setup_async_query(liveaction_db.id, runnertype_db, context)
236
        except:
237
            LOG.exception('Failed to run action.')
238
            _, ex, tb = sys.exc_info()
239
            # mark execution as failed.
240
            status = action_constants.LIVEACTION_STATUS_FAILED
241
            # include the error message and traceback to try and provide some hints.
242
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
243
            context = None
244
        finally:
245
            # Log action completion
246
            extra = {'result': result, 'status': status}
247
            LOG.debug('Action "%s" completed.' % (action_db.name), extra=extra)
248
249
            try:
250
                LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_db.id)
251
                updated_liveaction_db = self._update_live_action_db(liveaction_db.id, status,
252
                                                                    result, context)
253
            except Exception as e:
254
                msg = 'Cannot update LiveAction object for id: %s, status: %s, result: %s.' % (
255
                    liveaction_db.id, status, result)
256
                LOG.exception(msg)
257
                raise e
258
259
            try:
260
                executions.update_execution(updated_liveaction_db)
261
                extra = {'liveaction_db': updated_liveaction_db}
262
                LOG.debug('Updated liveaction after run', extra=extra)
263
            except Exception as e:
264
                msg = 'Cannot update ActionExecution object for id: %s, status: %s, result: %s.' % (
265
                    updated_liveaction_db.id, status, result)
266
                LOG.exception(msg)
267
                raise e
268
269
            # Always clean-up the auth_token
270
            # Note: self._clean_up_auth_token should never throw to ensure post_run is always
271
            # called.
272
            self._clean_up_auth_token(runner=runner, status=status)
273
274
        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
275
        runner.post_run(status=status, result=result)
276
        runner.container_service = None
277
278
        LOG.debug('Runner do_run result', extra={'result': updated_liveaction_db.result})
279
        LOG.audit('Liveaction completed', extra={'liveaction_db': updated_liveaction_db})
280
281
        return updated_liveaction_db
282
283
    def _clean_up_auth_token(self, runner, status):
284
        """
285
        Clean up the temporary auth token for the current action.
286
287
        Note: This method should never throw since it's called inside finally block which assumes
288
        it doesn't throw.
289
        """
290
        # Deletion of the runner generated auth token is delayed until the token expires.
291
        # Async actions such as Mistral workflows uses the auth token to launch other
292
        # actions in the workflow. If the auth token is deleted here, then the actions
293
        # in the workflow will fail with unauthorized exception.
294
        is_async_runner = isinstance(runner, AsyncActionRunner)
295
        action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
296
297
        if not is_async_runner or (is_async_runner and action_completed):
298
            try:
299
                self._delete_auth_token(runner.auth_token)
300
            except:
301
                LOG.exception('Unable to clean-up auth_token.')
302
303
            return True
304
305
        return False
306
307
    def _update_live_action_db(self, liveaction_id, status, result, context):
308
        """
309
        Update LiveActionDB object for the provided liveaction id.
310
        """
311
        liveaction_db = get_liveaction_by_id(liveaction_id)
312
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
313
            end_timestamp = date_utils.get_datetime_utc_now()
314
        else:
315
            end_timestamp = None
316
317
        liveaction_db = update_liveaction_status(status=status,
318
                                                 result=result,
319
                                                 context=context,
320
                                                 end_timestamp=end_timestamp,
321
                                                 liveaction_db=liveaction_db)
322
        return liveaction_db
323
324
    def _get_entry_point_abs_path(self, pack, entry_point):
325
        return RunnerContainerService.get_entry_point_abs_path(pack=pack,
326
                                                               entry_point=entry_point)
327
328
    def _get_action_libs_abs_path(self, pack, entry_point):
329
        return RunnerContainerService.get_action_libs_abs_path(pack=pack,
330
                                                               entry_point=entry_point)
331
332
    def _get_rerun_reference(self, context):
333
        execution_id = context.get('re-run', {}).get('ref')
334
        return ActionExecution.get_by_id(execution_id) if execution_id else None
335
336
    def _get_runner(self, runnertype_db, action_db, liveaction_db):
337
        runner = get_runner(runnertype_db.runner_module)
338
339
        resolved_entry_point = self._get_entry_point_abs_path(action_db.pack,
340
                                                              action_db.entry_point)
341
342
        runner.runner_type_db = runnertype_db
343
        runner.container_service = RunnerContainerService()
344
        runner.action = action_db
345
        runner.action_name = action_db.name
346
        runner.liveaction = liveaction_db
347
        runner.liveaction_id = str(liveaction_db.id)
348
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
349
        runner.execution_id = str(runner.execution.id)
350
        runner.entry_point = resolved_entry_point
351
        runner.context = getattr(liveaction_db, 'context', dict())
352
        runner.callback = getattr(liveaction_db, 'callback', dict())
353
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
354
                                                              action_db.entry_point)
355
356
        # For re-run, get the ActionExecutionDB in which the re-run is based on.
357
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')
358
        runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id) if rerun_ref_id else None
359
360
        return runner
361
362
    def _create_auth_token(self, context, action_db, liveaction_db):
363
        if not context:
364
            return None
365
366
        user = context.get('user', None)
367
        if not user:
368
            return None
369
370
        metadata = {
371
            'service': 'actions_container',
372
            'action_name': action_db.name,
373
            'live_action_id': str(liveaction_db.id)
374
375
        }
376
        token_db = access.create_token(username=user, metadata=metadata, service=True)
377
        return token_db
378
379
    def _delete_auth_token(self, auth_token):
380
        if auth_token:
381
            access.delete_token(auth_token.token)
382
383
    def _setup_async_query(self, liveaction_id, runnertype_db, query_context):
384
        query_module = getattr(runnertype_db, 'query_module', None)
385
        if not query_module:
386
            LOG.error('No query module specified for runner %s.', runnertype_db)
387
            return
388
        try:
389
            self._create_execution_state(liveaction_id, runnertype_db, query_context)
390
        except:
391
            LOG.exception('Unable to create action execution state db model ' +
392
                          'for liveaction_id %s', liveaction_id)
393
394
    def _create_execution_state(self, liveaction_id, runnertype_db, query_context):
395
        state_db = ActionExecutionStateDB(
396
            execution_id=liveaction_id,
397
            query_module=runnertype_db.query_module,
398
            query_context=query_context)
399
        try:
400
            return ActionExecutionState.add_or_update(state_db)
401
        except:
402
            LOG.exception('Unable to create execution state db for liveaction_id %s.'
403
                          % liveaction_id)
404
            return None
405
406
407
def get_runner_container():
408
    return RunnerContainer()
409