Test Failed
Pull Request — master (#4068)
by W
03:56
created

RunnerContainer   B

Complexity

Total Complexity 40

Size/Duplication

Total Lines 330
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 330
rs 8.2608
wmc 40

14 Methods

Rating   Name   Duplication   Size   Complexity  
A _create_auth_token() 0 18 3
A _delete_auth_token() 0 3 2
B _do_cancel() 0 25 2
A _do_pause() 0 20 2
B _do_run() 0 68 6
A _get_rerun_reference() 0 3 2
A _get_action_libs_abs_path() 0 2 1
A _update_live_action_db() 0 19 2
B _get_runner() 0 37 3
B _update_status() 0 26 3
B _clean_up_auth_token() 0 23 5
B dispatch() 0 36 3
B _do_resume() 0 33 5
A _get_entry_point_abs_path() 0 2 1

How to fix   Complexity   

Complex Class

Complex classes like RunnerContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import sys
19
import traceback
20
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util import date as date_utils
25
from st2common.constants import action as action_constants
26
from st2common.content import utils as content_utils
27
from st2common.exceptions import actionrunner
28
from st2common.exceptions.param import ParamException
29
from st2common.models.system.action import ResolvedActionParameters
30
from st2common.persistence.execution import ActionExecution
31
from st2common.services import access, executions, queries
32
from st2common.util.action_db import (get_action_by_ref, get_runnertype_by_name)
33
from st2common.util.action_db import (update_liveaction_status, get_liveaction_by_id)
34
from st2common.util import param as param_utils
35
from st2common.util.config_loader import ContentPackConfigLoader
36
from st2common.util import jsonify
37
38
from st2common.runners.base import get_runner
39
from st2common.runners.base import AsyncActionRunner, PollingAsyncActionRunner
40
41
LOG = logging.getLogger(__name__)
42
43
__all__ = [
44
    'RunnerContainer',
45
    'get_runner_container'
46
]
47
48
49
class RunnerContainer(object):
50
51
    def dispatch(self, liveaction_db):
        """
        Route a live action to the runner operation which matches its current status.

        :param liveaction_db: Live action to process. Its ``action`` reference is used to look
                              up the action and its ``status`` selects the handler.

        :return: ``result`` attribute of the live action returned by the selected handler.

        :raises Exception: If the referenced action is not found in the DB.
        :raises ActionRunnerDispatchError: If there is no handler for the live action status.
        """
        action_db = get_action_by_ref(liveaction_db.action)
        if not action_db:
            raise Exception('Action %s not found in DB.' % (liveaction_db.action))

        # Record the pack of the action on the live action context.
        liveaction_db.context['pack'] = action_db.pack

        runner_type_db = get_runnertype_by_name(action_db.runner_type['name'])

        LOG.info(
            'Dispatching Action to a runner',
            extra={'liveaction_db': liveaction_db, 'runner_type_db': runner_type_db}
        )

        # The runner is instantiated before the status is validated.
        runner = self._get_runner(runner_type_db, action_db, liveaction_db)

        LOG.debug('Runner instance for RunnerType "%s" is: %s', runner_type_db.name, runner)

        # Statuses which all result in the action being run.
        run_statuses = (
            action_constants.LIVEACTION_STATUS_REQUESTED,
            action_constants.LIVEACTION_STATUS_SCHEDULED,
            action_constants.LIVEACTION_STATUS_RUNNING,
        )

        handlers = dict.fromkeys(run_statuses, self._do_run)
        handlers[action_constants.LIVEACTION_STATUS_CANCELING] = self._do_cancel
        handlers[action_constants.LIVEACTION_STATUS_PAUSING] = self._do_pause
        handlers[action_constants.LIVEACTION_STATUS_RESUMING] = self._do_resume

        handler = handlers.get(liveaction_db.status)

        if handler is None:
            raise actionrunner.ActionRunnerDispatchError(
                'Action runner is unable to dispatch the liveaction because it is '
                'in an unsupported status of "%s".' % liveaction_db.status
            )

        # From here on work with the live action object returned by the handler.
        liveaction_db = handler(runner)

        return liveaction_db.result
87
88
    def _do_run(self, runner):
        """
        Run the action using the provided runner and persist the outcome.

        Any error raised while rendering parameters, during ``pre_run`` or during ``run`` is
        logged and results in the live action being marked as failed, with the error message
        and traceback stored as the result.

        :param runner: Runner instance with ``context``, ``action``, ``liveaction`` and
                       ``runner_type`` already assigned.

        :return: Updated live action as returned by ``_update_status``.
        """
        # Create a temporary auth token which will be available
        # for the duration of the action execution.
        runner.auth_token = self._create_auth_token(
            context=runner.context,
            action_db=runner.action,
            liveaction_db=runner.liveaction)

        try:
            # Finalized parameters are resolved and then rendered. This process could
            # fail. Handle the exception and report the error correctly.
            try:
                runner_params, action_params = param_utils.render_final_params(
                    runner.runner_type.runner_parameters,
                    runner.action.parameters,
                    runner.liveaction.parameters,
                    runner.liveaction.context)

                runner.runner_parameters = runner_params
            except ParamException as e:
                raise actionrunner.ActionRunnerException(str(e))

            LOG.debug('Performing pre-run for runner: %s', runner.runner_id)
            runner.pre_run()

            # Mask secret parameters in the log context
            resolved_action_params = ResolvedActionParameters(
                action_db=runner.action,
                runner_type_db=runner.runner_type,
                runner_parameters=runner_params,
                action_parameters=action_params)

            extra = {'runner': runner, 'parameters': resolved_action_params}
            LOG.debug('Performing run for runner: %s', runner.runner_id, extra=extra)
            (status, result, context) = runner.run(action_params)
            result = jsonify.try_loads(result)
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

            if (isinstance(runner, PollingAsyncActionRunner) and
                    runner.is_polling_enabled() and not action_completed):
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
        except:
            # Bare except is deliberate: the finally block below reads status, result and
            # context so they need to be bound regardless of what was raised.
            LOG.exception('Failed to run action.')
            _, ex, tb = sys.exc_info()
            # mark execution as failed.
            status = action_constants.LIVEACTION_STATUS_FAILED
            # include the error message and traceback to try and provide some hints.
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            context = None
        finally:
            # Log action completion
            extra = {'result': result, 'status': status}
            LOG.debug('Action "%s" completed.', runner.action.name, extra=extra)

            try:
                # Update the final status of liveaction and corresponding action execution.
                runner.liveaction = self._update_status(runner.liveaction.id, status, result,
                                                        context)
            finally:
                # Always clean-up the auth_token, even if the status update above raised.
                # This method should be called in the finally block to ensure post_run is not
                # impacted.
                self._clean_up_auth_token(runner=runner, status=status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)

        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})

        return runner.liveaction
