1 | from __future__ import absolute_import |
||
2 | import random |
||
3 | import uuid |
||
4 | |||
5 | from mistralclient.api import base as mistralclient_base |
||
6 | from mistralclient.api import client as mistral |
||
7 | from oslo_config import cfg |
||
8 | import eventlet |
||
9 | import retrying |
||
10 | |||
11 | from st2common.query.base import Querier |
||
12 | from st2common.constants import action as action_constants |
||
13 | from st2common.exceptions import resultstracker as exceptions |
||
14 | from st2common import log as logging |
||
15 | from st2common.persistence.execution import ActionExecution |
||
16 | from st2common.util import action_db as action_utils |
||
17 | from st2common.util import jsonify |
||
18 | from st2common.util.url import get_url_without_trailing_slash |
||
19 | from st2common.util.workflow import mistral as utils |
||
20 | |||
21 | |||
LOG = logging.getLogger(__name__)

# Mistral workflow states the querier treats as "done" (PAUSED included), mapped to
# the st2 liveaction status reported for them. Workflow output is only decoded for
# these states.
DONE_STATES = dict(
    ERROR=action_constants.LIVEACTION_STATUS_FAILED,
    SUCCESS=action_constants.LIVEACTION_STATUS_SUCCEEDED,
    CANCELLED=action_constants.LIVEACTION_STATUS_CANCELED,
    PAUSED=action_constants.LIVEACTION_STATUS_PAUSED,
)

# Mistral task states that count as "still active".
ACTIVE_STATES = dict(
    RUNNING=action_constants.LIVEACTION_STATUS_RUNNING,
)

# st2 liveaction statuses meaning the user asked to cancel the workflow.
CANCELED_STATES = [
    action_constants.LIVEACTION_STATUS_CANCELED,
    action_constants.LIVEACTION_STATUS_CANCELING,
]

# st2 liveaction statuses meaning the user asked to pause the workflow.
PAUSED_STATES = [
    action_constants.LIVEACTION_STATUS_PAUSED,
    action_constants.LIVEACTION_STATUS_PAUSING,
]

