GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — plexxi-v2.2.1 ( 00dc5d...9862bf )
by
unknown
04:14
created

Querier._invoke_post_run()   A

Complexity

Conditions 1

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 22
rs 9.2
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A Querier._delete_state_object() 0 8 3
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from oslo_config import cfg
23
24
from st2actions.container.service import RunnerContainerService
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.persistence.executionstate import ActionExecutionState
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.runners import utils as runners_utils
30
from st2common.services import executions
31
from st2common.util import date as date_utils
32
33
LOG = logging.getLogger(__name__)
34
35
__all__ = [
36
    'Querier',
37
    'QueryContext'
38
]
39
40
41
@six.add_metaclass(abc.ABCMeta)
42
class Querier(object):
43
    delete_state_object_on_error = True
44
45
    def __init__(self, empty_q_sleep_time=5,
46
                 no_workers_sleep_time=1, container_service=None):
47
        self._query_thread_pool_size = cfg.CONF.results_tracker.thread_pool_size
48
        self._query_interval = cfg.CONF.results_tracker.query_interval
49
        self._query_contexts = Queue.Queue()
50
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
51
        self._empty_q_sleep_time = empty_q_sleep_time
52
        self._no_workers_sleep_time = no_workers_sleep_time
53
        if not container_service:
54
            container_service = RunnerContainerService()
55
        self.container_service = container_service
56
        self._started = False
57
58
    def start(self):
59
        self._started = True
60
        while True:
61
            while self._query_contexts.empty():
62
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
63
            while self._thread_pool.free() <= 0:
64
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
65
            self._fire_queries()
66
            eventlet.sleep(self._query_interval)
67
68
    def add_queries(self, query_contexts=None):
69
        if query_contexts is None:
70
            query_contexts = []
71
        LOG.debug('Adding queries to querier: %s' % query_contexts)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
72
        for query_context in query_contexts:
73
            self._query_contexts.put((time.time(), query_context))
74
75
    def is_started(self):
76
        return self._started
77
78
    def _fire_queries(self):
79
        if self._thread_pool.free() <= 0:
80
            return
81
82
        now = time.time()
83
        reschedule_queries = []
84
85
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
86
            (last_query_time, query_context) = self._query_contexts.get_nowait()
87
            if now - last_query_time < self._query_interval:
88
                reschedule_queries.append((last_query_time, query_context))
89
                continue
90
            else:
91
                self._thread_pool.spawn(
92
                    self._query_and_save_results,
93
                    query_context,
94
                    last_query_time
95
                )
96
97
        for query in reschedule_queries:
98
            self._query_contexts.put((query[0], query[1]))
99
100
    def _query_and_save_results(self, query_context, last_query_time=None):
101
        this_query_time = time.time()
102
        execution_id = query_context.execution_id
103
        actual_query_context = query_context.query_context
104
105
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
106
        try:
107
            (status, results) = self.query(
108
                execution_id,
109
                actual_query_context,
110
                last_query_time=last_query_time
111
            )
112
        except:
113
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
114
            if self.delete_state_object_on_error:
115
                self._delete_state_object(query_context)
116
                LOG.debug('Removed state object %s.', query_context)
117
            return
118
119
        liveaction_db = None
120
        try:
121
            liveaction_db = self._update_action_results(execution_id, status, results)
122
        except Exception:
123
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
124
            if self.delete_state_object_on_error:
125
                self._delete_state_object(query_context)
126
                LOG.debug('Removed state object %s.', query_context)
127
            return
128
129
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
130
            if status != action_constants.LIVEACTION_STATUS_CANCELED:
131
                runners_utils.invoke_post_run(liveaction_db)
132
133
            self._delete_state_object(query_context)
134
135
            return
136
137
        self._query_contexts.put((this_query_time, query_context))
138
139
    def _update_action_results(self, execution_id, status, results):
140
        liveaction_db = LiveAction.get_by_id(execution_id)
141
        if not liveaction_db:
142
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
143
144
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
145
            liveaction_db.status = status
146
147
        liveaction_db.result = results
148
149
        # Action has completed, record end_timestamp
150
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
151
                not liveaction_db.end_timestamp):
152
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
153
154
        # update liveaction, update actionexecution and then publish update.
155
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
156
        executions.update_execution(updated_liveaction)
157
        LiveAction.publish_update(updated_liveaction)
158
159
        return updated_liveaction
160
161
    def _delete_state_object(self, query_context):
162
        state_db = ActionExecutionState.get_by_id(query_context.id)
163
        if state_db is not None:
164
            try:
165
                LOG.info('Clearing state object: %s', state_db)
166
                ActionExecutionState.delete(state_db)
167
            except:
168
                LOG.exception('Failed clearing state object: %s', state_db)
169
170
    def query(self, execution_id, query_context, last_query_time=None):
171
        """
172
        This is the method individual queriers must implement.
173
        This method should return a tuple of (status, results).
174
175
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
176
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
177
        """
178
        pass
179
180
    def print_stats(self):
181
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
182
                 self._query_contexts.qsize())
183
184
185
class QueryContext(object):
186
    def __init__(self, obj_id, execution_id, query_context, query_module):
187
        self.id = obj_id
188
        self.execution_id = execution_id
189
        self.query_context = query_context
190
        self.query_module = query_module
191
192
    @classmethod
193
    def from_model(cls, model):
194
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
195
                            model.query_module)
196
197
    def __repr__(self):
198
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
199
                (self.id, self.execution_id, self.query_context))
200