156
157
    def _do_cancel(self, runner):
        """
        Cancel the execution via the runner and invoke ``post_run``.

        The live action status is only updated when ``runner.cancel()`` succeeds. When it
        fails, the error is logged and the live action is left as is.

        :param runner: Runner instance to cancel.

        :return: ``runner.liveaction`` after the cancel attempt.
        """
        try:
            log_extra = {'runner': runner}
            LOG.debug('Performing cancel for runner: %s', (runner.runner_id), extra=log_extra)
            status, result, context = runner.cancel()

            # The status is persisted inside the try block on purpose: we want to keep the
            # workflow running as is if the cancel operation failed.
            runner.liveaction = self._update_status(runner.liveaction.id, status, result, context)
        except:
            _, error, trace = sys.exc_info()
            # Collect the error message and traceback to try and provide some hints.
            error_message = str(error)
            formatted_trace = ''.join(traceback.format_tb(trace, 20))
            failure = {'error': error_message, 'traceback': formatted_trace}
            LOG.exception('Failed to cancel action %s.' % (runner.liveaction.id), extra=failure)
        finally:
            # The auth token is always cleaned up here so post_run below is not impacted.
            self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        canceled_result = {'error': 'Execution canceled by user.'}
        runner.post_run(status=runner.liveaction.status, result=canceled_result)

        return runner.liveaction
