|
1
|
|
|
import random |
|
2
|
|
|
import uuid |
|
3
|
|
|
|
|
4
|
|
|
from mistralclient.api import base as mistralclient_base |
|
5
|
|
|
from mistralclient.api import client as mistral |
|
6
|
|
|
from oslo_config import cfg |
|
7
|
|
|
import eventlet |
|
8
|
|
|
import retrying |
|
9
|
|
|
|
|
10
|
|
|
from st2common.query.base import Querier |
|
11
|
|
|
from st2common.constants import action as action_constants |
|
12
|
|
|
from st2common.exceptions import resultstracker as exceptions |
|
13
|
|
|
from st2common import log as logging |
|
14
|
|
|
from st2common.persistence.execution import ActionExecution |
|
15
|
|
|
from st2common.util import action_db as action_utils |
|
16
|
|
|
from st2common.util import jsonify |
|
17
|
|
|
from st2common.util.url import get_url_without_trailing_slash |
|
18
|
|
|
from st2common.util.workflow import mistral as utils |
|
19
|
|
|
|
|
20
|
|
|
|
|
21
|
|
|
LOG = logging.getLogger(__name__)

# Mistral workflow states treated as "done", mapped to the st2 liveaction status each one
# translates to.
DONE_STATES = dict(
    ERROR=action_constants.LIVEACTION_STATUS_FAILED,
    SUCCESS=action_constants.LIVEACTION_STATUS_SUCCEEDED,
    CANCELLED=action_constants.LIVEACTION_STATUS_CANCELED,
    PAUSED=action_constants.LIVEACTION_STATUS_PAUSED,
)

# Mistral task states that count as still in progress (membership is checked on the keys).
ACTIVE_STATES = dict(
    RUNNING=action_constants.LIVEACTION_STATUS_RUNNING,
)

# st2 liveaction statuses indicating a cancellation is requested or done.
CANCELED_STATES = [action_constants.LIVEACTION_STATUS_CANCELED,
                   action_constants.LIVEACTION_STATUS_CANCELING]

# st2 liveaction statuses indicating a pause is requested or done.
PAUSED_STATES = [action_constants.LIVEACTION_STATUS_PAUSED,
                 action_constants.LIVEACTION_STATUS_PAUSING]

