Passed
Push — master ( 925a01...b3f9b5 )
by Lakshmi
02:56
created

Querier.__init__()   B

Complexity

Conditions 6

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 33
rs 7.5384
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from oslo_config import cfg
23
24
from st2actions.container.service import RunnerContainerService
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.persistence.executionstate import ActionExecutionState
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.runners import utils as runners_utils
30
from st2common.services import executions
31
from st2common.util import date as date_utils
32
33
LOG = logging.getLogger(__name__)
34
35
__all__ = [
36
    'Querier',
37
    'QueryContext'
38
]
39
40
41
@six.add_metaclass(abc.ABCMeta)
42
class Querier(object):
43
    delete_state_object_on_error = True
44
45
    def __init__(self, empty_q_sleep_time=5,
46
                 no_workers_sleep_time=1, container_service=None):
47
48
        # Let's check to see if deprecated config group ``results_tracker`` is being used.
49
        try:
50
            query_interval = cfg.CONF.results_tracker.query_interval
51
            LOG.warning('You are using deprecated config group ``results_tracker``.' +
52
                        '\nPlease use ``resultstracker`` group instead.')
53
        except:
54
            pass
55
56
        try:
57
            thread_pool_size = cfg.CONF.results_tracker.thread_pool_size
58
            LOG.warning('You are using deprecated config group ``results_tracker``.' +
59
                        '\nPlease use ``resultstracker`` group instead.')
60
        except:
61
            pass
62
63
        if not query_interval:
64
            query_interval = cfg.CONF.resultstracker.query_interval
65
        if not thread_pool_size:
66
            thread_pool_size = cfg.CONF.resultstracker.thread_pool_size
67
68
        self._query_thread_pool_size = thread_pool_size
69
        self._query_interval = query_interval
70
        self._query_contexts = Queue.Queue()
71
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
72
        self._empty_q_sleep_time = empty_q_sleep_time
73
        self._no_workers_sleep_time = no_workers_sleep_time
74
        if not container_service:
75
            container_service = RunnerContainerService()
76
        self.container_service = container_service
77
        self._started = False
78
79
    def start(self):
80
        self._started = True
81
        while True:
82
            while self._query_contexts.empty():
83
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
84
            while self._thread_pool.free() <= 0:
85
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
86
            self._fire_queries()
87
            eventlet.sleep(self._query_interval)
88
89
    def add_queries(self, query_contexts=None):
90
        if query_contexts is None:
91
            query_contexts = []
92
        LOG.debug('Adding queries to querier: %s' % query_contexts)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
93
        for query_context in query_contexts:
94
            self._query_contexts.put((time.time(), query_context))
95
96
    def is_started(self):
97
        return self._started
98
99
    def _fire_queries(self):
100
        if self._thread_pool.free() <= 0:
101
            return
102
103
        now = time.time()
104
        reschedule_queries = []
105
106
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
107
            (last_query_time, query_context) = self._query_contexts.get_nowait()
108
            if now - last_query_time < self._query_interval:
109
                reschedule_queries.append((last_query_time, query_context))
110
                continue
111
            else:
112
                self._thread_pool.spawn(
113
                    self._query_and_save_results,
114
                    query_context,
115
                    last_query_time
116
                )
117
118
        for query in reschedule_queries:
119
            self._query_contexts.put((query[0], query[1]))
120
121
    def _query_and_save_results(self, query_context, last_query_time=None):
122
        this_query_time = time.time()
123
        execution_id = query_context.execution_id
124
        actual_query_context = query_context.query_context
125
126
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
127
        try:
128
            (status, results) = self.query(
129
                execution_id,
130
                actual_query_context,
131
                last_query_time=last_query_time
132
            )
133
        except:
134
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
135
            if self.delete_state_object_on_error:
136
                self._delete_state_object(query_context)
137
                LOG.debug('Removed state object %s.', query_context)
138
            return
139
140
        liveaction_db = None
141
        try:
142
            liveaction_db = self._update_action_results(execution_id, status, results)
143
        except Exception:
144
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
145
            if self.delete_state_object_on_error:
146
                self._delete_state_object(query_context)
147
                LOG.debug('Removed state object %s.', query_context)
148
            return
149
150
        if status in action_constants.LIVEACTION_COMPLETED_STATES:
151
            if status != action_constants.LIVEACTION_STATUS_CANCELED:
152
                runners_utils.invoke_post_run(liveaction_db)
153
154
            self._delete_state_object(query_context)
155
156
            return
157
158
        self._query_contexts.put((this_query_time, query_context))
159
160
    def _update_action_results(self, execution_id, status, results):
161
        liveaction_db = LiveAction.get_by_id(execution_id)
162
        if not liveaction_db:
163
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
164
165
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
166
            liveaction_db.status = status
167
168
        liveaction_db.result = results
169
170
        # Action has completed, record end_timestamp
171
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
172
                not liveaction_db.end_timestamp):
173
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
174
175
        # update liveaction, update actionexecution and then publish update.
176
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
177
        executions.update_execution(updated_liveaction)
178
        LiveAction.publish_update(updated_liveaction)
179
180
        return updated_liveaction
181
182
    def _delete_state_object(self, query_context):
183
        state_db = ActionExecutionState.get_by_id(query_context.id)
184
        if state_db is not None:
185
            try:
186
                LOG.info('Clearing state object: %s', state_db)
187
                ActionExecutionState.delete(state_db)
188
            except:
189
                LOG.exception('Failed clearing state object: %s', state_db)
190
191
    def query(self, execution_id, query_context, last_query_time=None):
192
        """
193
        This is the method individual queriers must implement.
194
        This method should return a tuple of (status, results).
195
196
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
197
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
198
        """
199
        pass
200
201
    def print_stats(self):
202
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
203
                 self._query_contexts.qsize())
204
205
206
class QueryContext(object):
207
    def __init__(self, obj_id, execution_id, query_context, query_module):
208
        self.id = obj_id
209
        self.execution_id = execution_id
210
        self.query_context = query_context
211
        self.query_module = query_module
212
213
    @classmethod
214
    def from_model(cls, model):
215
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
216
                            model.query_module)
217
218
    def __repr__(self):
219
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
220
                (self.id, self.execution_id, self.query_context))
221