Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2actions/st2actions/worker.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import sys
18
import traceback
19
20
from kombu import Connection
21
22
from st2actions.container.base import RunnerContainer
23
from st2common import log as logging
24
from st2common.constants import action as action_constants
25
from st2common.exceptions.actionrunner import ActionRunnerException
26
from st2common.exceptions.db import StackStormDBObjectNotFoundError
27
from st2common.models.db.liveaction import LiveActionDB
28
from st2common.persistence.execution import ActionExecution
29
from st2common.services import executions
30
from st2common.services import workflows as wf_svc
31
from st2common.transport.consumers import MessageHandler
32
from st2common.transport.consumers import ActionsQueueConsumer
33
from st2common.transport import utils as transport_utils
34
from st2common.util import action_db as action_utils
35
from st2common.util import system_info
36
from st2common.transport import queues
37
38
39
__all__ = [
    'ActionExecutionDispatcher',
    'get_worker',
]


LOG = logging.getLogger(__name__)

# Queues handed to the dispatcher built by get_worker(): the work queue plus the
# cancel, pause and resume queues.
ACTIONRUNNER_QUEUES = [
    queues.ACTIONRUNNER_WORK_QUEUE,
    queues.ACTIONRUNNER_CANCEL_QUEUE,
    queues.ACTIONRUNNER_PAUSE_QUEUE,
    queues.ACTIONRUNNER_RESUME_QUEUE,
]

