|
1
|
|
|
import random |
|
2
|
|
|
import time |
|
3
|
|
|
import uuid |
|
4
|
|
|
|
|
5
|
|
|
from mistralclient.api import base as mistralclient_base |
|
6
|
|
|
from mistralclient.api import client as mistral |
|
7
|
|
|
from oslo_config import cfg |
|
8
|
|
|
import eventlet |
|
9
|
|
|
import retrying |
|
10
|
|
|
|
|
11
|
|
|
from st2common.query.base import Querier |
|
12
|
|
|
from st2common.constants import action as action_constants |
|
13
|
|
|
from st2common.exceptions import resultstracker as exceptions |
|
14
|
|
|
from st2common import log as logging |
|
15
|
|
|
from st2common.util import action_db as action_utils |
|
16
|
|
|
from st2common.util import jsonify |
|
17
|
|
|
from st2common.util.url import get_url_without_trailing_slash |
|
18
|
|
|
from st2common.util.workflow import mistral as utils |
|
19
|
|
|
|
|
20
|
|
|
|
|
21
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Mistral workflow states treated as finished, keyed to the st2 liveaction
# status that is reported once the workflow reaches that state.
DONE_STATES = {
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED,
}

# Mistral states treated as still in progress (checked against task states).
ACTIVE_STATES = {
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING,
}

