1
|
|
|
import random |
2
|
|
|
import uuid |
3
|
|
|
|
4
|
|
|
from mistralclient.api import base as mistralclient_base |
5
|
|
|
from mistralclient.api import client as mistral |
6
|
|
|
from oslo_config import cfg |
7
|
|
|
import eventlet |
8
|
|
|
import retrying |
9
|
|
|
|
10
|
|
|
from st2common.query.base import Querier |
11
|
|
|
from st2common.constants import action as action_constants |
12
|
|
|
from st2common.exceptions import resultstracker as exceptions |
13
|
|
|
from st2common import log as logging |
14
|
|
|
from st2common.persistence.execution import ActionExecution |
15
|
|
|
from st2common.util import action_db as action_utils |
16
|
|
|
from st2common.util import jsonify |
17
|
|
|
from st2common.util.url import get_url_without_trailing_slash |
18
|
|
|
from st2common.util.workflow import mistral as utils |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
LOG = logging.getLogger(__name__)

# Mistral workflow states this querier treats as terminal, mapped to the st2
# liveaction status reported for each of them.
DONE_STATES = dict(
    ERROR=action_constants.LIVEACTION_STATUS_FAILED,
    SUCCESS=action_constants.LIVEACTION_STATUS_SUCCEEDED,
    CANCELLED=action_constants.LIVEACTION_STATUS_CANCELED,
    PAUSED=action_constants.LIVEACTION_STATUS_PAUSED,
)

# Mistral task states that count as "still active" (membership is tested on keys).
ACTIVE_STATES = dict(
    RUNNING=action_constants.LIVEACTION_STATUS_RUNNING,
)

# st2 liveaction statuses meaning a cancellation was requested or has completed.
CANCELED_STATES = [action_constants.LIVEACTION_STATUS_CANCELED,
                   action_constants.LIVEACTION_STATUS_CANCELING]

# st2 liveaction statuses meaning a pause was requested or has completed.
PAUSED_STATES = [action_constants.LIVEACTION_STATUS_PAUSED,
                 action_constants.LIVEACTION_STATUS_PAUSING]

