Passed
Pull Request — master (#3756)
by W
04:56
created

Querier.print_stats()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from oslo_config import cfg
23
24
from st2actions.container.service import RunnerContainerService
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.persistence.executionstate import ActionExecutionState
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.runners import utils as runners_utils
30
from st2common.services import executions
31
from st2common.util import date as date_utils
32
33
LOG = logging.getLogger(__name__)
34
35
__all__ = [
36
    'Querier',
37
    'QueryContext'
38
]
39
40
41
@six.add_metaclass(abc.ABCMeta)
42
class Querier(object):
43
    delete_state_object_on_error = True
44
45
    def _get_config_value(self, config_option):
46
        config_value = None
47
48
        if 'results_tracker' in cfg.CONF and config_option in cfg.CONF.results_tracker:
49
            config_value = getattr(cfg.CONF.results_tracker, config_option)
50
            LOG.warning('You are using deprecated config group "results_tracker" for "%s". '
51
                        'Please use "resultstracker" group instead.', config_option)
52
53
        if not config_value and config_option in cfg.CONF.resultstracker:
54
            config_value = getattr(cfg.CONF.resultstracker, config_option)
55
56
        return config_value
57
58
    def __init__(self, container_service=None):
59
        self._empty_q_sleep_time = self._get_config_value('empty_q_sleep_time')
60
        self._no_workers_sleep_time = self._get_config_value('no_workers_sleep_time')
61
        self._query_interval = self._get_config_value('query_interval')
62
        self._query_thread_pool_size = self._get_config_value('thread_pool_size')
63
        self._query_contexts = Queue.Queue()
64
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
65
        self._started = False
66
67
        self.container_service = (
68
            RunnerContainerService()
69
            if not container_service
70
            else container_service
71
        )
72
73
    def start(self):
74
        self._started = True
75
        while True:
76
            while self._query_contexts.empty():
77
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
78
            while self._thread_pool.free() <= 0:
79
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
80
            self._fire_queries()
81
            eventlet.sleep(self._query_interval)
82
83
    def add_queries(self, query_contexts=None):
84
        if query_contexts is None:
85
            query_contexts = []
86
        LOG.debug('Adding queries to querier: %s' % query_contexts)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
87
        for query_context in query_contexts:
88
            self._query_contexts.put((time.time(), query_context))
89
90
    def is_started(self):
91
        return self._started
92
93
    def _fire_queries(self):
94
        if self._thread_pool.free() <= 0:
95
            return
96
97
        now = time.time()
98
        reschedule_queries = []
99
100
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
101
            (last_query_time, query_context) = self._query_contexts.get_nowait()
102
            if now - last_query_time < self._query_interval:
103
                reschedule_queries.append((last_query_time, query_context))
104
                continue
105
            else:
106
                self._thread_pool.spawn(
107
                    self._query_and_save_results,
108
                    query_context,
109
                    last_query_time
110
                )
111
112
        for query in reschedule_queries:
113
            self._query_contexts.put((query[0], query[1]))
114
115
    def _query_and_save_results(self, query_context, last_query_time=None):
116
        this_query_time = time.time()
117
        execution_id = query_context.execution_id
118
        actual_query_context = query_context.query_context
119
120
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
121
        try:
122
            (status, results) = self.query(
123
                execution_id,
124
                actual_query_context,
125
                last_query_time=last_query_time
126
            )
127
        except:
128
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
129
            if self.delete_state_object_on_error:
130
                self._delete_state_object(query_context)
131
                LOG.debug('Removed state object %s.', query_context)
132
            return
133
134
        liveaction_db = None
135
        try:
136
            liveaction_db = self._update_action_results(execution_id, status, results)
137
        except Exception:
138
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
139
            if self.delete_state_object_on_error:
140
                self._delete_state_object(query_context)
141
                LOG.debug('Removed state object %s.', query_context)
142
            return
143
144
        if (status in action_constants.LIVEACTION_COMPLETED_STATES or
145
                status == action_constants.LIVEACTION_STATUS_PAUSED):
146
            runners_utils.invoke_post_run(liveaction_db)
147
            self._delete_state_object(query_context)
148
149
            return
150
151
        self._query_contexts.put((this_query_time, query_context))
152
153
    def _update_action_results(self, execution_id, status, results):
154
        liveaction_db = LiveAction.get_by_id(execution_id)
155
        if not liveaction_db:
156
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
157
158
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
159
            liveaction_db.status = status
160
161
        liveaction_db.result = results
162
163
        # Action has completed, record end_timestamp
164
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
165
                not liveaction_db.end_timestamp):
166
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
167
168
        # update liveaction, update actionexecution and then publish update.
169
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
170
        executions.update_execution(updated_liveaction)
171
        LiveAction.publish_update(updated_liveaction)
172
173
        return updated_liveaction
174
175
    def _delete_state_object(self, query_context):
176
        state_db = ActionExecutionState.get_by_id(query_context.id)
177
        if state_db is not None:
178
            try:
179
                LOG.info('Clearing state object: %s', state_db)
180
                ActionExecutionState.delete(state_db)
181
            except:
182
                LOG.exception('Failed clearing state object: %s', state_db)
183
184
    def query(self, execution_id, query_context, last_query_time=None):
185
        """
186
        This is the method individual queriers must implement.
187
        This method should return a tuple of (status, results).
188
189
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
190
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
191
        """
192
        pass
193
194
    def print_stats(self):
195
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
196
                 self._query_contexts.qsize())
197
198
199
class QueryContext(object):
200
    def __init__(self, obj_id, execution_id, query_context, query_module):
201
        self.id = obj_id
202
        self.execution_id = execution_id
203
        self.query_context = query_context
204
        self.query_module = query_module
205
206
    @classmethod
207
    def from_model(cls, model):
208
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
209
                            model.query_module)
210
211
    def __repr__(self):
212
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
213
                (self.id, self.execution_id, self.query_context))
214