GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Issues (503)

st2actions/st2actions/worker.py (2 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import sys
17
import traceback
18
19
from kombu import Connection
20
21
from st2actions.container.base import RunnerContainer
22
from st2common import log as logging
23
from st2common.constants import action as action_constants
24
from st2common.exceptions.actionrunner import ActionRunnerException
25
from st2common.exceptions.db import StackStormDBObjectNotFoundError
26
from st2common.models.db.liveaction import LiveActionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.services import executions
29
from st2common.transport.consumers import MessageHandler
30
from st2common.transport.consumers import ActionsQueueConsumer
31
from st2common.transport import utils as transport_utils
32
from st2common.util import action_db as action_utils
33
from st2common.util import system_info
34
from st2common.transport import queues
35
36
37
# Public names of this module.
__all__ = ['ActionExecutionDispatcher', 'get_worker']

LOG = logging.getLogger(__name__)

# Queues handed to ActionExecutionDispatcher by get_worker().
ACTIONRUNNER_QUEUES = [
    queues.ACTIONRUNNER_WORK_QUEUE,
    queues.ACTIONRUNNER_CANCEL_QUEUE,
    queues.ACTIONRUNNER_PAUSE_QUEUE,
    queues.ACTIONRUNNER_RESUME_QUEUE,
]

# Liveaction statuses that ActionExecutionDispatcher.process() dispatches;
# a liveaction in any other status is logged and skipped.
ACTIONRUNNER_DISPATCHABLE_STATES = [
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_CANCELING,
    action_constants.LIVEACTION_STATUS_PAUSING,
    action_constants.LIVEACTION_STATUS_RESUMING,
]
58
59
60
class ActionExecutionDispatcher(MessageHandler):
61
    message_type = LiveActionDB
62
63
    def __init__(self, connection, queues):
        """Initialise the dispatcher on top of the base message handler.

        NOTE: the ``queues`` parameter re-defines a name that already exists in
        the outer scope (the module-level ``queues`` import), which static
        analysis flags as shadowing. The parameter name is kept so that the
        signature stays unchanged.

        :param connection: Messaging connection, forwarded to the parent class.
        :param queues: Queues to consume from, forwarded to the parent class.
        """
        super(ActionExecutionDispatcher, self).__init__(connection, queues)

        # IDs of the liveactions this worker is currently running; shutdown()
        # drains this set to abandon whatever has not completed.
        self._running_liveactions = set()

        # Container every _*_action method dispatches liveactions through.
        self.container = RunnerContainer()
67
68
    def get_queue_consumer(self, connection, queues):
        """Build the queue consumer that delivers messages to this handler.

        A special ``ActionsQueueConsumer`` is used because it works with
        2 dispatcher pools.

        NOTE: the ``queues`` parameter re-defines a name that already exists in
        the outer scope (the module-level ``queues`` import), which static
        analysis flags as shadowing. The parameter name is kept so that the
        signature stays unchanged.

        :param connection: Messaging connection the consumer should use.
        :param queues: Queues the consumer should read from.
        """
        consumer = ActionsQueueConsumer(connection=connection, queues=queues, handler=self)
        return consumer
71
72
    def process(self, liveaction):
        """Route a LiveAction message to the matching runner operation.

        A LiveAction that is already "canceled" is never dispatched; when its
        result is empty, the LiveAction is updated with a generic "canceled by
        user" message. A LiveAction whose status is not listed in
        ``ACTIONRUNNER_DISPATCHABLE_STATES`` (scheduled, canceling, pausing,
        resuming) is logged and ignored. Anything else is handed to the run /
        cancel / pause / resume method selected by the status carried in the
        message.

        :param liveaction: Action execution request.
        :type liveaction: ``st2common.models.db.liveaction.LiveActionDB``

        :return: Whatever the selected method returns, or ``None`` when the
                 LiveAction is not dispatched.
        :raises StackStormDBObjectNotFoundError: If the LiveAction cannot be
                 found in the database (logged, then re-raised).
        """
        status = liveaction.status
        handler_name = self.__class__.__name__

        # Already canceled: nothing to run, just make sure a result is recorded.
        if status == action_constants.LIVEACTION_STATUS_CANCELED:
            LOG.info('%s is not executing %s (id=%s) with "%s" status.',
                     handler_name, type(liveaction), liveaction.id, status)
            if liveaction.result:
                return
            canceled_liveaction = action_utils.update_liveaction_status(
                status=status,
                result={'message': 'Action execution canceled by user.'},
                liveaction_id=liveaction.id)
            executions.update_execution(canceled_liveaction)
            return

        if status not in ACTIONRUNNER_DISPATCHABLE_STATES:
            LOG.info('%s is not dispatching %s (id=%s) with "%s" status.',
                     handler_name, type(liveaction), liveaction.id, status)
            return

        try:
            stored_liveaction = action_utils.get_liveaction_by_id(liveaction.id)
        except StackStormDBObjectNotFoundError:
            LOG.exception('Failed to find liveaction %s in the database.', liveaction.id)
            raise

        # Only a warning: dispatching below still follows the status carried by
        # the queued message, not the one currently stored in the database.
        if status != stored_liveaction.status:
            LOG.warning(
                'The status of liveaction %s has changed from %s to %s '
                'while in the queue waiting for processing.',
                liveaction.id,
                status,
                stored_liveaction.status
            )

        handlers = {
            action_constants.LIVEACTION_STATUS_SCHEDULED: self._run_action,
            action_constants.LIVEACTION_STATUS_CANCELING: self._cancel_action,
            action_constants.LIVEACTION_STATUS_PAUSING: self._pause_action,
            action_constants.LIVEACTION_STATUS_RESUMING: self._resume_action
        }
        handler = handlers[status]

        return handler(liveaction)
124
125
    def shutdown(self):
        """Shut the handler down and abandon executions still tracked as running."""
        super(ActionExecutionDispatcher, self).shutdown()

        # Abandon running executions if incomplete. The set is drained one ID
        # at a time, so it is empty once this method returns.
        tracked_ids = self._running_liveactions
        while tracked_ids:
            running_id = tracked_ids.pop()
            try:
                executions.abandon_execution_if_incomplete(liveaction_id=running_id)
            except:
                # Bare except kept on purpose: any failure is logged and the
                # loop carries on with the next ID.
                LOG.exception('Failed to abandon liveaction %s.', running_id)
134
135
    def _run_action(self, liveaction_db):
        """Mark the liveaction as running, dispatch it and return the result.

        The liveaction ID is tracked in ``self._running_liveactions`` while the
        dispatch is in flight. If the dispatch raises, or produces a falsy
        result, the liveaction is updated to the "failed" status with the error
        message and a traceback, and the exception is re-raised.

        :param liveaction_db: Liveaction to run.
        :return: The (truthy) result produced by ``self.container.dispatch``.
        :raises ActionRunnerException: If the dispatch returns a falsy result.
        """
        # Stamp the liveaction with process info and update its status to "running".
        process_info = system_info.get_process_info()
        liveaction_db = action_utils.update_liveaction_status(
            status=action_constants.LIVEACTION_STATUS_RUNNING,
            runner_info=process_info,
            liveaction_id=liveaction_db.id)

        self._running_liveactions.add(liveaction_db.id)

        action_execution_db = executions.update_execution(liveaction_db)

        # Launch action
        audit_extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Launching action execution.', extra=audit_extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        failure_extra = {'liveaction_db': liveaction_db}
        try:
            result = self.container.dispatch(liveaction_db)
            LOG.debug('Runner dispatch produced result: %s', result)
            if not result:
                raise ActionRunnerException('Failed to execute action.')
        except:
            # Bare except kept on purpose: whatever interrupts the dispatch,
            # the liveaction is recorded as failed before re-raising.
            exc_value, exc_tb = sys.exc_info()[1:]
            error_text = str(exc_value)
            failure_extra['error'] = error_text
            LOG.info('Action "%s" failed: %s' % (liveaction_db.action, error_text),
                     extra=failure_extra)

            failure_result = {
                'error': error_text,
                'traceback': ''.join(traceback.format_tb(exc_tb, 20))
            }
            liveaction_db = action_utils.update_liveaction_status(
                status=action_constants.LIVEACTION_STATUS_FAILED,
                liveaction_id=liveaction_db.id,
                result=failure_result)
            executions.update_execution(liveaction_db)
            raise
        finally:
            # In the case of worker shutdown, the items are removed from _running_liveactions.
            # As the subprocesses for action executions are terminated, this finally block
            # will be executed. Set remove will result in KeyError if item no longer exists.
            # Use set discard to not raise the KeyError.
            self._running_liveactions.discard(liveaction_db.id)

        return result
182
183
    def _cancel_action(self, liveaction_db):
        """Dispatch a liveaction in the "canceling" status to the runner container.

        :param liveaction_db: Liveaction to cancel.
        :return: Result produced by ``self.container.dispatch``.
        :raises Exception: Any error raised by the dispatch is logged at info
                           level and re-raised unchanged.
        """
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Canceling action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        try:
            result = self.container.dispatch(liveaction_db)
        except Exception as ex:
            # The handler only logs and re-raises, so there is no need to
            # intercept interpreter-level exits (SystemExit, KeyboardInterrupt).
            extra['error'] = str(ex)
            LOG.info('Failed to cancel action execution %s.', liveaction_db.id, extra=extra)
            raise

        LOG.debug('Runner dispatch produced result: %s', result)
        return result
202
203
    def _pause_action(self, liveaction_db):
        """Dispatch a liveaction in the "pausing" status to the runner container.

        :param liveaction_db: Liveaction to pause.
        :return: Result produced by ``self.container.dispatch``.
        :raises Exception: Any error raised by the dispatch is logged at info
                           level and re-raised unchanged.
        """
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Pausing action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        try:
            result = self.container.dispatch(liveaction_db)
        except Exception as ex:
            # The handler only logs and re-raises, so there is no need to
            # intercept interpreter-level exits (SystemExit, KeyboardInterrupt).
            extra['error'] = str(ex)
            LOG.info('Failed to pause action execution %s.', liveaction_db.id, extra=extra)
            raise

        LOG.debug('Runner dispatch produced result: %s', result)
        return result
222
223
    def _resume_action(self, liveaction_db):
        """Dispatch a liveaction in the "resuming" status to the runner container.

        :param liveaction_db: Liveaction to resume.
        :return: Result produced by ``self.container.dispatch``.
        :raises Exception: Any error raised by the dispatch is logged at info
                           level and re-raised unchanged.
        """
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
        LOG.audit('Resuming action execution.', extra=extra)

        # the extra field will not be shown in non-audit logs so temporarily log at info.
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)

        try:
            result = self.container.dispatch(liveaction_db)
        except Exception as ex:
            # The handler only logs and re-raises, so there is no need to
            # intercept interpreter-level exits (SystemExit, KeyboardInterrupt).
            extra['error'] = str(ex)
            LOG.info('Failed to resume action execution %s.', liveaction_db.id, extra=extra)
            raise

        LOG.debug('Runner dispatch produced result: %s', result)
        return result
242
243
244
def get_worker():
    """Create the dispatcher that consumes the action runner queues.

    The dispatcher is built inside a ``with`` block on the kombu
    ``Connection``, so the connection's context manager exits as soon as this
    function returns.
    NOTE(review): presumably the consumer re-establishes the connection when
    it starts - confirm against the consumer implementation.

    :rtype: ``ActionExecutionDispatcher``
    """
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as connection:
        worker = ActionExecutionDispatcher(connection, ACTIONRUNNER_QUEUES)
        return worker
247