# st2 liveaction statuses meaning a resume was requested.
RESUMING_STATES = [action_constants.LIVEACTION_STATUS_RESUMING]
47
|
|
|
|
48
|
|
|
|
49
|
|
|
def get_instance():
    """Create a MistralResultsQuerier identified by a fresh random UUID string."""
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
51
|
|
|
|
52
|
|
|
|
53
|
|
|
class MistralResultsQuerier(Querier):
    """
    Querier that polls the Mistral v2 API for workflow and task results of st2
    executions and maps them onto st2 liveaction statuses.
    """

    # NOTE(review): presumably tells the Querier base class not to discard the
    # query state object when a query raises -- confirm against Querier.
    delete_state_object_on_error = False

    def __init__(self, id, *args, **kwargs):  # pylint: disable=redefined-builtin
        """
        :param id: Identifier of this querier instance. It is NOT forwarded to the
                   base class and is otherwise unused here. The name shadows the
                   ``id`` built-in but is kept so existing keyword callers work.

        Remaining positional and keyword arguments are passed to ``Querier``.
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)
        # Upper bound (seconds, as passed to eventlet.sleep) of the random delay
        # inserted before Mistral API calls to spread requests out.
        self._jitter = cfg.CONF.mistral.jitter_interval

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.
        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``
        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id.
        :type query_context: ``object``
        :param last_query_time: Timestamp of last query.
        :type last_query_time: ``float``
        :rtype: (``str``, ``object``)
        :raises Exception: if the query context has no mistral execution ID.
        """
        # Retrieve liveaction_db to append new result to existing result.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Interpolate the message; passing the values as extra Exception args
            # would leave the '%s' placeholders unformatted.
            raise Exception(
                '[%s] Missing mistral workflow execution ID in query context. %s'
                % (execution_id, query_context))

        LOG.info('[%s] Querying mistral execution %s...', execution_id, mistral_exec_id)

        try:
            wf_result = self._get_workflow_result(execution_id, mistral_exec_id)

            # The recorded result may be absent or None (e.g. nothing stored yet).
            current_result = getattr(liveaction_db, 'result', None) or {}

            wf_tasks_result = self._get_workflow_tasks(
                execution_id,
                mistral_exec_id,
                recorded_tasks=current_result.get('tasks', [])
            )

            result = self._format_query_result(
                current_result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            LOG.exception('[%s] Unable to find reference.', execution_id)
            return (action_constants.LIVEACTION_STATUS_FAILED,
                    self._get_exception_message(exc))
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        # Retrieve liveaction_db again in case state has changed
        # while the querier get results from mistral API above.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        LOG.info('[%s] Determined execution status: %s', execution_id, status)
        LOG.debug('[%s] Combined execution result: %s', execution_id, result)

        return (status, result)

    @staticmethod
    def _get_exception_message(exc):
        """
        Return the message of an exception.

        Uses the ``message`` attribute when it is set and falls back to
        ``str(exc)`` otherwise, since ``BaseException.message`` does not exist
        on Python 3.
        """
        message = getattr(exc, 'message', None)
        return message if message else str(exc)

    def _raise_if_reference_not_found(self, mistral_exc):
        """
        Translate a Mistral API "not found" error into ReferenceNotFoundError.

        Returns normally for any other error so the caller can re-raise it.

        :raises exceptions.ReferenceNotFoundError: if the error message contains
                                                   ``'not found'``.
        """
        message = self._get_exception_message(mistral_exc)
        if 'not found' in message:
            raise exceptions.ReferenceNotFoundError(message)

    def _get_workflow_result(self, st2_exec_id, mistral_exec_id):
        """
        Returns the workflow output plus Mistral state information.

        The output is only loaded when the workflow is in one of DONE_STATES;
        otherwise an empty dict is used. Either way an ``extra`` key holding the
        raw Mistral ``state`` and ``state_info`` is added.

        :param st2_exec_id: st2 execution ID
        :type st2_exec_id: ``str``
        :param mistral_exec_id: Mistral execution ID
        :type mistral_exec_id: ``str``
        :rtype: ``dict``
        :raises exceptions.ReferenceNotFoundError: if Mistral reports the
                                                   execution as not found.
        """
        try:
            # Space out API calls with a random delay.
            eventlet.sleep(random.uniform(0, self._jitter))
            execution = self._client.executions.get(mistral_exec_id)
        except mistralclient_base.APIException as mistral_exc:
            self._raise_if_reference_not_found(mistral_exc)
            raise

        result = {}
        if execution.state in DONE_STATES:
            # Fall back to {} for an empty/None output so 'extra' can be attached.
            result = jsonify.try_loads(execution.output) or {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        LOG.info(
            '[%s] Query returned status "%s" for mistral execution %s.',
            st2_exec_id,
            execution.state,
            mistral_exec_id
        )

        return result

    @staticmethod
    def _is_task_changed(recorded, wf_task):
        """
        Return True if ``wf_task`` differs from its recorded copy (or has none).

        Compares state and the created/updated timestamps; recorded timestamps
        are stringified before comparing against the Mistral values.
        """
        return (recorded is None or
                recorded.get('state') != wf_task.state or
                str(recorded.get('created_at')) != wf_task.created_at or
                str(recorded.get('updated_at')) != wf_task.updated_at)

    def _get_workflow_tasks(self, st2_exec_id, mistral_exec_id, recorded_tasks=None):
        """
        Returns the formatted tasks of a workflow execution that are new or
        have changed since they were recorded.

        :param st2_exec_id: st2 execution ID
        :type st2_exec_id: ``str``
        :param mistral_exec_id: Mistral execution ID
        :type mistral_exec_id: ``str``
        :param recorded_tasks: The list of tasks recorded in the liveaction result.
        :type recorded_tasks: ``list`` of ``dict``
        :rtype: ``list``
        :raises exceptions.ReferenceNotFoundError: if Mistral reports a
                                                   referenced object as not found.
        """
        if recorded_tasks is None:
            recorded_tasks = []

        # Index recorded tasks by id once (first occurrence wins), instead of
        # scanning the whole list for every Mistral task.
        recorded_by_id = {}
        for recorded_task in recorded_tasks:
            recorded_by_id.setdefault(recorded_task.get('id'), recorded_task)

        result = []

        try:
            wf_tasks = self._client.tasks.list(workflow_execution_id=mistral_exec_id)

            queries = [
                wf_task for wf_task in wf_tasks
                if self._is_task_changed(recorded_by_id.get(wf_task.id), wf_task)
            ]

            target_task_names = [wf_task.name for wf_task in queries]

            LOG.info(
                '[%s] Querying the following tasks for mistral execution %s: %s',
                st2_exec_id,
                mistral_exec_id,
                ', '.join(target_task_names) if target_task_names else 'None'
            )

            for wf_task in queries:
                result.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                eventlet.sleep(random.uniform(0, self._jitter))
        except mistralclient_base.APIException as mistral_exc:
            self._raise_if_reference_not_found(mistral_exc)
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in result]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        ``result``, ``input`` and ``published`` are passed through
        ``jsonify.try_loads`` so JSON-encoded strings are decoded.

        :param task: Task attributes as returned by ``to_dict()``.
        :type task: ``dict``
        :rtype: ``dict``
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id'),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at'),
            'updated_at': task.get('updated_at'),
            'state': task.get('state'),
            'state_info': task.get('state_info')
        }

        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr))

        return result

    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
        """
        Merge freshly queried workflow and task results with the recorded result.

        Recorded tasks are kept unless a newer copy with the same id is present
        in ``new_wf_tasks_result``; the new copies are appended after the kept
        ones. ``new_wf_result`` is updated in place and returned.

        :param current_result: Previously recorded result (may be None or empty).
        :type current_result: ``dict``
        :rtype: ``dict``
        """
        result = new_wf_result

        new_wf_task_ids = {entry['id'] for entry in new_wf_tasks_result}

        old_wf_tasks_result_to_keep = [
            entry for entry in (current_result or {}).get('tasks', [])
            if entry['id'] not in new_wf_task_ids
        ]

        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result

        return result

    def _has_active_tasks(self, liveaction_db, mistral_wf_state, mistral_tasks):
        """
        Return True if any Mistral task or any st2 child execution is active.

        A Mistral task is active when its state is in ACTIVE_STATES. A child
        execution is active when it is neither completed nor paused; children
        stuck in "requested" are ignored once the workflow is in DONE_STATES.
        """
        # Identify if there are any active tasks in Mistral.
        active_mistral_tasks = any(t['state'] in ACTIVE_STATES for t in mistral_tasks)

        active_st2_tasks = False
        execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))

        for child_exec_id in execution.children:
            child_exec = ActionExecution.get(id=child_exec_id)

            # Catch exception where a child is requested twice due to st2mistral retrying
            # from a st2 API connection failure. The first child will be stuck in requested
            # while the mistral workflow is already completed.
            if (mistral_wf_state in DONE_STATES and
                    child_exec.status == action_constants.LIVEACTION_STATUS_REQUESTED):
                continue

            if (child_exec.status not in action_constants.LIVEACTION_COMPLETED_STATES and
                    child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED):
                active_st2_tasks = True
                break

        if active_mistral_tasks:
            LOG.info('There are active mistral tasks for %s.', str(liveaction_db.id))

        if active_st2_tasks:
            LOG.info('There are active st2 tasks for %s.', str(liveaction_db.id))

        return active_mistral_tasks or active_st2_tasks

    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Derive the st2 liveaction status from the liveaction, the Mistral
        workflow state and the workflow's tasks.

        The execution is kept in a transitional (canceling/pausing/running) state
        while tasks are still active, because Mistral may report the workflow
        as finished before its tasks complete.
        """
        # Determine if liveaction is being canceled, paused, or resumed.
        is_action_canceled = liveaction_db.status in CANCELED_STATES
        is_action_paused = liveaction_db.status in PAUSED_STATES
        is_action_resuming = liveaction_db.status in RESUMING_STATES

        # Identify if any tasks are still running or pausing.
        active_tasks = self._has_active_tasks(liveaction_db, wf_state, tasks)
        wf_done = wf_state in DONE_STATES

        if ((is_action_canceled and (active_tasks or not wf_done)) or
                (active_tasks and wf_state == 'CANCELLED')):
            return action_constants.LIVEACTION_STATUS_CANCELING

        if ((is_action_paused and (active_tasks or not wf_done)) or
                (active_tasks and wf_state == 'PAUSED')):
            return action_constants.LIVEACTION_STATUS_PAUSING

        if is_action_resuming and wf_state == 'PAUSED':
            return action_constants.LIVEACTION_STATUS_RESUMING

        if wf_done and not active_tasks:
            return DONE_STATES[wf_state]

        # Workflow not done yet, or done while tasks are still active.
        return action_constants.LIVEACTION_STATUS_RUNNING
318
|
|
|
|
Redefining built-ins is generally discouraged because it makes code harder to read: the `id` parameter of `MistralResultsQuerier.__init__` shadows the built-in `id()` function.