# st2 liveaction statuses indicating a resume is in progress.
RESUMING_STATES = [action_constants.LIVEACTION_STATUS_RESUMING]
def get_instance():
    """
    Create a new Mistral results querier identified by a random UUID string.

    :rtype: ``MistralResultsQuerier``
    """
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
class MistralResultsQuerier(Querier):
    """
    Querier that polls Mistral (v2 API) for the state and results of workflow executions.

    For one st2 execution it fetches the matching Mistral workflow execution and the tasks
    that are new or changed since the last recorded result. It merges those into the result
    already stored on the liveaction and maps everything to a st2 liveaction status.
    """

    # NOTE(review): presumably consulted by the ``Querier`` base class; ``False`` asks it not
    # to delete the query state object when a query errors out -- confirm against Querier.
    delete_state_object_on_error = False

    def __init__(self, id, *args, **kwargs):  # pylint: disable=redefined-builtin
        """
        :param id: Identifier supplied by ``get_instance`` (a random UUID string). It is not
                   used here. The name shadows the ``id`` builtin but is kept so callers that
                   pass it by keyword keep working.
        :param args: Positional arguments forwarded to ``Querier.__init__``.
        :param kwargs: Keyword arguments forwarded to ``Querier.__init__``.
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)
        # Upper bound (in seconds, as handed to eventlet.sleep) of the random delay placed
        # around Mistral API calls.
        self._jitter = cfg.CONF.mistral.jitter_interval

    @staticmethod
    def _get_error_message(exc):
        """
        Return the human readable message carried by ``exc``.

        ``BaseException.message`` only exists on Python 2. On Python 3 (or when the attribute
        is ``None``) fall back to ``str(exc)``, so reading the message never raises
        ``AttributeError`` and hides the original error.

        :param exc: Any exception instance.
        :rtype: ``str``
        """
        message = getattr(exc, 'message', None)
        return message if message is not None else str(exc)

    # The decorator arguments are read from cfg.CONF once, when the class body is executed.
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``
        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id under ``query_context['mistral']['execution_id']``.
        :type query_context: ``object``
        :param last_query_time: Timestamp of last query (not used by this implementation).
        :type last_query_time: ``float``
        :return: ``(status, result)``. If a Mistral reference is not found, the status is
                 ``LIVEACTION_STATUS_FAILED`` and the result is the error message.
        :rtype: (``str``, ``object``)
        :raises Exception: If the query context has no Mistral execution ID, or if fetching
                           from Mistral fails for any reason other than a missing reference.
        """
        # Retrieve liveaction_db to append new result to existing result.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Format eagerly: Exception() does not apply %-style arguments to its message.
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s'
                            % (execution_id, query_context))

        LOG.info('[%s] Querying mistral execution %s...', execution_id, mistral_exec_id)

        try:
            wf_result = self._get_workflow_result(execution_id, mistral_exec_id)

            # The liveaction may not have a result recorded yet (missing or None).
            current_result = getattr(liveaction_db, 'result', None) or {}

            wf_tasks_result = self._get_workflow_tasks(
                execution_id,
                mistral_exec_id,
                recorded_tasks=current_result.get('tasks', [])
            )

            result = self._format_query_result(
                current_result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            LOG.exception('[%s] Unable to find reference.', execution_id)
            return (action_constants.LIVEACTION_STATUS_FAILED, self._get_error_message(exc))
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        # Retrieve liveaction_db again in case state has changed
        # while the querier get results from mistral API above.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        LOG.info('[%s] Determined execution status: %s', execution_id, status)
        LOG.debug('[%s] Combined execution result: %s', execution_id, result)

        return (status, result)

    def _get_workflow_result(self, st2_exec_id, mistral_exec_id):
        """
        Fetch a Mistral workflow execution and return its output plus state information.

        The decoded execution output is included only when the workflow state is one of
        ``DONE_STATES``; otherwise the output part is empty. The Mistral state and state_info
        are always added under the ``extra`` key.

        :param st2_exec_id: st2 execution ID (used for logging)
        :type st2_exec_id: ``str``
        :param mistral_exec_id: Mistral execution ID
        :type mistral_exec_id: ``str``
        :rtype: ``dict``
        :raises ReferenceNotFoundError: If Mistral reports the execution as not found.
        :raises APIException: For any other Mistral API error.
        """
        # Space requests out with a random delay instead of hitting the API in bursts.
        eventlet.sleep(random.uniform(0, self._jitter))

        try:
            execution = self._client.executions.get(mistral_exec_id)
        except mistralclient_base.APIException as mistral_exc:
            message = self._get_error_message(mistral_exc)
            if 'not found' in message:
                raise exceptions.ReferenceNotFoundError(message)
            # Bare raise keeps the original traceback (``raise exc`` loses it on Python 2).
            raise

        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        LOG.info(
            '[%s] Query returned status "%s" for mistral execution %s.',
            st2_exec_id,
            execution.state,
            mistral_exec_id
        )

        return result

    def _get_workflow_tasks(self, st2_exec_id, mistral_exec_id, recorded_tasks=None):
        """
        Return formatted results for the tasks that are new or changed since the last query.

        A Mistral task is re-fetched when no recorded task has its ID, or when its state,
        created_at or updated_at differs from the recorded entry. Unchanged tasks are skipped.

        :param st2_exec_id: st2 execution ID (used for logging)
        :type st2_exec_id: ``str``
        :param mistral_exec_id: Mistral execution ID
        :type mistral_exec_id: ``str``
        :param recorded_tasks: The list of tasks recorded in the liveaction result.
        :type recorded_tasks: ``list`` of ``dict``
        :rtype: ``list`` of ``dict``
        :raises ReferenceNotFoundError: If Mistral reports a reference as not found.
        :raises APIException: For any other Mistral API error.
        """
        # Index the recorded tasks by ID once rather than scanning the whole list for every
        # Mistral task. setdefault keeps the first entry for an ID, which is the entry the
        # previous linear search would have matched.
        recorded_by_id = {}
        for recorded_task in recorded_tasks or []:
            recorded_by_id.setdefault(recorded_task.get('id'), recorded_task)

        result = []
        queries = []

        try:
            wf_tasks = self._client.tasks.list(workflow_execution_id=mistral_exec_id)

            for wf_task in wf_tasks:
                recorded = recorded_by_id.get(wf_task.id)

                if (recorded is None or
                        recorded.get('state') != wf_task.state or
                        str(recorded.get('created_at')) != wf_task.created_at or
                        str(recorded.get('updated_at')) != wf_task.updated_at):
                    queries.append(wf_task)

            target_task_names = [wf_task.name for wf_task in queries]

            LOG.info(
                '[%s] Querying the following tasks for mistral execution %s: %s',
                st2_exec_id,
                mistral_exec_id,
                ', '.join(target_task_names) if target_task_names else 'None'
            )

            for wf_task in queries:
                result.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                eventlet.sleep(random.uniform(0, self._jitter))
        except mistralclient_base.APIException as mistral_exc:
            message = self._get_error_message(mistral_exc)
            if 'not found' in message:
                raise exceptions.ReferenceNotFoundError(message)
            # Bare raise keeps the original traceback (``raise exc`` loses it on Python 2).
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in result]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        ``id``, ``name`` and ``workflow_name`` are required keys of ``task``; the other
        scalar fields default to ``None``. ``result``, ``input`` and ``published`` are passed
        through ``jsonify.try_loads`` to decode JSON strings.

        :param task: Task as a dict (``to_dict()`` of a Mistral task resource).
        :type task: ``dict``
        :rtype: ``dict``
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id', None),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at', None),
            'updated_at': task.get('updated_at', None),
            'state': task.get('state', None),
            'state_info': task.get('state_info', None)
        }

        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr, None))

        return result

    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
        """
        Merge freshly queried task results into the previously recorded result.

        Recorded tasks whose ID was not re-queried are kept, followed by the new task results,
        which replace any recorded entries with the same ID.

        :param current_result: Result currently stored on the liveaction (may be empty/None).
        :type current_result: ``dict``
        :param new_wf_result: Workflow result from ``_get_workflow_result``. It is updated in
                              place with the merged ``tasks`` list and returned.
        :type new_wf_result: ``dict``
        :param new_wf_tasks_result: Formatted task results from ``_get_workflow_tasks``.
        :type new_wf_tasks_result: ``list`` of ``dict``
        :rtype: ``dict``
        """
        result = new_wf_result

        # A set gives O(1) membership checks while filtering the recorded tasks.
        new_wf_task_ids = {entry['id'] for entry in new_wf_tasks_result}

        recorded_tasks = (current_result or {}).get('tasks') or []

        old_wf_tasks_result_to_keep = [
            entry for entry in recorded_tasks
            if entry['id'] not in new_wf_task_ids
        ]

        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result

        return result

    def _has_active_tasks(self, liveaction_db, mistral_tasks):
        """
        Return True if any Mistral task or any st2 child execution is still active.

        A Mistral task is active when its state is a key of ``ACTIVE_STATES``. A st2 child
        execution is active when its status is neither a completed status nor PAUSED.

        :param liveaction_db: Liveaction DB object of the workflow execution.
        :param mistral_tasks: Formatted task results (dicts with a ``state`` key).
        :rtype: ``bool``
        """
        liveaction_id = str(liveaction_db.id)

        # Identify if there are any active tasks in Mistral.
        active_mistral_tasks = any(t['state'] in ACTIVE_STATES for t in mistral_tasks)

        active_st2_tasks = False
        execution = ActionExecution.get(liveaction__id=liveaction_id)

        for child_exec_id in execution.children:
            child_exec = ActionExecution.get(id=child_exec_id)

            if (child_exec.status not in action_constants.LIVEACTION_COMPLETED_STATES and
                    child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED):
                active_st2_tasks = True
                break

        if active_mistral_tasks:
            LOG.info('There are active mistral tasks for %s.', liveaction_id)

        if active_st2_tasks:
            LOG.info('There are active st2 tasks for %s.', liveaction_id)

        return active_mistral_tasks or active_st2_tasks

    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Map the liveaction status, the Mistral workflow state and task activity to a st2 status.

        Precedence, first match wins:

        1. Cancellation: CANCELING while a cancel is requested or done on the liveaction and
           either tasks are still active or the workflow is not done. Also CANCELING when
           Mistral reports CANCELLED while tasks are still active.
        2. Pause: PAUSING under the same rules, using the pause statuses and Mistral PAUSED.
        3. Resume: RESUMING when a resume is in progress and the workflow is still PAUSED.
        4. Done workflow: RUNNING while tasks are active, otherwise the DONE_STATES mapping.
        5. Anything else: RUNNING.

        :param liveaction_db: Liveaction DB object of the workflow execution.
        :param wf_state: Mistral workflow execution state.
        :type wf_state: ``str``
        :param tasks: Formatted task results (dicts with a ``state`` key).
        :rtype: ``str``
        """
        # Determine if liveaction is being canceled, paused, or resumed.
        is_action_canceled = liveaction_db.status in CANCELED_STATES
        is_action_paused = liveaction_db.status in PAUSED_STATES
        is_action_resuming = liveaction_db.status in RESUMING_STATES

        # Identify the list of tasks that are still running or pausing.
        active_tasks = self._has_active_tasks(liveaction_db, tasks)

        # Keep the execution in running state if there are active tasks.
        # In certain use cases, Mistral sets the workflow state to
        # completion prior to task completion.
        if is_action_canceled and active_tasks:
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_paused and active_tasks:
            status = action_constants.LIVEACTION_STATUS_PAUSING
        elif is_action_paused and not active_tasks and wf_state not in DONE_STATES:
            status = action_constants.LIVEACTION_STATUS_PAUSING
        elif not is_action_paused and active_tasks and wf_state == 'PAUSED':
            status = action_constants.LIVEACTION_STATUS_PAUSING
        elif is_action_resuming and wf_state == 'PAUSED':
            status = action_constants.LIVEACTION_STATUS_RESUMING
        elif wf_state in DONE_STATES and active_tasks:
            status = action_constants.LIVEACTION_STATUS_RUNNING
        elif wf_state in DONE_STATES and not active_tasks:
            status = DONE_STATES[wf_state]
        else:
            status = action_constants.LIVEACTION_STATUS_RUNNING

        return status
It is generally discouraged to redefine (shadow) built-ins — as the `id` parameter of `MistralResultsQuerier.__init__` does — because it makes code harder to read.