1
|
|
|
import random |
2
|
|
|
import time |
3
|
|
|
import uuid |
4
|
|
|
|
5
|
|
|
from mistralclient.api import base as mistralclient_base |
6
|
|
|
from mistralclient.api import client as mistral |
7
|
|
|
from oslo_config import cfg |
8
|
|
|
import eventlet |
9
|
|
|
import retrying |
10
|
|
|
|
11
|
|
|
from st2common.query.base import Querier |
12
|
|
|
from st2common.constants import action as action_constants |
13
|
|
|
from st2common.exceptions import resultstracker as exceptions |
14
|
|
|
from st2common import log as logging |
15
|
|
|
from st2common.util import action_db as action_utils |
16
|
|
|
from st2common.util import jsonify |
17
|
|
|
from st2common.util.url import get_url_without_trailing_slash |
18
|
|
|
from st2common.util.workflow import mistral as utils |
19
|
|
|
|
20
|
|
|
|
21
|
|
|
# Logger for this module.
LOG = logging.getLogger(__name__)

# Mistral workflow states treated as finished, keyed by the Mistral state name and
# mapped to the st2 liveaction status reported for that state.
DONE_STATES = {
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED,
}

# Mistral states treated as still in progress, mapped to the st2 liveaction status.
ACTIVE_STATES = {'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING}

# st2 liveaction statuses meaning the action was canceled or is being canceled.
CANCELED_STATES = [
    action_constants.LIVEACTION_STATUS_CANCELED,
    action_constants.LIVEACTION_STATUS_CANCELING,
]
37
|
|
|
|
38
|
|
|
|
39
|
|
|
def get_instance():
    """
    Build a new MistralResultsQuerier, passing a freshly generated UUID4 string as its id.

    :rtype: ``MistralResultsQuerier``
    """
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
41
|
|
|
|
42
|
|
|
|
43
|
|
|
class MistralResultsQuerier(Querier):
    """
    Querier that polls the Mistral v2 API for the state, output and tasks of a workflow
    execution and derives the corresponding st2 liveaction status.
    """

    # Class-level flag; presumably consulted by the base Querier -- confirm there.
    delete_state_object_on_error = False

    def __init__(self, id, *args, **kwargs):
        """
        Create the Mistral client from the ``mistral`` section of the configuration.

        :param id: Identifier supplied by the caller. It is accepted but not used by this
                   class. NOTE: the name shadows the ``id`` built-in; it is kept as is
                   because renaming it would change the constructor's interface.

        Any remaining positional and keyword arguments are forwarded to ``Querier``.
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)
        # Upper bound of the random delay slept before/between Mistral API calls.
        self._jitter = cfg.CONF.mistral.jitter_interval

    @staticmethod
    def _get_exception_message(exc):
        """
        Return the text of an exception.

        Prefers the ``message`` attribute when the exception has a non-empty one and
        falls back to ``str(exc)`` otherwise, so that exceptions without a ``message``
        attribute (or with ``None`` as message) are handled without raising.

        :rtype: ``str``
        """
        return getattr(exc, 'message', None) or str(exc)

    # NOTE: the decorator arguments are read from cfg.CONF once, when the class body is
    # executed, not on every call to query().
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``

        :param query_context: context for the query to be made to mistral. This contains
                              the mistral execution id under ``['mistral']['execution_id']``.
        :type query_context: ``object``

        :param last_query_time: Timestamp of last query. When set, only tasks updated at or
                                after this time are fetched.
        :type last_query_time: ``float``

        :return: Tuple of the st2 status and the result. If the mistral reference cannot be
                 found, the status is failed and the result is the error message.
        :rtype: (``str``, ``object``)

        :raises Exception: If the query context carries no mistral execution id. Any other
                           error raised while fetching results is logged and re-raised.
        """
        # Mistral filters are given the timestamp as a formatted UTC date string.
        dt_last_query_time = (
            time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(last_query_time))
            if last_query_time else None
        )

        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Interpolate explicitly: Exception() does not apply format arguments itself.
            raise Exception(
                '[%s] Missing mistral workflow execution ID in query context. %s' %
                (execution_id, query_context))

        try:
            wf_result = self._get_workflow_result(mistral_exec_id)

            wf_tasks_result = self._get_workflow_tasks(
                mistral_exec_id,
                last_query_time=dt_last_query_time
            )

            result = self._format_query_result(
                liveaction_db.result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            LOG.exception('[%s] Unable to find reference.', execution_id)
            return (action_constants.LIVEACTION_STATUS_FAILED, self._get_exception_message(exc))
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        # Pass arguments to the logger so formatting only happens if debug is enabled.
        LOG.debug('[%s] mistral workflow execution status: %s', execution_id, status)
        LOG.debug('[%s] mistral workflow execution result: %s', execution_id, result)

        return (status, result)

    def _get_workflow_result(self, exec_id):
        """
        Returns the output of the workflow execution together with its mistral state.

        The output is only loaded when the mistral state is one of ``DONE_STATES``;
        otherwise the result starts out empty. The raw mistral ``state`` and ``state_info``
        are stored under the ``extra`` key. No conversion to st2 status happens here.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :rtype: ``dict``

        :raises ReferenceNotFoundError: If the API error message contains 'not found'.
        """
        try:
            # Sleep for a random interval of up to self._jitter before calling the API.
            jitter = random.uniform(0, self._jitter)
            eventlet.sleep(jitter)
            execution = self._client.executions.get(exec_id)
        except mistralclient_base.APIException as mistral_exc:
            message = self._get_exception_message(mistral_exc)
            if 'not found' in message:
                raise exceptions.ReferenceNotFoundError(message)
            # Bare raise keeps the original traceback.
            raise

        if execution.state in DONE_STATES:
            # NOTE(review): assumes try_loads returns a dict for workflow output, since
            # a key is assigned on the result below -- confirm against jsonify.try_loads.
            result = jsonify.try_loads(execution.output)
        else:
            result = {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        return result

    def _get_workflow_tasks(self, exec_id, last_query_time=None):
        """
        Returns the list of tasks for a workflow execution.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :param last_query_time: Timestamp to filter tasks. When set, it is sent as a
                                ``gte`` filter on ``updated_at``.
        :type last_query_time: ``str``

        :return: Tasks formatted by ``_format_task_result``.
        :rtype: ``list``

        :raises ReferenceNotFoundError: If the API error message contains 'not found'.
        """
        result = []

        try:
            query_filters = {}

            if last_query_time:
                query_filters['updated_at'] = 'gte:%s' % last_query_time

            wf_tasks = self._client.tasks.list(workflow_execution_id=exec_id, **query_filters)

            for wf_task in wf_tasks:
                # Each listed task is fetched again individually by its id.
                result.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                jitter = random.uniform(0, self._jitter)
                eventlet.sleep(jitter)
        except mistralclient_base.APIException as mistral_exc:
            message = self._get_exception_message(mistral_exc)
            if 'not found' in message:
                raise exceptions.ReferenceNotFoundError(message)
            # Bare raise keeps the original traceback.
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in result]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        :param task: Task attributes. ``id``, ``name`` and ``workflow_name`` are required;
                     the other keys read here default to ``None`` when absent.
        :type task: ``dict``

        :rtype: ``dict``
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id', None),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at', None),
            'updated_at': task.get('updated_at', None),
            'state': task.get('state', None),
            'state_info': task.get('state_info', None)
        }

        # These attributes are passed through jsonify.try_loads before being stored.
        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr, None))

        return result

    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
        """
        Merge the newly fetched tasks into the workflow result.

        Tasks from ``current_result`` whose id is not among the newly fetched tasks are
        kept, followed by all newly fetched tasks. ``new_wf_result`` is updated in place
        (its ``tasks`` key is set) and returned.

        :param current_result: Result recorded so far; may be empty or ``None``.
        :type current_result: ``dict``

        :param new_wf_result: Workflow result as returned by ``_get_workflow_result``.
        :type new_wf_result: ``dict``

        :param new_wf_tasks_result: Tasks as returned by ``_get_workflow_tasks``.
        :type new_wf_tasks_result: ``list``

        :rtype: ``dict``
        """
        result = new_wf_result

        # A set makes the membership test below constant time per task.
        new_wf_task_ids = set(entry['id'] for entry in new_wf_tasks_result)

        old_wf_tasks_result_to_keep = [
            entry for entry in (current_result or {}).get('tasks', [])
            if entry['id'] not in new_wf_task_ids
        ]

        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result

        return result

    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Derive the st2 liveaction status from the liveaction, workflow state and tasks.

        :param liveaction_db: Liveaction whose ``status`` is checked for cancellation.
        :type liveaction_db: ``object``

        :param wf_state: Mistral workflow state.
        :type wf_state: ``str``

        :param tasks: Formatted tasks; each must have a ``state`` key.
        :type tasks: ``list``

        :rtype: ``str``
        """
        # Determine if liveaction is canceled or being canceled.
        is_action_canceled = liveaction_db.status in CANCELED_STATES

        # Identify the list of tasks that are still running.
        active_tasks = [t for t in tasks if t['state'] in ACTIVE_STATES]

        if is_action_canceled and active_tasks:
            # Cancellation requested but tasks are still running.
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
            # Cancellation requested, no active tasks, but the workflow is not done yet.
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
            # Workflow is cancelled in mistral while tasks are still running.
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif wf_state in DONE_STATES and active_tasks:
            # Keep the execution in running state if there are active tasks.
            # In certain use cases, Mistral sets the workflow state to
            # completion prior to task completion.
            status = action_constants.LIVEACTION_STATUS_RUNNING
        elif wf_state in DONE_STATES and not active_tasks:
            status = DONE_STATES[wf_state]
        else:
            status = action_constants.LIVEACTION_STATUS_RUNNING

        return status
236
|
|
|
|
It is generally discouraged to redefine built-ins as this makes code very hard to read.