Passed
Pull Request — master (#3507)
by W
07:37 queued 01:55
created

ActionExecutionDispatcher._pause_action()   A

Complexity

Conditions 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 19
rs 9.4285
c 1
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import sys
17
import traceback
18
19
from kombu import Connection
20
21
from st2actions.container.base import RunnerContainer
22
from st2common import log as logging
23
from st2common.constants import action as action_constants
24
from st2common.exceptions.actionrunner import ActionRunnerException
25
from st2common.exceptions.db import StackStormDBObjectNotFoundError
26
from st2common.models.db.liveaction import LiveActionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.services import executions
29
from st2common.transport import liveaction
30
from st2common.transport.consumers import MessageHandler
31
from st2common.transport.consumers import ActionsQueueConsumer
32
from st2common.transport import utils as transport_utils
33
from st2common.util import action_db as action_utils
34
from st2common.util import system_info
35
36
37
LOG = logging.getLogger(__name__)

# One status-management queue per liveaction status this worker reacts to.
# The routing key of each queue is the liveaction status it is bound to.
ACTIONRUNNER_WORK_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.work',
    routing_key=action_constants.LIVEACTION_STATUS_SCHEDULED)

# NOTE(review): 'canel' looks like a typo of 'cancel'. The name is kept as is
# because it is a runtime queue identifier - confirm the impact on already
# declared broker queues before renaming it.
ACTIONRUNNER_CANCEL_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.canel',
    routing_key=action_constants.LIVEACTION_STATUS_CANCELING)

ACTIONRUNNER_PAUSE_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.pause',
    routing_key=action_constants.LIVEACTION_STATUS_PAUSING)

ACTIONRUNNER_RESUME_Q = liveaction.get_status_management_queue(
    'st2.actionrunner.resume',
    routing_key=action_constants.LIVEACTION_STATUS_RESUMING)

# Queues handed to the dispatcher created by get_worker().
ACTIONRUNNER_QUEUES = [
    ACTIONRUNNER_WORK_Q, ACTIONRUNNER_CANCEL_Q, ACTIONRUNNER_PAUSE_Q, ACTIONRUNNER_RESUME_Q
]