# st2 liveaction statuses meaning the action was canceled or is being canceled.
CANCELED_STATES = [
    action_constants.LIVEACTION_STATUS_CANCELED,
    action_constants.LIVEACTION_STATUS_CANCELING,
]
|
37
|
|
|
|
|
38
|
|
|
|
|
39
|
|
|
def get_instance():
    """Create a ``MistralResultsQuerier`` identified by a fresh random UUID string."""
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
|
41
|
|
|
|
|
42
|
|
|
|
|
43
|
|
|
class MistralResultsQuerier(Querier):
    """
    Querier that polls the Mistral v2 API for the state, output and tasks of a
    workflow execution and translates them into an st2 liveaction status/result.
    """

    # NOTE(review): presumably consulted by the ``Querier`` base class when a
    # query fails; the base class is not visible here - confirm.
    delete_state_object_on_error = False

    def __init__(self, id, *args, **kwargs):
        """
        Set up the Mistral client from the ``mistral`` section of the st2 config.

        :param id: Identifier supplied by the caller. It shadows the ``id`` builtin
                   and is not used by this constructor; the name is kept so that
                   existing callers keep working.
        :type id: ``str``
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)

        mistral_cfg = cfg.CONF.mistral

        self._base_url = get_url_without_trailing_slash(mistral_cfg.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=mistral_cfg.keystone_username,
            api_key=mistral_cfg.keystone_password,
            project_name=mistral_cfg.keystone_project_name,
            auth_url=mistral_cfg.keystone_auth_url,
            cacert=mistral_cfg.cacert,
            insecure=mistral_cfg.insecure)

        # Upper bound for the random delay inserted between Mistral API calls.
        self._jitter = mistral_cfg.jitter_interval

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``

        :param query_context: context for the query to be made to mistral. This contains
                              the mistral execution id under ``['mistral']['execution_id']``.
        :type query_context: ``object``

        :param last_query_time: Timestamp of last query.
        :type last_query_time: ``float``

        :returns: Tuple of the st2 status and the result. When the mistral reference
                  cannot be found, the status is failed and the result is the error message.
        :rtype: (``str``, ``object``)

        :raises Exception: If the query context has no mistral execution id, or if
                           fetching the workflow result or tasks fails.
        """
        # Mistral filters on a formatted UTC timestamp rather than an epoch value.
        dt_last_query_time = (
            time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(last_query_time))
            if last_query_time else None
        )

        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)

        if not mistral_exec_id:
            # Interpolate explicitly: Exception() does not apply %-formatting to
            # its arguments, so passing them separately yields an unformatted tuple.
            raise Exception(
                '[%s] Missing mistral workflow execution ID in query context. %s' %
                (execution_id, query_context)
            )

        try:
            wf_result = self._get_workflow_result(mistral_exec_id)

            wf_tasks_result = self._get_workflow_tasks(
                mistral_exec_id,
                last_query_time=dt_last_query_time
            )

            result = self._format_query_result(
                liveaction_db.result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            LOG.exception('[%s] Unable to find reference.', execution_id)
            # NOTE(review): ``.message`` is a Python 2 exception attribute; confirm the
            # exception class defines it before running this on Python 3.
            return (action_constants.LIVEACTION_STATUS_FAILED, exc.message)
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        # Pass the values as logging arguments so that formatting is skipped
        # when debug logging is disabled.
        LOG.debug('[%s] mistral workflow execution status: %s', execution_id, status)
        LOG.debug('[%s] mistral workflow execution result: %s', execution_id, result)

        return (status, result)

    def _get_workflow_result(self, exec_id):
        """
        Returns the workflow output together with the raw mistral state.

        The output is only loaded once the workflow state is one of DONE_STATES;
        otherwise the result starts out empty. The mistral state and state info
        are stored unconverted under the ``extra`` key.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :rtype: ``dict``

        :raises ReferenceNotFoundError: If mistral reports the execution as not found.
        """
        try:
            # Random delay so that queries are not all sent at the same moment.
            jitter = random.uniform(0, self._jitter)
            eventlet.sleep(jitter)
            execution = self._client.executions.get(exec_id)
        except mistralclient_base.APIException as mistral_exc:
            # NOTE(review): ``.message`` is a Python 2 exception attribute - confirm
            # it is available on the target runtime.
            if 'not found' in mistral_exc.message:
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
            # Bare raise keeps the original traceback intact.
            raise

        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        return result

    def _get_workflow_tasks(self, exec_id, last_query_time=None):
        """
        Returns the list of tasks for a workflow execution.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :param last_query_time: Timestamp to filter tasks. When given, only tasks
                                updated at or after this time are requested.
        :type last_query_time: ``str``

        :returns: Formatted task results (see ``_format_task_result``).
        :rtype: ``list``

        :raises ReferenceNotFoundError: If mistral reports a reference as not found.
        """
        result = []
        query_filters = {}

        if last_query_time:
            query_filters['updated_at'] = 'gte:%s' % last_query_time

        try:
            wf_tasks = self._client.tasks.list(workflow_execution_id=exec_id, **query_filters)

            for wf_task in wf_tasks:
                # Each listed task is fetched again individually by its id.
                result.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                jitter = random.uniform(0, self._jitter)
                eventlet.sleep(jitter)
        except mistralclient_base.APIException as mistral_exc:
            # NOTE(review): ``.message`` is a Python 2 exception attribute - confirm
            # it is available on the target runtime.
            if 'not found' in mistral_exc.message:
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
            # Bare raise keeps the original traceback intact.
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in result]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        ``id``, ``name`` and ``workflow_name`` are required keys; the remaining
        fields default to ``None`` when absent. ``result``, ``input`` and
        ``published`` are passed through ``jsonify.try_loads``.

        :param task: Task attributes.
        :type task: ``dict``

        :rtype: ``dict``
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id', None),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at', None),
            'updated_at': task.get('updated_at', None),
            'state': task.get('state', None),
            'state_info': task.get('state_info', None)
        }

        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr, None))

        return result

    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
        """
        Merge freshly queried tasks into the previously stored task list.

        Stored tasks whose id shows up in the new task list are replaced by the
        new entry; all other stored tasks are kept, followed by the new tasks.

        :param current_result: Result stored so far; its ``tasks`` key is optional.
        :type current_result: ``dict``

        :param new_wf_result: New workflow result. It is updated in place with the
                              merged ``tasks`` list and returned.
        :type new_wf_result: ``dict``

        :param new_wf_tasks_result: Newly fetched, formatted task results.
        :type new_wf_tasks_result: ``list``

        :rtype: ``dict``
        """
        result = new_wf_result

        # Set membership keeps the filtering below linear in the number of tasks.
        new_wf_task_ids = {entry['id'] for entry in new_wf_tasks_result}

        old_wf_tasks_result_to_keep = [
            entry for entry in current_result.get('tasks', [])
            if entry['id'] not in new_wf_task_ids
        ]

        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result

        return result

    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Derive the st2 liveaction status from the workflow state and its tasks.

        :param liveaction_db: Liveaction whose current ``status`` is inspected.
        :type liveaction_db: ``object``

        :param wf_state: Mistral workflow state.
        :type wf_state: ``str``

        :param tasks: Formatted task results, each with a ``state`` key.
        :type tasks: ``list``

        :rtype: ``str``
        """
        # Determine if liveaction is canceled or being canceled.
        is_action_canceled = liveaction_db.status in CANCELED_STATES

        # Identify the list of tasks that are still running.
        active_tasks = [t for t in tasks if t['state'] in ACTIVE_STATES]

        # Keep the execution in running state if there are active tasks.
        # In certain use cases, Mistral sets the workflow state to
        # completion prior to task completion.
        if is_action_canceled and active_tasks:
            # Cancellation requested while tasks are still winding down.
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
            # Tasks are finished but the workflow itself is not done yet.
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
            # Workflow is cancelled in mistral while tasks are still active.
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif wf_state in DONE_STATES and active_tasks:
            status = action_constants.LIVEACTION_STATUS_RUNNING
        elif wf_state in DONE_STATES and not active_tasks:
            status = DONE_STATES[wf_state]
        else:
            status = action_constants.LIVEACTION_STATUS_RUNNING

        return status
|
236
|
|
|
|
It is generally discouraged to redefine built-ins as this makes code very hard to read.