Completed
Pull Request — master (#2299)
by Manas
06:08
created

st2actions.ActionExecutionDispatcher.shutdown()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 11
rs 9.4286
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import sys
17
import traceback
18
19
from kombu import Connection
20
21
from st2actions.container.base import RunnerContainer
22
from st2common import log as logging
23
from st2common.constants import action as action_constants
24
from st2common.exceptions.actionrunner import ActionRunnerException
25
from st2common.exceptions.db import StackStormDBObjectNotFoundError
26
from st2common.models.db.liveaction import LiveActionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.services import executions
29
from st2common.transport import consumers, liveaction
30
from st2common.transport import utils as transport_utils
31
from st2common.util import action_db as action_utils
32
from st2common.util import system_info
33
34
35
LOG = logging.getLogger(__name__)

# Queue consumed for liveactions routed with the "scheduled" status key.
ACTIONRUNNER_WORK_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.work', routing_key=action_constants.LIVEACTION_STATUS_SCHEDULED)

# Queue consumed for liveactions routed with the "canceling" status key.
# NOTE(review): 'canel' looks like a typo of 'cancel'. The literal is left as-is
# on purpose: presumably it names a queue that already exists on the broker, so
# renaming it would need a migration of the existing queue -- confirm before changing.
ACTIONRUNNER_CANCEL_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.canel', routing_key=action_constants.LIVEACTION_STATUS_CANCELING)
42
43
44
class ActionExecutionDispatcher(consumers.MessageHandler):
    """Message handler which runs or cancels the LiveActions it receives.

    The IDs of liveactions that are currently inside ``_run_action`` are kept in
    ``_running_liveactions`` so ``shutdown`` can mark them as abandoned.
    """

    message_type = LiveActionDB

    def __init__(self, connection, queues):
        """
        :param connection: Connection passed through to the parent handler.
        :param queues: Queues passed through to the parent handler.
        """
        super(ActionExecutionDispatcher, self).__init__(connection, queues)
        self.container = RunnerContainer()
        # IDs of the liveactions this instance is running right now.
        self._running_liveactions = set()

    def process(self, liveaction):
        """Dispatches the LiveAction to appropriate action runner.

        LiveAction in statuses other than "scheduled" and "canceling" are ignored.
        If the LiveAction is already canceled and its result is empty, the
        LiveAction is updated with a generic "canceled by user" message.

        :param liveaction: Action execution request.
        :type liveaction: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Whatever the runner container dispatch returned, or ``None`` when
                 the LiveAction is not dispatched.
        """
        # NOTE: the ``liveaction`` parameter shadows the module-level ``liveaction``
        # import (st2common.transport). The parameter name is kept so the method
        # signature stays unchanged; the module is not referenced in this method.
        if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
            LOG.info('%s is not executing %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
            if not liveaction.result:
                # Give an already canceled liveaction a result so it is not left empty.
                updated_liveaction = action_utils.update_liveaction_status(
                    status=liveaction.status,
                    result={'message': 'Action execution canceled by user.'},
                    liveaction_id=liveaction.id)
                executions.update_execution(updated_liveaction)
            return

        dispatchable_statuses = (action_constants.LIVEACTION_STATUS_SCHEDULED,
                                 action_constants.LIVEACTION_STATUS_CANCELING)
        if liveaction.status not in dispatchable_statuses:
            LOG.info('%s is not dispatching %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
            return

        try:
            liveaction_db = action_utils.get_liveaction_by_id(liveaction.id)
        except StackStormDBObjectNotFoundError:
            LOG.exception('Failed to find liveaction %s in the database.', liveaction.id)
            raise

        # The branch is chosen from the status carried by the message, not from
        # the object which was just loaded from the database.
        if liveaction.status == action_constants.LIVEACTION_STATUS_SCHEDULED:
            return self._run_action(liveaction_db)

        return self._cancel_action(liveaction_db)

    def shutdown(self):
        """Shuts down the parent handler and marks running executions as abandoned."""
        super(ActionExecutionDispatcher, self).shutdown()

        # Abandon running executions. Iterate over a snapshot: ``_run_action``
        # removes IDs from the set when a run finishes, and a set which changes
        # size while it is being iterated raises RuntimeError.
        for liveaction_id in list(self._running_liveactions):
            liveaction_db = action_utils.update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_ABANDONED,
                liveaction_id=liveaction_id,
                result={})
            execution_db = executions.update_execution(liveaction_db)
            LOG.info('Marked execution %s as %s.', execution_db.id,
                     action_constants.LIVEACTION_STATUS_ABANDONED)

    def _run_action(self, liveaction_db):
        """Marks the liveaction as running and dispatches it to the runner container.

        On any failure the liveaction is updated to the "failed" status with the
        error and traceback, and the original exception is re-raised.

        :param liveaction_db: LiveAction to run.
        :type liveaction_db: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Result returned by the runner container dispatch.
        :raises ActionRunnerException: If the dispatch returns a falsy result.
        """
        # stamp liveaction with process_info
        runner_info = system_info.get_process_info()

        # Update liveaction status to "running"
        liveaction_db = action_utils.update_liveaction_status(
            status=action_constants.LIVEACTION_STATUS_RUNNING,
            runner_info=runner_info,
            liveaction_id=liveaction_db.id)

        self._running_liveactions.add(liveaction_db.id)

        action_execution_db = executions.update_execution(liveaction_db)

        # Launch action
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Launching action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        extra = {'liveaction_db': liveaction_db}
        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
            if not result:
                raise ActionRunnerException('Failed to execute action.')
        except:  # noqa: E722 - deliberately broad; the failure is recorded, then re-raised.
            _, ex, tb = sys.exc_info()
            error = str(ex)
            extra['error'] = error
            LOG.info('Action "%s" failed: %s', liveaction_db.action, error, extra=extra)

            liveaction_db = action_utils.update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_FAILED,
                liveaction_id=liveaction_db.id,
                result={'error': error, 'traceback': ''.join(traceback.format_tb(tb, 20))})
            executions.update_execution(liveaction_db)
            raise
        finally:
            # discard() instead of remove(): a KeyError raised from this finally
            # block would replace the exception which is being propagated.
            self._running_liveactions.discard(liveaction_db.id)

        return result

    def _cancel_action(self, liveaction_db):
        """Dispatches a liveaction in "canceling" status to the runner container.

        :param liveaction_db: LiveAction to cancel.
        :type liveaction_db: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Result returned by the runner container dispatch.
        """
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Canceling action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
        except:  # noqa: E722 - deliberately broad; the failure is logged, then re-raised.
            ex = sys.exc_info()[1]
            extra['error'] = str(ex)
            LOG.info('Failed to cancel action execution %s.', liveaction_db.id, extra=extra)
            raise

        return result
167
168
169
def get_worker():
    """Creates the dispatcher which consumes the work and the cancel queues.

    :rtype: ``ActionExecutionDispatcher``
    """
    worker_queues = [ACTIONRUNNER_WORK_Q, ACTIONRUNNER_CANCEL_Q]
    messaging_urls = transport_utils.get_messaging_urls()

    # NOTE(review): the connection context is exited before the caller gets the
    # worker back -- presumably the handler re-establishes it on use; confirm.
    with Connection(messaging_urls) as conn:
        worker = ActionExecutionDispatcher(conn, worker_queues)

    return worker
172