Passed
Pull Request — master (#3505)
by Lakshmi
10:06 queued 04:20
created

Querier._update_action_results()   B

Complexity

Conditions 5

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 21
rs 8.439
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import Queue
19
import six
20
import time
21
22
from oslo_config import cfg
23
24
from st2actions.container.service import RunnerContainerService
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.persistence.executionstate import ActionExecutionState
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.runners import utils as runners_utils
30
from st2common.services import executions
31
from st2common.util import date as date_utils
32
33
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Public names exported by this module.
__all__ = ['Querier', 'QueryContext']
39
40
41
@six.add_metaclass(abc.ABCMeta)
class Querier(object):
    """
    Base class for queriers which poll an external service for action execution results.

    Pending work is kept in an in-memory queue of ``(last_query_time, query_context)``
    tuples. ``start`` loops forever and hands every context which is due to a green thread
    pool. Each worker calls ``query`` (to be implemented by subclasses), stores the outcome
    on the liveaction and puts the context back on the queue until the reported status is
    one of the completed states.
    """

    # When True, the state object is deleted if querying or saving the results fails.
    # In either case the failed context is not put back on the queue.
    delete_state_object_on_error = True

    def __init__(self, empty_q_sleep_time=5,
                 no_workers_sleep_time=1, container_service=None):
        """
        :param empty_q_sleep_time: Seconds ``start`` sleeps between checks while the queue
                                   is empty.
        :param no_workers_sleep_time: Seconds ``start`` sleeps between checks while the
                                      thread pool has no free worker.
        :param container_service: Object stored as ``self.container_service``. A new
                                  ``RunnerContainerService`` is created when this is falsy.
        """
        query_interval = cfg.CONF.resultstracker.query_interval
        thread_pool_size = cfg.CONF.resultstracker.thread_pool_size

        # Let's check to see if deprecated config group ``results_tracker`` is being used.
        # Both options are read before either one is applied so a partially defined
        # deprecated group can not leave us with a mix of old and new settings.
        try:
            deprecated_group = cfg.CONF.results_tracker
            deprecated_query_interval = deprecated_group.query_interval
            deprecated_thread_pool_size = deprecated_group.thread_pool_size
        except Exception:
            # Deprecated group (or one of its options) is not available - this is the
            # expected case, keep the values from ``resultstracker``.
            pass
        else:
            query_interval = deprecated_query_interval
            thread_pool_size = deprecated_thread_pool_size
            LOG.warning('You are using deprecated config group ``results_tracker``.'
                        '\nPlease use ``resultstracker`` group instead.')

        self._query_thread_pool_size = thread_pool_size
        self._query_interval = query_interval
        self._query_contexts = Queue.Queue()
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
        self._empty_q_sleep_time = empty_q_sleep_time
        self._no_workers_sleep_time = no_workers_sleep_time
        if not container_service:
            container_service = RunnerContainerService()
        self.container_service = container_service
        self._started = False

    def start(self):
        """
        Run the dispatch loop. This method does not return.

        Each iteration waits until there is at least one queued context and at least one
        free worker, dispatches the queries which are due and then sleeps for the query
        interval.
        """
        self._started = True
        while True:
            while self._query_contexts.empty():
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
            while self._thread_pool.free() <= 0:
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
            self._fire_queries()
            eventlet.sleep(self._query_interval)

    def add_queries(self, query_contexts=None):
        """
        Put the provided query contexts on the queue.

        Each context is stamped with the current time, so it only becomes due once the
        query interval has elapsed.

        :param query_contexts: Iterable of query contexts, or None for no-op.
        """
        if query_contexts is None:
            query_contexts = []
        # Arguments are passed to the logger instead of using ``%`` so the message is only
        # formatted when debug logging is enabled and a tuple argument can not break it.
        LOG.debug('Adding queries to querier: %s', query_contexts)
        for query_context in query_contexts:
            self._query_contexts.put((time.time(), query_context))

    def is_started(self):
        """
        :return: True once ``start`` has been called, False otherwise.
        """
        return self._started

    def _fire_queries(self):
        """
        Spawn a worker for every queued context which is due, while free workers remain.

        Contexts which were queried less than the query interval ago are taken off the
        queue during the scan and put back, with their original timestamp, once the scan
        is finished.
        """
        if self._thread_pool.free() <= 0:
            return

        now = time.time()
        reschedule_queries = []

        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
            (last_query_time, query_context) = self._query_contexts.get_nowait()
            if now - last_query_time < self._query_interval:
                # Not due yet. Held aside (instead of re-queued right away) so this loop
                # doesn't keep pulling the same item.
                reschedule_queries.append((last_query_time, query_context))
                continue

            self._thread_pool.spawn(
                self._query_and_save_results,
                query_context,
                last_query_time
            )

        for query in reschedule_queries:
            self._query_contexts.put(query)

    def _query_and_save_results(self, query_context, last_query_time=None):
        """
        Query the external service for a single context and persist the outcome.

        If the reported status is a completed one, the post run hook is invoked (skipped
        for canceled status) and the state object is deleted. Otherwise the context is put
        back on the queue stamped with the time this query started.

        If querying or saving fails, the error is logged and the context is dropped.

        :param query_context: Object exposing ``id``, ``execution_id`` and
                              ``query_context`` attributes.
        :param last_query_time: Timestamp of the previous query for this context, passed
                                through to ``query``.
        """
        this_query_time = time.time()
        execution_id = query_context.execution_id
        actual_query_context = query_context.query_context

        LOG.debug('Querying external service for results. Context: %s', actual_query_context)
        # ``except Exception`` (and not a bare ``except``) so that GreenletExit and other
        # BaseException based signals are not mistaken for a query failure.
        try:
            (status, results) = self.query(
                execution_id,
                actual_query_context,
                last_query_time=last_query_time
            )
        except Exception:
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
            self._cleanup_after_error(query_context)
            return

        try:
            liveaction_db = self._update_action_results(execution_id, status, results)
        except Exception:
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
            self._cleanup_after_error(query_context)
            return

        if status in action_constants.LIVEACTION_COMPLETED_STATES:
            if status != action_constants.LIVEACTION_STATUS_CANCELED:
                runners_utils.invoke_post_run(liveaction_db)

            self._delete_state_object(query_context)

            return

        self._query_contexts.put((this_query_time, query_context))

    def _cleanup_after_error(self, query_context):
        """
        Delete the state object for a failed context if ``delete_state_object_on_error``
        is set.
        """
        if self.delete_state_object_on_error:
            self._delete_state_object(query_context)
            LOG.debug('Removed state object %s.', query_context)

    def _update_action_results(self, execution_id, status, results):
        """
        Store the queried status and result on the liveaction and publish the update.

        :param execution_id: ID used to look up the liveaction.
        :param status: Status reported by ``query``.
        :param results: Result reported by ``query``.

        :return: The updated liveaction DB object.
        :raises Exception: If no liveaction exists for the provided ID.
        """
        liveaction_db = LiveAction.get_by_id(execution_id)
        if not liveaction_db:
            raise Exception('No DB model for liveaction_id: %s' % execution_id)

        # A liveaction which is already canceled keeps its status, only the result is
        # refreshed.
        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
            liveaction_db.status = status

        liveaction_db.result = results

        # Action has completed, record end_timestamp
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
                not liveaction_db.end_timestamp):
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()

        # update liveaction, update actionexecution and then publish update.
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
        executions.update_execution(updated_liveaction)
        LiveAction.publish_update(updated_liveaction)

        return updated_liveaction

    def _delete_state_object(self, query_context):
        """
        Delete the state object which belongs to the provided context, if there is one.

        A failure to delete is logged and not propagated.
        """
        state_db = ActionExecutionState.get_by_id(query_context.id)
        if state_db is not None:
            try:
                LOG.info('Clearing state object: %s', state_db)
                ActionExecutionState.delete(state_db)
            except Exception:
                LOG.exception('Failed clearing state object: %s', state_db)

    def query(self, execution_id, query_context, last_query_time=None):
        """
        This is the method individual queriers must implement.
        This method should return a tuple of (status, results).

        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.

        NOTE: This method is not marked as abstract, so a subclass which doesn't override
        it returns None here and the tuple unpacking in the caller fails.
        """
        pass

    def print_stats(self):
        """
        Log the class name and the number of contexts currently on the queue.
        """
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
                 self._query_contexts.qsize())
196
197
198
class QueryContext(object):
    """
    Plain value object which describes a single pending query.
    """

    def __init__(self, obj_id, execution_id, query_context, query_module):
        """
        :param obj_id: Stored as ``id``.
        :param execution_id: Stored as ``execution_id``.
        :param query_context: Stored as ``query_context``.
        :param query_module: Stored as ``query_module``.
        """
        self.id = obj_id
        self.execution_id = execution_id
        self.query_context = query_context
        self.query_module = query_module

    @classmethod
    def from_model(cls, model):
        """
        Build an instance from an object exposing ``id``, ``execution_id``,
        ``query_context`` and ``query_module`` attributes.

        ``id`` and ``execution_id`` are converted to strings. The instance is created via
        ``cls`` so calling this on a subclass returns an instance of that subclass.
        """
        return cls(str(model.id), str(model.execution_id), model.query_context,
                   model.query_module)

    def __repr__(self):
        # NOTE: query_module is not part of the representation.
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
                (self.id, self.execution_id, self.query_context))
213