# Liveaction statuses for which a handler is invoked; messages carrying any
# other status are logged and dropped by the dispatcher.
ACTIONRUNNER_DISPATCHABLE_STATES = [
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_CANCELING,
    action_constants.LIVEACTION_STATUS_PAUSING,
    action_constants.LIVEACTION_STATUS_RESUMING
]
64
65
66
class ActionExecutionDispatcher(MessageHandler):
    """Message handler which hands LiveAction status messages to the runner container.

    Messages in "scheduled" status start the action; messages in "canceling",
    "pausing" and "resuming" status are forwarded to the runner container so it
    can act on an existing execution.
    """
    message_type = LiveActionDB

    def __init__(self, connection, queues):
        super(ActionExecutionDispatcher, self).__init__(connection, queues)
        self.container = RunnerContainer()
        # Ids of liveactions started by _run_action() which have not finished yet.
        # shutdown() drains this set and abandons whatever is still incomplete.
        self._running_liveactions = set()

    def get_queue_consumer(self, connection, queues):
        # We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools
        return ActionsQueueConsumer(connection=connection, queues=queues, handler=self)

    def process(self, liveaction):
        """Dispatches the LiveAction to appropriate action runner.

        LiveAction in statuses other than "scheduled", "canceling", "pausing" and
        "resuming" are ignored. If LiveAction is already canceled and result is
        empty, the LiveAction is updated with a generic exception message.

        :param liveaction: Action execution request.
        :type liveaction: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Whatever the handler for the LiveAction status returns, or ``None``
                 when the LiveAction is not dispatched.
        """
        # NOTE: The ``liveaction`` parameter shadows the module level ``liveaction``
        # import inside this method. The name is kept so the method signature stays
        # unchanged for callers.

        if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
            LOG.info('%s is not executing %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
            if not liveaction.result:
                updated_liveaction = action_utils.update_liveaction_status(
                    status=liveaction.status,
                    result={'message': 'Action execution canceled by user.'},
                    liveaction_id=liveaction.id)
                executions.update_execution(updated_liveaction)
            return

        if liveaction.status not in ACTIONRUNNER_DISPATCHABLE_STATES:
            LOG.info('%s is not dispatching %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
            return

        try:
            liveaction_db = action_utils.get_liveaction_by_id(liveaction.id)
        except StackStormDBObjectNotFoundError:
            LOG.exception('Failed to find liveaction %s in the database.', liveaction.id)
            raise

        # A status mismatch is only reported; the handler is still selected using the
        # status carried by the queued message.
        if liveaction.status != liveaction_db.status:
            LOG.warning(
                'The status of liveaction %s has changed from %s to %s '
                'while in the queue waiting for processing.',
                liveaction.id,
                liveaction.status,
                liveaction_db.status
            )

        dispatchers = {
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._run_action,
            action_constants.LIVEACTION_STATUS_CANCELING: self._cancel_action,
            action_constants.LIVEACTION_STATUS_PAUSING: self._pause_action,
            action_constants.LIVEACTION_STATUS_RESUMING: self._resume_action
        }

        return dispatchers[liveaction.status](liveaction)

    def shutdown(self):
        """Shuts the handler down and abandons executions which are still tracked as running."""
        super(ActionExecutionDispatcher, self).shutdown()
        # Abandon running executions if incomplete
        while self._running_liveactions:
            liveaction_id = self._running_liveactions.pop()
            try:
                executions.abandon_execution_if_incomplete(liveaction_id=liveaction_id)
            except:
                # Best effort: a failure for one liveaction is logged and the remaining
                # ones are still processed.
                LOG.exception('Failed to abandon liveaction %s.', liveaction_id)

    def _run_action(self, liveaction_db):
        """Marks the liveaction as running and dispatches it to the runner container.

        On any failure, including an empty dispatch result, the liveaction is updated
        to "failed" status with the error and traceback and the exception is re-raised.

        :param liveaction_db: LiveAction to run.
        :type liveaction_db: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Result produced by the runner container dispatch.
        """
        # stamp liveaction with process_info
        runner_info = system_info.get_process_info()

        # Update liveaction status to "running"
        liveaction_db = action_utils.update_liveaction_status(
            status=action_constants.LIVEACTION_STATUS_RUNNING,
            runner_info=runner_info,
            liveaction_id=liveaction_db.id)

        self._running_liveactions.add(liveaction_db.id)

        action_execution_db = executions.update_execution(liveaction_db)

        # Launch action
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Launching action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        extra = {'liveaction_db': liveaction_db}
        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
            if not result:
                raise ActionRunnerException('Failed to execute action.')
        except:
            _, ex, tb = sys.exc_info()
            error_message = str(ex)
            extra['error'] = error_message
            LOG.info('Action "%s" failed: %s', liveaction_db.action, error_message, extra=extra)

            liveaction_db = action_utils.update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_FAILED,
                liveaction_id=liveaction_db.id,
                result={'error': error_message,
                        'traceback': ''.join(traceback.format_tb(tb, 20))})
            executions.update_execution(liveaction_db)
            raise
        finally:
            # In the case of worker shutdown, the items are removed from _running_liveactions.
            # As the subprocesses for action executions are terminated, this finally block
            # will be executed. Set remove will result in KeyError if item no longer exists.
            # Use set discard to not raise the KeyError.
            self._running_liveactions.discard(liveaction_db.id)

        return result

    def _cancel_action(self, liveaction_db):
        """Dispatches a liveaction in "canceling" status to the runner container."""
        return self._dispatch_status_change(
            liveaction_db,
            audit_message='Canceling action execution.',
            failure_message='Failed to cancel action execution %s.')

    def _pause_action(self, liveaction_db):
        """Dispatches a liveaction in "pausing" status to the runner container."""
        return self._dispatch_status_change(
            liveaction_db,
            audit_message='Pausing action execution.',
            failure_message='Failed to pause action execution %s.')

    def _resume_action(self, liveaction_db):
        """Dispatches a liveaction in "resuming" status to the runner container."""
        return self._dispatch_status_change(
            liveaction_db,
            audit_message='Resuming action execution.',
            failure_message='Failed to resume action execution %s.')

    def _dispatch_status_change(self, liveaction_db, audit_message, failure_message):
        """Shared implementation behind the cancel, pause and resume handlers.

        Looks up the execution which belongs to the liveaction, writes the audit and
        info log entries and forwards the liveaction to the runner container.

        :param liveaction_db: LiveAction whose status change is dispatched.
        :type liveaction_db: ``st2common.models.db.liveaction.LiveActionDB``

        :param audit_message: Message written to the audit log before dispatching.
        :type audit_message: ``str``

        :param failure_message: Log message used when the dispatch raises. It must
                                contain a single ``%s`` placeholder for the liveaction id.
        :type failure_message: ``str``

        :return: Result produced by the runner container dispatch.
        """
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit(audit_message, extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
        except:
            # Record the error for the log entry and let the original exception propagate.
            extra['error'] = str(sys.exc_info()[1])
            LOG.info(failure_message, liveaction_db.id, extra=extra)
            raise

        return result
248
249
250
def get_worker():
    """Builds the dispatcher which consumes all of the action runner queues.

    :return: Dispatcher bound to ``ACTIONRUNNER_QUEUES``.
    :rtype: ``ActionExecutionDispatcher``
    """
    messaging_urls = transport_utils.get_messaging_urls()
    # NOTE(review): The ``with`` block is exited before the worker is returned, so the
    # connection's context manager cleanup has already run when the caller receives
    # the worker - confirm the connection is re-established on first use.
    with Connection(messaging_urls) as connection:
        worker = ActionExecutionDispatcher(connection, ACTIONRUNNER_QUEUES)
    return worker
253