182
183
    def _do_pause(self, runner):
        """
        Pause the execution via the runner and persist the resulting status.

        If ``runner.pause()`` raises, the live action is marked as failed and the error message
        and traceback are stored as the result.

        :param runner: Runner instance to pause.

        :return: Updated live action as returned by ``_update_status``.
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing pause for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.pause()
        except:
            # Bare except is deliberate: the finally block below reads status, result and
            # context so they need to be bound regardless of what was raised.
            _, ex, tb = sys.exc_info()
            # include the error message and traceback to try and provide some hints.
            status = action_constants.LIVEACTION_STATUS_FAILED
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            context = runner.liveaction.context
            LOG.exception('Failed to pause action %s.' % (runner.liveaction.id), extra=result)
        finally:
            try:
                # Update the final status of liveaction and corresponding action execution.
                runner.liveaction = self._update_status(runner.liveaction.id, status, result,
                                                        context)
            finally:
                # Always clean-up the auth_token, even if the status update above raised.
                self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)

        return runner.liveaction
203
204
    def _do_resume(self, runner):
        """
        Resume the execution via the runner, persist the outcome and invoke ``post_run``.

        If ``runner.resume()`` or the follow-up processing raises, the live action is marked
        as failed and the error message and traceback are stored as the result.

        :param runner: Runner instance to resume.

        :return: Updated live action as returned by ``_update_status``.
        """
        try:
            extra = {'runner': runner}
            LOG.debug('Performing resume for runner: %s', (runner.runner_id), extra=extra)
            (status, result, context) = runner.resume()
            result = jsonify.try_loads(result)
            action_completed = status in action_constants.LIVEACTION_COMPLETED_STATES

            if (isinstance(runner, PollingAsyncActionRunner) and
                    runner.is_polling_enabled() and not action_completed):
                queries.setup_query(runner.liveaction.id, runner.runner_type, context)
        except:
            # Bare except is deliberate: the finally block below reads status, result and
            # context so they need to be bound regardless of what was raised.
            _, ex, tb = sys.exc_info()
            # include the error message and traceback to try and provide some hints.
            status = action_constants.LIVEACTION_STATUS_FAILED
            result = {'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))}
            context = runner.liveaction.context
            LOG.exception('Failed to resume action %s.' % (runner.liveaction.id), extra=result)
        finally:
            try:
                # Update the final status of liveaction and corresponding action execution.
                runner.liveaction = self._update_status(runner.liveaction.id, status, result,
                                                        context)
            finally:
                # Always clean-up the auth_token, even if the status update above raised.
                # This method should be called in the finally block to ensure post_run is not
                # impacted.
                self._clean_up_auth_token(runner=runner, status=runner.liveaction.status)

        LOG.debug('Performing post_run for runner: %s', runner.runner_id)
        runner.post_run(status=status, result=result)

        LOG.debug('Runner do_run result', extra={'result': runner.liveaction.result})
        LOG.audit('Liveaction completed', extra={'liveaction_db': runner.liveaction})

        return runner.liveaction
237
238
    def _clean_up_auth_token(self, runner, status):
        """
        Delete the temporary auth token of the current action when it is no longer needed.

        Note: This method should never throw since it's called inside finally block which assumes
        it doesn't throw.

        :param runner: Runner whose ``auth_token`` should be cleaned up.
        :param status: Action status used to decide if the action has completed.

        :return: ``True`` if a deletion was attempted, ``False`` if the token was kept.
        """
        runner_is_async = isinstance(runner, AsyncActionRunner)
        completed = status in action_constants.LIVEACTION_COMPLETED_STATES

        # Async actions such as Mistral workflows use the auth token to launch other actions
        # in the workflow. Deleting it before the action completes would make those actions
        # fail with unauthorized exception, so in that case the token is left to expire.
        if runner_is_async and not completed:
            return False

        try:
            self._delete_auth_token(runner.auth_token)
        except:
            # Failures are only logged so that this method never throws.
            LOG.exception('Unable to clean-up auth_token.')

        return True
261
262
    def _update_live_action_db(self, liveaction_id, status, result, context):
        """
        Load the LiveActionDB object with the provided id and update it.

        :return: Tuple of the updated live action and a boolean which is ``True`` when the
                 provided status differs from the status stored before the update.
        """
        stored_db = get_liveaction_by_id(liveaction_id)

        state_changed = (stored_db.status != status)

        # End timestamp is only set once the action is in one of the completed states.
        is_completed = status in action_constants.LIVEACTION_COMPLETED_STATES
        end_timestamp = date_utils.get_datetime_utc_now() if is_completed else None

        updated_db = update_liveaction_status(
            status=status,
            result=result,
            context=context,
            end_timestamp=end_timestamp,
            liveaction_db=stored_db)

        return (updated_db, state_changed)
281
282
    def _update_status(self, liveaction_id, status, result, context):
        """
        Update the live action and then the corresponding action execution.

        The execution update is published only when the live action status has changed.

        :param liveaction_id: Id of the live action to update.
        :param status: New status.
        :param result: New result.
        :param context: New context.

        :return: Updated live action.

        :raises Exception: Any error raised by either update is logged and re-raised.
        """
        try:
            LOG.debug('Setting status: %s for liveaction: %s', status, liveaction_id)
            liveaction_db, state_changed = self._update_live_action_db(
                liveaction_id, status, result, context)
        except Exception:
            LOG.exception(
                'Cannot update liveaction '
                '(id: %s, status: %s, result: %s).',
                liveaction_id, status, result
            )
            # Bare raise re-raises the active exception with its original traceback.
            raise

        try:
            executions.update_execution(liveaction_db, publish=state_changed)
            extra = {'liveaction_db': liveaction_db}
            LOG.debug('Updated liveaction after run', extra=extra)
        except Exception:
            LOG.exception(
                'Cannot update action execution for liveaction '
                '(id: %s, status: %s, result: %s).',
                liveaction_id, status, result
            )
            # Bare raise re-raises the active exception with its original traceback.
            raise

        return liveaction_db
308
309
    def _get_entry_point_abs_path(self, pack, entry_point):
        """
        Return the result of ``content_utils.get_entry_point_abs_path`` for the provided pack
        and entry point.
        """
        abs_path = content_utils.get_entry_point_abs_path(pack=pack, entry_point=entry_point)
        return abs_path
311
312
    def _get_action_libs_abs_path(self, pack, entry_point):
        """
        Return the result of ``content_utils.get_action_libs_abs_path`` for the provided pack
        and entry point.
        """
        libs_path = content_utils.get_action_libs_abs_path(pack=pack, entry_point=entry_point)
        return libs_path
314
315
    def _get_rerun_reference(self, context):
316
        execution_id = context.get('re-run', {}).get('ref')
317
        return ActionExecution.get_by_id(execution_id) if execution_id else None
318
319
    def _get_runner(self, runner_type_db, action_db, liveaction_db):
        """
        Instantiate a runner for the provided runner type and populate its attributes.

        :param runner_type_db: Runner type whose package and module identify the runner.
        :param action_db: Action which will be executed by the runner.
        :param liveaction_db: Live action for which the runner is created.

        :return: Runner instance with all the execution related attributes assigned.
        """
        pack = action_db.pack
        entry_point = action_db.entry_point

        resolved_entry_point = self._get_entry_point_abs_path(pack, entry_point)
        context = getattr(liveaction_db, 'context', {})
        user = context.get('user', cfg.CONF.system_user.user)

        # Note: Right now configs are only supported by the Python runner actions
        if runner_type_db.runner_module == 'python_runner':
            LOG.debug('Loading config from pack for python runner.')
            config = ContentPackConfigLoader(pack_name=action_db.pack, user=user).get_config()
        else:
            config = None

        runner = get_runner(
            package_name=runner_type_db.runner_package,
            module_name=runner_type_db.runner_module,
            config=config
        )

        # TODO: Pass those arguments to the constructor instead of late
        # assignment, late assignment is awful
        runner.runner_type = runner_type_db
        runner.action = action_db
        runner.action_name = action_db.name
        runner.liveaction = liveaction_db
        runner.liveaction_id = str(liveaction_db.id)

        # The execution is looked up using the live action id assigned above.
        runner.execution = ActionExecution.get(liveaction__id=runner.liveaction_id)
        runner.execution_id = str(runner.execution.id)
        runner.entry_point = resolved_entry_point
        runner.context = context
        runner.callback = getattr(liveaction_db, 'callback', {})
        runner.libs_dir_path = self._get_action_libs_abs_path(action_db.pack,
                                                              action_db.entry_point)

        # For re-run, get the ActionExecutionDB in which the re-run is based on.
        rerun_ref_id = runner.context.get('re-run', {}).get('ref')

        if rerun_ref_id:
            runner.rerun_ex_ref = ActionExecution.get(id=rerun_ref_id)
        else:
            runner.rerun_ex_ref = None

        return runner
356
357
    def _create_auth_token(self, context, action_db, liveaction_db):
358
        if not context:
359
            return None
360
361
        user = context.get('user', None)
362
        if not user:
363
            return None
364
365
        metadata = {
366
            'service': 'actions_container',
367
            'action_name': action_db.name,
368
            'live_action_id': str(liveaction_db.id)
369
370
        }
371
372
        ttl = cfg.CONF.auth.service_token_ttl
373
        token_db = access.create_token(username=user, ttl=ttl, metadata=metadata, service=True)
374
        return token_db
375
376
    def _delete_auth_token(self, auth_token):
377
        if auth_token:
378
            access.delete_token(auth_token.token)
379
380
381
def get_runner_container():
    """
    Return a new ``RunnerContainer`` instance.
    """
    container = RunnerContainer()
    return container
383