Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/query/base.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import abc
18
import eventlet
19
import six.moves.queue
20
import six
21
import time
22
23
from oslo_config import cfg
24
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.exceptions import db as db_exc
28
from st2common.persistence.executionstate import ActionExecutionState
29
from st2common.persistence.liveaction import LiveAction
30
from st2common.runners import utils as runners_utils
31
from st2common.services import executions
32
from st2common.util import date as date_utils
33
34
LOG = logging.getLogger(__name__)
35
36
__all__ = [
37
    'Querier',
38
    'QueryContext'
39
]
40
41
42
@six.add_metaclass(abc.ABCMeta)
43
class Querier(object):
44
    delete_state_object_on_error = True
45
46
    def _get_config_value(self, config_option):
47
        config_value = None
48
49
        if 'results_tracker' in cfg.CONF and config_option in cfg.CONF.results_tracker:
50
            config_value = getattr(cfg.CONF.results_tracker, config_option)
51
            LOG.warning('You are using deprecated config group "results_tracker" for "%s". '
52
                        'Please use "resultstracker" group instead.', config_option)
53
54
        if not config_value and config_option in cfg.CONF.resultstracker:
55
            config_value = getattr(cfg.CONF.resultstracker, config_option)
56
57
        return config_value
58
59
    def __init__(self):
60
        self._empty_q_sleep_time = self._get_config_value('empty_q_sleep_time')
61
        self._no_workers_sleep_time = self._get_config_value('no_workers_sleep_time')
62
        self._query_interval = self._get_config_value('query_interval')
63
        self._query_thread_pool_size = self._get_config_value('thread_pool_size')
64
        self._query_contexts = six.moves.queue.Queue()
65
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
66
        self._started = False
67
68
    def start(self):
69
        self._started = True
70
        while True:
71
            while self._query_contexts.empty():
72
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
73
            while self._thread_pool.free() <= 0:
74
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
75
            self._fire_queries()
76
            eventlet.sleep(self._query_interval)
77
78
    def add_queries(self, query_contexts=None):
79
        if query_contexts is None:
80
            query_contexts = []
81
        LOG.debug('Adding queries to querier: %s' % query_contexts)
82
        for query_context in query_contexts:
83
            self._query_contexts.put((time.time(), query_context))
84
85
    def is_started(self):
86
        return self._started
87
88
    def _fire_queries(self, blocking=False):
89
        if self._thread_pool.free() <= 0:
90
            return
91
92
        now = time.time()
93
        reschedule_queries = []
94
95
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
96
            (last_query_time, query_context) = self._query_contexts.get_nowait()
97
            if now - last_query_time < self._query_interval:
98
                reschedule_queries.append((last_query_time, query_context))
99
                continue
100
            else:
101
                if not blocking:
102
                    self._thread_pool.spawn(
103
                        self._query_and_save_results,
104
                        query_context,
105
                        last_query_time
106
                    )
107
                # Add an option to block and execute the function directly for unit tests.
108
                else:
109
                    self._query_and_save_results(
110
                        query_context,
111
                        last_query_time
112
                    )
113
114
        for query in reschedule_queries:
115
            self._query_contexts.put((query[0], query[1]))
116
117
    def _query_and_save_results(self, query_context, last_query_time=None):
118
        this_query_time = time.time()
119
        execution_id = query_context.execution_id
120
        actual_query_context = query_context.query_context
121
122
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
123
        try:
124
            (status, results) = self.query(
125
                execution_id,
126
                actual_query_context,
127
                last_query_time=last_query_time
128
            )
129
        except:
130
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
131
            if self.delete_state_object_on_error:
132
                self._delete_state_object(query_context)
133
                LOG.debug('Removed state object %s.', query_context)
134
            return
135
136
        liveaction_db = None
137
        try:
138
            liveaction_db = self._update_action_results(execution_id, status, results)
139
        except Exception:
140
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
141
            if self.delete_state_object_on_error:
142
                self._delete_state_object(query_context)
143
                LOG.debug('Removed state object %s.', query_context)
144
            return
145
146
        if (status in action_constants.LIVEACTION_COMPLETED_STATES or
147
                status == action_constants.LIVEACTION_STATUS_PAUSED):
148
            runners_utils.invoke_post_run(liveaction_db)
149
            self._delete_state_object(query_context)
150
            LOG.debug(
151
                "Detailed workflow liveaction results - ", extra={'liveaction_db': liveaction_db}
152
            )
153
            return
154
155
        if not self._is_state_object_exist(query_context):
156
            LOG.warning(
157
                'Query for liveaction_id %s is not rescheduled '
158
                'because state object %s has been deleted.',
159
                execution_id,
160
                query_context.id
161
            )
162
163
            return
164
165
        self._query_contexts.put((this_query_time, query_context))
166
167
    def _update_action_results(self, execution_id, status, results):
168
        liveaction_db = LiveAction.get_by_id(execution_id)
169
        if not liveaction_db:
170
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
171
172
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
173
            liveaction_db.status = status
174
175
        liveaction_db.result = results
176
177
        # Action has completed, record end_timestamp
178
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
179
                not liveaction_db.end_timestamp):
180
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
181
182
        # update liveaction, update actionexecution and then publish update.
183
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
184
        executions.update_execution(updated_liveaction)
185
        LiveAction.publish_update(updated_liveaction)
186
187
        return updated_liveaction
188
189
    def _delete_state_object(self, query_context):
190
        state_db = None
191
192
        try:
193
            state_db = ActionExecutionState.get_by_id(query_context.id)
194
        except db_exc.StackStormDBObjectNotFoundError:
195
            pass
196
197
        if state_db is not None:
198
            try:
199
                LOG.info('Clearing state object: %s', state_db)
200
                ActionExecutionState.delete(state_db)
201
            except:
202
                LOG.exception('Failed clearing state object: %s', state_db)
203
204
    def _is_state_object_exist(self, query_context):
205
        state_db = None
206
207
        try:
208
            state_db = ActionExecutionState.get_by_id(query_context.id)
209
        except db_exc.StackStormDBObjectNotFoundError:
210
            pass
211
212
        return (state_db is not None)
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after return.
Loading history...
213
214
    def query(self, execution_id, query_context, last_query_time=None):
215
        """
216
        This is the method individual queriers must implement.
217
        This method should return a tuple of (status, results).
218
219
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
220
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
221
        """
222
        pass
223
224
    def print_stats(self):
225
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
226
                 self._query_contexts.qsize())
227
228
229
class QueryContext(object):
230
    def __init__(self, obj_id, execution_id, query_context, query_module):
231
        self.id = obj_id
232
        self.execution_id = execution_id
233
        self.query_context = query_context
234
        self.query_module = query_module
235
236
    @classmethod
237
    def from_model(cls, model):
238
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
239
                            model.query_module)
240
241
    def __repr__(self):
242
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
243
                (self.id, self.execution_id, self.query_context))
244