unnecessary parenthesis after keywords
1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import abc |
||
18 | import eventlet |
||
19 | import six.moves.queue |
||
20 | import six |
||
21 | import time |
||
22 | |||
23 | from oslo_config import cfg |
||
24 | |||
25 | from st2common import log as logging |
||
26 | from st2common.constants import action as action_constants |
||
27 | from st2common.exceptions import db as db_exc |
||
28 | from st2common.persistence.executionstate import ActionExecutionState |
||
29 | from st2common.persistence.liveaction import LiveAction |
||
30 | from st2common.runners import utils as runners_utils |
||
31 | from st2common.services import executions |
||
32 | from st2common.util import date as date_utils |
||
33 | |||
# Public names exported by ``from <module> import *``.
__all__ = ['Querier', 'QueryContext']

# Module-level logger used by every class in this file.
LOG = logging.getLogger(__name__)
||
40 | |||
41 | |||
@six.add_metaclass(abc.ABCMeta)
class Querier(object):
    """
    Base class for pollers which repeatedly query an external service for the
    results of a live action and persist whatever comes back.

    Pending work is kept in an in-memory queue of ``(last_query_time, query_context)``
    tuples and is dispatched on an eventlet green pool. Subclasses supply the actual
    lookup by implementing :meth:`query`.
    """

    # When True, the state object backing a query is deleted if either querying or
    # persisting the results raises.
    delete_state_object_on_error = True

    def _get_config_value(self, config_option):
        """
        Read ``config_option`` from the configuration.

        The deprecated ``results_tracker`` group is consulted first (a warning is
        logged when the option is found there). If that yields no value, or a falsy
        one, the ``resultstracker`` group is consulted.

        :param config_option: Name of the option to read.

        :return: The value which was read, or ``None`` when the option is present in
                 neither group.
        """
        config_value = None

        if 'results_tracker' in cfg.CONF and config_option in cfg.CONF.results_tracker:
            config_value = getattr(cfg.CONF.results_tracker, config_option)
            LOG.warning('You are using deprecated config group "results_tracker" for "%s". '
                        'Please use "resultstracker" group instead.', config_option)

        if not config_value and config_option in cfg.CONF.resultstracker:
            config_value = getattr(cfg.CONF.resultstracker, config_option)

        return config_value

    def __init__(self):
        self._empty_q_sleep_time = self._get_config_value('empty_q_sleep_time')
        self._no_workers_sleep_time = self._get_config_value('no_workers_sleep_time')
        self._query_interval = self._get_config_value('query_interval')
        self._query_thread_pool_size = self._get_config_value('thread_pool_size')
        # Queue of (last_query_time, query_context) tuples awaiting their next query.
        self._query_contexts = six.moves.queue.Queue()
        self._thread_pool = eventlet.GreenPool(self._query_thread_pool_size)
        self._started = False

    def start(self):
        """
        Run the dispatch loop. This method loops forever and does not return.

        Each iteration waits until there is queued work and a free worker, fires the
        queries which are due and then sleeps for the query interval.
        """
        self._started = True
        while True:
            while self._query_contexts.empty():
                eventlet.greenthread.sleep(self._empty_q_sleep_time)
            while self._thread_pool.free() <= 0:
                eventlet.greenthread.sleep(self._no_workers_sleep_time)
            self._fire_queries()
            eventlet.sleep(self._query_interval)

    def add_queries(self, query_contexts=None):
        """
        Queue the provided query contexts.

        Each context is stamped with the current time as its "last query time", so it
        becomes due one query interval after it has been added.

        :param query_contexts: Iterable of query contexts to queue (``None`` is
                               treated as empty).
        """
        if query_contexts is None:
            query_contexts = []
        # Pass the value as a logging argument instead of using "%" directly; eager
        # formatting raises TypeError when a tuple of contexts is provided.
        LOG.debug('Adding queries to querier: %s', query_contexts)
        for query_context in query_contexts:
            self._query_contexts.put((time.time(), query_context))

    def is_started(self):
        """
        :return: ``True`` once :meth:`start` has been called.
        """
        return self._started

    def _fire_queries(self, blocking=False):
        """
        Dispatch every queued query which is due while workers are available.

        Entries which were queried less than one query interval ago are put back on
        the queue untouched.

        :param blocking: When True, run each query inline instead of spawning it on
                         the green pool. Intended for unit tests.
        """
        if self._thread_pool.free() <= 0:
            return

        now = time.time()
        reschedule_queries = []

        while not self._query_contexts.empty() and self._thread_pool.free() > 0:
            (last_query_time, query_context) = self._query_contexts.get_nowait()

            if now - last_query_time < self._query_interval:
                # Not due yet. Hold it aside so this loop doesn't pick it up again.
                reschedule_queries.append((last_query_time, query_context))
                continue

            if blocking:
                # Option to block and execute the function directly for unit tests.
                self._query_and_save_results(
                    query_context,
                    last_query_time
                )
            else:
                self._thread_pool.spawn(
                    self._query_and_save_results,
                    query_context,
                    last_query_time
                )

        for query in reschedule_queries:
            self._query_contexts.put((query[0], query[1]))

    def _query_and_save_results(self, query_context, last_query_time=None):
        """
        Run a single query, persist the outcome and re-queue the context if the
        action has not finished yet.

        :param query_context: ``QueryContext`` instance describing what to query.
        :param last_query_time: Timestamp of the previous query; passed through to
                                :meth:`query`.
        """
        this_query_time = time.time()
        execution_id = query_context.execution_id
        actual_query_context = query_context.query_context

        LOG.debug('Querying external service for results. Context: %s', actual_query_context)
        try:
            (status, results) = self.query(
                execution_id,
                actual_query_context,
                last_query_time=last_query_time
            )
        except Exception:
            # Only Exception is caught so interpreter exit and green thread kill
            # signals (which derive from BaseException) keep propagating.
            LOG.exception('Failed querying results for liveaction_id %s.', execution_id)
            if self.delete_state_object_on_error:
                self._delete_state_object(query_context)
                LOG.debug('Removed state object %s.', query_context)
            return

        liveaction_db = None
        try:
            liveaction_db = self._update_action_results(execution_id, status, results)
        except Exception:
            LOG.exception('Failed updating action results for liveaction_id %s', execution_id)
            if self.delete_state_object_on_error:
                self._delete_state_object(query_context)
                LOG.debug('Removed state object %s.', query_context)
            return

        is_done = (status in action_constants.LIVEACTION_COMPLETED_STATES or
                   status == action_constants.LIVEACTION_STATUS_PAUSED)

        if is_done:
            # Nothing left to poll for - run the post run step and drop the state.
            runners_utils.invoke_post_run(liveaction_db)
            self._delete_state_object(query_context)
            LOG.debug(
                "Detailed workflow liveaction results - ", extra={'liveaction_db': liveaction_db}
            )
            return

        if not self._is_state_object_exist(query_context):
            LOG.warning(
                'Query for liveaction_id %s is not rescheduled '
                'because state object %s has been deleted.',
                execution_id,
                query_context.id
            )

            return

        # Still in progress - queue it again, stamped with the time of this query.
        self._query_contexts.put((this_query_time, query_context))

    def _update_action_results(self, execution_id, status, results):
        """
        Store the queried status and results on the live action and propagate the
        change to the corresponding execution.

        The status of a canceled live action is left untouched; the result is always
        overwritten.

        :return: The updated live action DB object.

        :raises Exception: If no live action exists for ``execution_id``.
        """
        liveaction_db = LiveAction.get_by_id(execution_id)
        if not liveaction_db:
            raise Exception('No DB model for liveaction_id: %s' % execution_id)

        if liveaction_db.status != action_constants.LIVEACTION_STATUS_CANCELED:
            liveaction_db.status = status

        liveaction_db.result = results

        # Action has completed, record end_timestamp
        if (liveaction_db.status in action_constants.LIVEACTION_COMPLETED_STATES and
                not liveaction_db.end_timestamp):
            liveaction_db.end_timestamp = date_utils.get_datetime_utc_now()

        # update liveaction, update actionexecution and then publish update.
        updated_liveaction = LiveAction.add_or_update(liveaction_db, publish=False)
        executions.update_execution(updated_liveaction)
        LiveAction.publish_update(updated_liveaction)

        return updated_liveaction

    def _delete_state_object(self, query_context):
        """
        Best-effort removal of the state object which backs ``query_context``.

        A state object which no longer exists is ignored and a failure to delete is
        logged and not re-raised.
        """
        state_db = None

        try:
            state_db = ActionExecutionState.get_by_id(query_context.id)
        except db_exc.StackStormDBObjectNotFoundError:
            pass

        if state_db is not None:
            try:
                LOG.info('Clearing state object: %s', state_db)
                ActionExecutionState.delete(state_db)
            except Exception:
                LOG.exception('Failed clearing state object: %s', state_db)

    def _is_state_object_exist(self, query_context):
        """
        :return: ``True`` if the state object which backs ``query_context`` can still
                 be retrieved, ``False`` otherwise.
        """
        state_db = None

        try:
            state_db = ActionExecutionState.get_by_id(query_context.id)
        except db_exc.StackStormDBObjectNotFoundError:
            pass

        return state_db is not None

    def query(self, execution_id, query_context, last_query_time=None):
        """
        This is the method individual queriers must implement.
        This method should return a tuple of (status, results).

        status should be one of LIVEACTION_STATUS_SUCCEEDED, LIVEACTION_STATUS_RUNNING,
        LIVEACTION_STATUS_FAILED defined in st2common.constants.action.

        NOTE(review): this method is not decorated with ``abc.abstractmethod``, so a
        subclass which doesn't override it returns ``None`` here and the caller fails
        while unpacking the result.
        """
        pass

    def print_stats(self):
        """
        Log the class name and the number of queries currently queued.
        """
        LOG.info('\t --- Name: %s, pending queuries: %d', self.__class__.__name__,
                 self._query_contexts.qsize())
||
227 | |||
228 | |||
class QueryContext(object):
    """
    Plain value object which describes a single pending query.

    It carries the id of the backing state object, the id used to look up the live
    action, the opaque context which is handed to ``Querier.query`` and the name of
    the query module.
    """

    def __init__(self, obj_id, execution_id, query_context, query_module):
        """
        :param obj_id: Id of the state object this context was built from.
        :param execution_id: Id used to look up the live action being tracked.
        :param query_context: Opaque, querier specific context.
        :param query_module: Query module associated with this context.
        """
        self.id = obj_id
        self.execution_id = execution_id
        self.query_context = query_context
        self.query_module = query_module

    @classmethod
    def from_model(cls, model):
        """
        Build a context from an object which exposes ``id``, ``execution_id``,
        ``query_context`` and ``query_module`` attributes.

        Both ids are converted to strings. The instance is created through ``cls`` so
        calling this on a subclass returns an instance of that subclass.
        """
        return cls(str(model.id), str(model.execution_id), model.query_context,
                   model.query_module)

    def __repr__(self):
        return ('<QueryContext id=%s,execution_id=%s,query_context=%s>' %
                (self.id, self.execution_id, self.query_context))
||
244 |