Passed
Pull Request — master (#3507)
by W
04:31
created

Querier._query_and_save_results()   D

Complexity

Conditions 8

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
dl 0
loc 40
rs 4
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from oslo_config import cfg
23
24
from st2actions.container.service import RunnerContainerService
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.persistence.executionstate import ActionExecutionState
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.runners import utils as runners_utils
30
from st2common.services import executions
31
from st2common.util import date as date_utils
32
33
LOG = logging.getLogger(__name__)
34
35
__all__ = [
36
    'Querier',
37
    'QueryContext'
38
]
39
40
41
@six.add_metaclass(abc.ABCMeta)
42
class Querier(object):
43
    delete_state_object_on_error = True
44
45
    def __init__(self, empty_q_sleep_time=5,
46
                 no_workers_sleep_time=1, container_service=None):
47
        self._query_thread_pool_size = cfg.CONF.results_tracker.thread_pool_size
48
        self._query_interval = cfg.CONF.results_tracker.query_interval
49
        self._query_contexts = Queue.Queue()
50
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
51
        self._empty_q_sleep_time = empty_q_sleep_time
52
        self._no_workers_sleep_time = no_workers_sleep_time
53
        if not container_service:
54
            container_service = RunnerContainerService()
55
        self.container_service = container_service
56
        self._started = False
57
58
    def start(self):
59
        self._started = True
60
        while True:
61
            while self._query_contexts.empty():
62
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
63
            while self._thread_pool.free() <= 0:
64
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
65
            self._fire_queries()
66
            eventlet.sleep(self._query_interval)
67
68
    def add_queries(self, query_contexts=None):
69
        if query_contexts is None:
70
            query_contexts = []
71
        LOG.debug('Adding queries to querier: %s' % query_contexts)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
72
        for query_context in query_contexts:
73
            self._query_contexts.put((time.time(), query_context))
74
75
    def is_started(self):
76
        return self._started
77
78
    def _fire_queries(self):
79
        if self._thread_pool.free() <= 0:
80
            return
81
82
        now = time.time()
83
        reschedule_queries = []
84
85
        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
86
            (last_query_time, query_context) = self._query_contexts.get_nowait()
87
            if now - last_query_time < self._query_interval:
88
                reschedule_queries.append((last_query_time, query_context))
89
                continue
90
            else:
91
                self._thread_pool.spawn(
92
                    self._query_and_save_results,
93
                    query_context,
94
                    last_query_time
95
                )
96
97
        for query in reschedule_queries:
98
            self._query_contexts.put((query[0], query[1]))
99
100
    def _query_and_save_results(self, query_context, last_query_time=None):
101
        this_query_time = time.time()
102
        execution_id = query_context.execution_id
103
        actual_query_context = query_context.query_context
104
105
        LOG.debug('Querying external service for results. Context: %s' % actual_query_context)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
106
        try:
107
            (status, results) = self.query(
108
                execution_id,
109
                actual_query_context,
110
                last_query_time=last_query_time
111
            )
112
        except:
113
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
114
            if self.delete_state_object_on_error:
115
                self._delete_state_object(query_context)
116
                LOG.debug('Removed state object %s.', query_context)
117
            return
118
119
        liveaction_db = None
120
        try:
121
            liveaction_db = self._update_action_results(execution_id, status, results)
122
        except Exception:
123
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
124
            if self.delete_state_object_on_error:
125
                self._delete_state_object(query_context)
126
                LOG.debug('Removed state object %s.', query_context)
127
            return
128
129
        if (status in action_constants.LIVEACTION_COMPLETED_STATES or
130
                status == action_constants.LIVEACTION_STATUS_PAUSED):
131
132
            if status != action_constants.LIVEACTION_STATUS_CANCELED:
133
                runners_utils.invoke_post_run(liveaction_db)
134
135
            self._delete_state_object(query_context)
136
137
            return
138
139
        self._query_contexts.put((this_query_time, query_context))
140
141
    def _update_action_results(self, execution_id, status, results):
142
        liveaction_db = LiveAction.get_by_id(execution_id)
143
        if not liveaction_db:
144
            raise Exception('No DB model for liveaction_id: %s' % execution_id)
145
146
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
147
            liveaction_db.status = status
148
149
        liveaction_db.result = results
150
151
        # Action has completed, record end_timestamp
152
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
153
                not liveaction_db.end_timestamp):
154
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()
155
156
        # update liveaction, update actionexecution and then publish update.
157
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
158
        executions.update_execution(updated_liveaction)
159
        LiveAction.publish_update(updated_liveaction)
160
161
        return updated_liveaction
162
163
    def _delete_state_object(self, query_context):
164
        state_db = ActionExecutionState.get_by_id(query_context.id)
165
        if state_db is not None:
166
            try:
167
                LOG.info('Clearing state object: %s', state_db)
168
                ActionExecutionState.delete(state_db)
169
            except:
170
                LOG.exception('Failed clearing state object: %s', state_db)
171
172
    def query(self, execution_id, query_context, last_query_time=None):
173
        """
174
        This is the method individual queriers must implement.
175
        This method should return a tuple of (status, results).
176
177
        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
178
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.
179
        """
180
        pass
181
182
    def print_stats(self):
183
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
184
                 self._query_contexts.qsize())
185
186
187
class QueryContext(object):
188
    def __init__(self, obj_id, execution_id, query_context, query_module):
189
        self.id = obj_id
190
        self.execution_id = execution_id
191
        self.query_context = query_context
192
        self.query_module = query_module
193
194
    @classmethod
195
    def from_model(cls, model):
196
        return QueryContext(str(model.id), str(model.execution_id), model.query_context,
197
                            model.query_module)
198
199
    def __repr__(self):
200
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
201
                (self.id, self.execution_id, self.query_context))
202