# st2 liveaction statuses meaning the user asked to resume the workflow.
RESUMING_STATES = [
    action_constants.LIVEACTION_STATUS_RESUMING,
]
||
48 | |||
49 | |||
def get_instance():
    """Build a new MistralResultsQuerier tagged with a freshly generated UUID string."""
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
||
52 | |||
53 | |||
class MistralResultsQuerier(Querier):
    """Poll Mistral (v2 API) for workflow and task results of an st2 execution.

    Each query does four things:
    - fetches the Mistral workflow execution;
    - fetches only the tasks that are new or changed since they were last recorded;
    - merges those tasks into the result already stored on the st2 liveaction;
    - maps the combined state onto an st2 liveaction status.
    """

    # NOTE(review): presumably consumed by the Querier base class; confirm semantics there.
    delete_state_object_on_error = False

    def __init__(self, id, *args, **kwargs):  # pylint: disable=redefined-builtin
        # ``id`` is accepted (get_instance passes a UUID string) but is not used here.
        # The parameter name is kept so keyword callers are not broken.
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)
        # Upper bound (passed to random.uniform) for the sleep before each Mistral API call.
        self._jitter = cfg.CONF.mistral.jitter_interval

    # The retry settings are evaluated once, when the class body executes (import time).
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.
        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``
        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id under ``query_context['mistral']['execution_id']``.
        :type query_context: ``dict``
        :param last_query_time: Timestamp of last query (currently unused).
        :type last_query_time: ``float``
        :rtype: (``str``, ``object``) -- (st2 status, combined result). If a Mistral
                reference is not found, the second element is the error message string.
        :raises Exception: if the query context has no mistral execution ID, or if
                fetching results fails for any reason other than "not found".
        """
        # Retrieve liveaction_db to append new result to existing result.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Format eagerly: Exception() does not interpolate logging-style arguments.
            raise Exception(
                '[%s] Missing mistral workflow execution ID in query context. %s'
                % (execution_id, query_context))

        LOG.info('[%s] Querying mistral execution %s...', execution_id, mistral_exec_id)

        # The recorded result may be absent or None; treat both as "nothing recorded yet"
        # and use the same value for the task lookup and for the merge below.
        current_result = getattr(liveaction_db, 'result', None) or {}

        try:
            wf_result = self._get_workflow_result(execution_id, mistral_exec_id)

            wf_tasks_result = self._get_workflow_tasks(
                execution_id,
                mistral_exec_id,
                recorded_tasks=current_result.get('tasks', [])
            )

            result = self._format_query_result(
                current_result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            LOG.exception('[%s] Unable to find reference.', execution_id)
            return (action_constants.LIVEACTION_STATUS_FAILED, str(exc))
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        # Retrieve liveaction_db again in case state has changed
        # while the querier get results from mistral API above.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        LOG.info('[%s] Determined execution status: %s', execution_id, status)
        LOG.debug('[%s] Combined execution result: %s', execution_id, result)

        return (status, result)

    def _get_workflow_result(self, st2_exec_id, mistral_exec_id):
        """
        Returns the workflow output plus the raw Mistral state. The Mistral state is
        not converted here; see _determine_execution_status.
        :param st2_exec_id: st2 execution ID (used for logging)
        :type st2_exec_id: ``str``
        :param mistral_exec_id: Mistral execution ID
        :type mistral_exec_id: ``str``
        :rtype: ``dict`` -- the decoded workflow output (empty unless the state is in
                DONE_STATES) with an added ``extra`` key holding ``state``/``state_info``.
        :raises ReferenceNotFoundError: if Mistral reports the execution as not found.
        """
        try:
            # Space out requests to the Mistral API by a random jitter.
            eventlet.sleep(random.uniform(0, self._jitter))
            execution = self._client.executions.get(mistral_exec_id)
        except mistralclient_base.APIException as mistral_exc:
            if 'not found' in str(mistral_exc):
                raise exceptions.ReferenceNotFoundError(str(mistral_exc))
            # Bare raise keeps the original traceback (``raise exc`` drops it on Python 2).
            raise

        # NOTE(review): assumes Mistral output decodes to a dict; a non-dict output
        # would fail on the ``extra`` assignment below.
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        LOG.info(
            '[%s] Query returned status "%s" for mistral execution %s.',
            st2_exec_id,
            execution.state,
            mistral_exec_id
        )

        return result

    @staticmethod
    def _is_task_changed(recorded, wf_task):
        """Return True if ``wf_task`` has no recorded entry or its state/timestamps differ."""
        if recorded is None:
            return True
        return (recorded.get('state') != wf_task.state or
                str(recorded.get('created_at')) != wf_task.created_at or
                str(recorded.get('updated_at')) != wf_task.updated_at)

    def _get_workflow_tasks(self, st2_exec_id, mistral_exec_id, recorded_tasks=None):
        """
        Returns the formatted results of tasks that are new or changed since last recorded.
        :param st2_exec_id: st2 execution ID (used for logging)
        :type st2_exec_id: ``str``
        :param mistral_exec_id: Mistral execution ID
        :type mistral_exec_id: ``str``
        :param recorded_tasks: The list of tasks recorded in the liveaction result.
        :type recorded_tasks: ``list`` of ``dict``
        :rtype: ``list`` of ``dict`` (see _format_task_result)
        :raises ReferenceNotFoundError: if Mistral reports a reference as not found.
        """
        if recorded_tasks is None:
            recorded_tasks = []

        result = []

        try:
            wf_tasks = self._client.tasks.list(workflow_execution_id=mistral_exec_id)

            # Index recorded tasks by ID once instead of rescanning the list per task.
            # setdefault keeps the first entry for a duplicated ID, as the old lookup did.
            recorded_by_id = {}
            for recorded_task in recorded_tasks:
                recorded_by_id.setdefault(recorded_task['id'], recorded_task)

            queries = [
                wf_task for wf_task in wf_tasks
                if self._is_task_changed(recorded_by_id.get(wf_task.id), wf_task)
            ]

            target_task_names = [wf_task.name for wf_task in queries]

            LOG.info(
                '[%s] Querying the following tasks for mistral execution %s: %s',
                st2_exec_id,
                mistral_exec_id,
                ', '.join(target_task_names) if target_task_names else 'None'
            )

            for wf_task in queries:
                result.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                eventlet.sleep(random.uniform(0, self._jitter))
        except mistralclient_base.APIException as mistral_exc:
            if 'not found' in str(mistral_exc):
                raise exceptions.ReferenceNotFoundError(str(mistral_exc))
            # Bare raise keeps the original traceback (``raise exc`` drops it on Python 2).
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in result]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.
        :param task: task as returned by the Mistral client's ``to_dict()``.
        :type task: ``dict``
        :rtype: ``dict`` with ``result``/``input``/``published`` JSON-decoded when possible.
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id', None),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at', None),
            'updated_at': task.get('updated_at', None),
            'state': task.get('state', None),
            'state_info': task.get('state_info', None)
        }

        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr, None))

        return result

    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
        """
        Merge newly fetched task results into the new workflow result.
        Previously recorded tasks are kept unless a fresh copy (same ``id``) was fetched.
        Kept tasks come first, followed by the fresh ones.
        ``new_wf_result`` is updated in place and returned.
        :rtype: ``dict``
        """
        result = new_wf_result

        new_wf_task_ids = {entry['id'] for entry in new_wf_tasks_result}

        old_wf_tasks_result_to_keep = [
            entry for entry in current_result.get('tasks', [])
            if entry['id'] not in new_wf_task_ids
        ]

        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result

        return result

    def _has_active_tasks(self, liveaction_db, mistral_wf_state, mistral_tasks):
        """
        Return True if any Mistral task is in ACTIVE_STATES, or if any child st2
        execution is neither completed nor paused. When the workflow state is in
        DONE_STATES, children still in REQUESTED are ignored.
        """
        # Identify if there are any active tasks in Mistral.
        active_mistral_tasks = any(t['state'] in ACTIVE_STATES for t in mistral_tasks)

        active_st2_tasks = False
        execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))

        for child_exec_id in execution.children:
            child_exec = ActionExecution.get(id=child_exec_id)

            # Catch exception where a child is requested twice due to st2mistral retrying
            # from a st2 API connection failure. The first child will be stuck in requested
            # while the mistral workflow is already completed.
            if (mistral_wf_state in DONE_STATES and
                    child_exec.status == action_constants.LIVEACTION_STATUS_REQUESTED):
                continue

            if (child_exec.status not in action_constants.LIVEACTION_COMPLETED_STATES and
                    child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED):
                active_st2_tasks = True
                break

        if active_mistral_tasks:
            LOG.info('There are active mistral tasks for %s.', str(liveaction_db.id))

        if active_st2_tasks:
            LOG.info('There are active st2 tasks for %s.', str(liveaction_db.id))

        return active_mistral_tasks or active_st2_tasks

    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Map the Mistral workflow state, the st2 liveaction status and the presence of
        active tasks onto the st2 status to report. Branches are evaluated in order;
        the first match wins.
        """
        # Determine if liveaction is being canceled, paused, or resumed.
        is_action_canceled = liveaction_db.status in CANCELED_STATES
        is_action_paused = liveaction_db.status in PAUSED_STATES
        is_action_resuming = liveaction_db.status in RESUMING_STATES

        # Identify the list of tasks that are still running or pausing.
        active_tasks = self._has_active_tasks(liveaction_db, wf_state, tasks)

        # Keep the execution in running state if there are active tasks.
        # In certain use cases, Mistral sets the workflow state to
        # completion prior to task completion.
        if is_action_canceled and active_tasks:
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_paused and active_tasks:
            status = action_constants.LIVEACTION_STATUS_PAUSING
        elif is_action_paused and not active_tasks and wf_state not in DONE_STATES:
            status = action_constants.LIVEACTION_STATUS_PAUSING
        elif not is_action_paused and active_tasks and wf_state == 'PAUSED':
            status = action_constants.LIVEACTION_STATUS_PAUSING
        elif is_action_resuming and wf_state == 'PAUSED':
            status = action_constants.LIVEACTION_STATUS_RESUMING
        elif wf_state in DONE_STATES and active_tasks:
            status = action_constants.LIVEACTION_STATUS_RUNNING
        elif wf_state in DONE_STATES and not active_tasks:
            status = DONE_STATES[wf_state]
        else:
            status = action_constants.LIVEACTION_STATUS_RUNNING

        return status
||
319 |
Redefining built-ins (for example, using `id` as a parameter name) is generally discouraged because it makes code harder to read.