GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — regana/mistral_and_perf_fixes ( 2321f2...9862bf )
by
unknown
03:48
created

ActionExecutionDispatcher._run_action()   A

Complexity

Conditions 3

Size

Total Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 47
rs 9.0303
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import sys
17
import traceback
18
19
from kombu import Connection
20
21
from st2actions.container.base import RunnerContainer
22
from st2common import log as logging
23
from st2common.constants import action as action_constants
24
from st2common.exceptions.actionrunner import ActionRunnerException
25
from st2common.exceptions.db import StackStormDBObjectNotFoundError
26
from st2common.models.db.liveaction import LiveActionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.services import executions
29
from st2common.transport import liveaction
30
from st2common.transport.consumers import MessageHandler
31
from st2common.transport.consumers import ActionsQueueConsumer
32
from st2common.transport import utils as transport_utils
33
from st2common.util import action_db as action_utils
34
from st2common.util import system_info
35
36
37
LOG = logging.getLogger(__name__)
38
39
ACTIONRUNNER_WORK_Q = liveaction.get_status_management_queue(
40
    'st2.actionrunner.work', routing_key=action_constants.LIVEACTION_STATUS_SCHEDULED)
41
42
ACTIONRUNNER_CANCEL_Q = liveaction.get_status_management_queue(
43
    'st2.actionrunner.canel', routing_key=action_constants.LIVEACTION_STATUS_CANCELING)
44
45
46
class ActionExecutionDispatcher(MessageHandler):
47
    message_type = LiveActionDB
48
49
    def __init__(self, connection, queues):
50
        super(ActionExecutionDispatcher, self).__init__(connection, queues)
51
        self.container = RunnerContainer()
52
        self._running_liveactions = set()
53
54
    def get_queue_consumer(self, connection, queues):
55
        # We want to use a special ActionsQueueConsumer which uses 2 dispatcher pools
56
        return ActionsQueueConsumer(connection=connection, queues=queues, handler=self)
57
58
    def process(self, liveaction):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 29).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
59
        """Dispatches the LiveAction to appropriate action runner.
60
61
        LiveAction in statuses other than "scheduled" and "canceling" are ignored. If
62
        LiveAction is already canceled and result is empty, the LiveAction
63
        is updated with a generic exception message.
64
65
        :param liveaction: Action execution request.
66
        :type liveaction: ``st2common.models.db.liveaction.LiveActionDB``
67
68
        :rtype: ``dict``
69
        """
70
71
        if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
72
            LOG.info('%s is not executing %s (id=%s) with "%s" status.',
73
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
74
            if not liveaction.result:
75
                updated_liveaction = action_utils.update_liveaction_status(
76
                    status=liveaction.status,
77
                    result={'message': 'Action execution canceled by user.'},
78
                    liveaction_id=liveaction.id)
79
                executions.update_execution(updated_liveaction)
80
            return
81
82
        if liveaction.status not in [action_constants.LIVEACTION_STATUS_SCHEDULED,
83
                                     action_constants.LIVEACTION_STATUS_CANCELING]:
84
            LOG.info('%s is not dispatching %s (id=%s) with "%s" status.',
85
                     self.__class__.__name__, type(liveaction), liveaction.id, liveaction.status)
86
            return
87
88
        try:
89
            liveaction_db = action_utils.get_liveaction_by_id(liveaction.id)
90
        except StackStormDBObjectNotFoundError:
91
            LOG.exception('Failed to find liveaction %s in the database.', liveaction.id)
92
            raise
93
94
        return (self._run_action(liveaction_db)
95
                if liveaction.status == action_constants.LIVEACTION_STATUS_SCHEDULED
96
                else self._cancel_action(liveaction_db))
97
98
    def shutdown(self):
99
        super(ActionExecutionDispatcher, self).shutdown()
100
        # Abandon running executions if incomplete
101
        while self._running_liveactions:
102
            liveaction_id = self._running_liveactions.pop()
103
            try:
104
                executions.abandon_execution_if_incomplete(liveaction_id=liveaction_id)
105
            except:
106
                LOG.exception('Failed to abandon liveaction %s.', liveaction_id)
107
108
    def _run_action(self, liveaction_db):
109
        # stamp liveaction with process_info
110
        runner_info = system_info.get_process_info()
111
112
        # Update liveaction status to "running"
113
        liveaction_db = action_utils.update_liveaction_status(
114
            status=action_constants.LIVEACTION_STATUS_RUNNING,
115
            runner_info=runner_info,
116
            liveaction_id=liveaction_db.id)
117
118
        self._running_liveactions.add(liveaction_db.id)
119
120
        action_execution_db = executions.update_execution(liveaction_db)
121
122
        # Launch action
123
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
124
        LOG.audit('Launching action execution.', extra=extra)
125
126
        # the extra field will not be shown in non-audit logs so temporarily log at info.
127
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
128
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)
129
130
        extra = {'liveaction_db': liveaction_db}
131
        try:
132
            result = self.container.dispatch(liveaction_db)
133
            LOG.debug('Runner dispatch produced result: %s', result)
134
            if not result:
135
                raise ActionRunnerException('Failed to execute action.')
136
        except:
137
            _, ex, tb = sys.exc_info()
138
            extra['error'] = str(ex)
139
            LOG.info('Action "%s" failed: %s' % (liveaction_db.action, str(ex)), extra=extra)
140
141
            liveaction_db = action_utils.update_liveaction_status(
142
                status=action_constants.LIVEACTION_STATUS_FAILED,
143
                liveaction_id=liveaction_db.id,
144
                result={'error': str(ex), 'traceback': ''.join(traceback.format_tb(tb, 20))})
145
            executions.update_execution(liveaction_db)
146
            raise
147
        finally:
148
            # In the case of worker shutdown, the items are removed from _running_liveactions.
149
            # As the subprocesses for action executions are terminated, this finally block
150
            # will be executed. Set remove will result in KeyError if item no longer exists.
151
            # Use set discard to not raise the KeyError.
152
            self._running_liveactions.discard(liveaction_db.id)
153
154
        return result
155
156
    def _cancel_action(self, liveaction_db):
157
        action_execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
158
        extra = {'action_execution_db': action_execution_db, 'liveaction_db': liveaction_db}
159
        LOG.audit('Canceling action execution.', extra=extra)
160
161
        # the extra field will not be shown in non-audit logs so temporarily log at info.
162
        LOG.info('Dispatched {~}action_execution: %s / {~}live_action: %s with "%s" status.',
163
                 action_execution_db.id, liveaction_db.id, liveaction_db.status)
164
165
        try:
166
            result = self.container.dispatch(liveaction_db)
167
            LOG.debug('Runner dispatch produced result: %s', result)
168
        except:
169
            _, ex, tb = sys.exc_info()
170
            extra['error'] = str(ex)
171
            LOG.info('Failed to cancel action execution %s.' % (liveaction_db.id), extra=extra)
172
            raise
173
174
        return result
175
176
177
def get_worker():
178
    with Connection(transport_utils.get_messaging_urls()) as conn:
179
        return ActionExecutionDispatcher(conn, [ACTIONRUNNER_WORK_Q, ACTIONRUNNER_CANCEL_Q])
180