# LiveAction statuses that ActionExecutionDispatcher.process() routes to a handler
# (run / cancel / pause / resume); LiveActions in any other status are ignored.
ACTIONRUNNER_DISPATCHABLE_STATES = [
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_CANCELING,
    action_constants.LIVEACTION_STATUS_PAUSING,
    action_constants.LIVEACTION_STATUS_RESUMING,
]
60
61
62
class ActionExecutionDispatcher(MessageHandler):
    """Consume LiveAction messages and route them to the runner container.

    Depending on the status carried by the incoming LiveAction, the request is
    handed to the runner container to run, cancel, pause or resume the action.
    """

    message_type = LiveActionDB

    def __init__(self, connection, queues):
        # NOTE: the ``queues`` parameter shadows the module-level
        # ``st2common.transport.queues`` import (flagged by static analysis). The
        # module is not referenced inside this method, so the shadowing is harmless;
        # the parameter name is kept so the constructor signature stays unchanged.
        super(ActionExecutionDispatcher, self).__init__(connection, queues)
        self.container = RunnerContainer()
        # IDs of liveactions currently inside _run_action(); shutdown() abandons
        # whatever is still listed here.
        self._running_liveactions = set()

    def get_queue_consumer(self, connection, queues):
        """Return the queue consumer that feeds messages to this handler.

        :param connection: Messaging connection to consume from.
        :param queues: Queues to consume from (shadows the module import; harmless here).
        :rtype: ``ActionsQueueConsumer``
        """
        # We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools
        return ActionsQueueConsumer(connection=connection, queues=queues, handler=self)

    def process(self, liveaction):
        """Dispatch the LiveAction to the appropriate handler.

        LiveActions in the "scheduled", "canceling", "pausing" and "resuming" statuses
        are routed to the run, cancel, pause and resume handlers respectively; every
        other status is ignored. If the LiveAction is already canceled and its result
        is empty, it is updated with a generic cancellation message instead.

        :param liveaction: Action execution request.
        :type liveaction: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Whatever the selected handler returns, or ``None`` when the
                 LiveAction is ignored.

        :raises StackStormDBObjectNotFoundError: If the LiveAction can no longer be
                                                 found in the database.
        """

        if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
            LOG.info('%s is not executing %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
            if not liveaction.result:
                updated_liveaction = action_utils.update_liveaction_status(
                    status=liveaction.status,
                    result={'message': 'Action execution canceled by user.'},
                    liveaction_id=liveaction.id)
                executions.update_execution(updated_liveaction)
            return

        if liveaction.status not in ACTIONRUNNER_DISPATCHABLE_STATES:
            LOG.info('%s is not dispatching %s (id=%s) with "%s" status.',
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
            return

        try:
            liveaction_db = action_utils.get_liveaction_by_id(liveaction.id)
        except StackStormDBObjectNotFoundError:
            LOG.exception('Failed to find liveaction %s in the database.', liveaction.id)
            raise

        if liveaction.status != liveaction_db.status:
            LOG.warning(
                'The status of liveaction %s has changed from %s to %s '
                'while in the queue waiting for processing.',
                liveaction.id,
                liveaction.status,
                liveaction_db.status
            )

        dispatchers = {
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._run_action,
            action_constants.LIVEACTION_STATUS_CANCELING: self._cancel_action,
            action_constants.LIVEACTION_STATUS_PAUSING: self._pause_action,
            action_constants.LIVEACTION_STATUS_RESUMING: self._resume_action
        }

        # Routing uses the status from the queued message (a status change in the
        # database is only logged above), and the handler receives that message.
        return dispatchers[liveaction.status](liveaction)

    def shutdown(self):
        """Stop the handler and abandon liveactions that are still being run.

        Abandoning is best-effort: a failure for one liveaction is logged and the
        remaining ones are still processed.
        """
        super(ActionExecutionDispatcher, self).shutdown()
        # Abandon running executions if incomplete
        while self._running_liveactions:
            liveaction_id = self._running_liveactions.pop()
            try:
                executions.abandon_execution_if_incomplete(liveaction_id=liveaction_id)
            except Exception:
                # Narrowed from a bare except so SystemExit / KeyboardInterrupt are
                # not silently swallowed during shutdown.
                LOG.exception('Failed to abandon liveaction %s.', liveaction_id)

    def _run_action(self, liveaction_db):
        """Mark the liveaction as running and execute it through the runner container.

        On any failure the liveaction is marked as failed, with the error message and
        up to 20 traceback entries as its result, and the exception is re-raised.

        :param liveaction_db: LiveAction to run.
        :return: The result produced by the runner container dispatch.
        :raises ActionRunnerException: If the dispatch produced no result for an action
                                       that is not a workflow.
        """
        # stamp liveaction with process_info
        runner_info = system_info.get_process_info()

        # Update liveaction status to "running"
        liveaction_db = action_utils.update_liveaction_status(
            status=action_constants.LIVEACTION_STATUS_RUNNING,
            runner_info=runner_info,
            liveaction_id=liveaction_db.id)

        self._running_liveactions.add(liveaction_db.id)

        action_execution_db = executions.update_execution(liveaction_db)

        # Launch action
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Launching action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        extra = {'liveaction_db': liveaction_db}
        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
            if not result and not liveaction_db.action_is_workflow:
                raise ActionRunnerException('Failed to execute action.')
        except BaseException:
            # Deliberately broad: the exception is always re-raised below, this only
            # guarantees the liveaction is recorded as failed first.
            _, ex, tb = sys.exc_info()
            extra['error'] = str(ex)
            LOG.info('Action "%s" failed: %s' % (liveaction_db.action, str(ex)), extra=extra)

            liveaction_db = action_utils.update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_FAILED,
                liveaction_id=liveaction_db.id,
                result={'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))})
            executions.update_execution(liveaction_db)
            raise
        finally:
            # In the case of worker shutdown, the items are removed from _running_liveactions.
            # As the subprocesses for action executions are terminated, this finally block
            # will be executed. Set remove will result in KeyError if item no longer exists.
            # Use set discard to not raise the KeyError.
            self._running_liveactions.discard(liveaction_db.id)

        return result

    def _dispatch_request(self, liveaction_db, audit_message, failure_message):
        """Hand a cancel / pause / resume request for a liveaction to the runner container.

        :param liveaction_db: LiveAction the request applies to.
        :param audit_message: Message written to the audit log before dispatching.
        :param failure_message: %-format string with a single placeholder for the
                                liveaction id; logged if the dispatch raises.
        :return: Tuple ``(result, action_execution_db)`` with the dispatch result and
                 the ActionExecution looked up for the liveaction before dispatching.
        """
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit(audit_message, extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
        except BaseException as ex:
            # Log only; the exception is always propagated to the caller.
            extra['error'] = str(ex)
            LOG.info(failure_message % (liveaction_db.id), extra=extra)
            raise

        return result, action_execution_db

    def _cancel_action(self, liveaction_db):
        """Ask the runner container to cancel the liveaction.

        :return: The result produced by the runner container dispatch.
        """
        result, _ = self._dispatch_request(
            liveaction_db,
            'Canceling action execution.',
            'Failed to cancel action execution %s.')
        return result

    def _pause_action(self, liveaction_db):
        """Ask the runner container to pause the liveaction.

        :return: The result produced by the runner container dispatch.
        """
        result, _ = self._dispatch_request(
            liveaction_db,
            'Pausing action execution.',
            'Failed to pause action execution %s.')
        return result

    def _resume_action(self, liveaction_db):
        """Ask the runner container to resume the liveaction, cascading to an orquesta parent.

        :return: The result produced by the runner container dispatch.
        """
        result, action_execution_db = self._dispatch_request(
            liveaction_db,
            'Resuming action execution.',
            'Failed to resume action execution %s.')

        # Cascade the resume upstream if action execution is child of an orquesta workflow.
        # The action service request_resume function is not used here because we do not want
        # other peer subworkflows to be resumed.
        if 'orquesta' in action_execution_db.context and 'parent' in action_execution_db.context:
            wf_svc.handle_action_execution_resume(action_execution_db)

        return result
250
251
252
def get_worker():
    """Build the action runner worker.

    The dispatcher is constructed inside a messaging ``Connection`` context that is
    exited before the dispatcher is returned.
    NOTE(review): presumably the queue consumer re-establishes its own connection
    when it starts consuming — confirm against ``ActionsQueueConsumer``.

    :return: A dispatcher consuming from ``ACTIONRUNNER_QUEUES``.
    :rtype: ``ActionExecutionDispatcher``
    """
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as connection:
        dispatcher = ActionExecutionDispatcher(connection, ACTIONRUNNER_QUEUES)
    return dispatcher
255