Passed
Push — master ( 0891a6...20e5ca )
by
unknown
03:21
created

MistralResultsQuerier._get_workflow_result()   B

Complexity

Conditions 4

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 25
rs 8.5806
c 0
b 0
f 0
1
import random
2
import time
3
import uuid
4
5
from mistralclient.api import base as mistralclient_base
6
from mistralclient.api import client as mistral
7
from oslo_config import cfg
8
import eventlet
9
import retrying
10
11
from st2common.query.base import Querier
12
from st2common.constants import action as action_constants
13
from st2common.exceptions import resultstracker as exceptions
14
from st2common import log as logging
15
from st2common.util import action_db as action_utils
16
from st2common.util import jsonify
17
from st2common.util.url import get_url_without_trailing_slash
18
from st2common.util.workflow import mistral as utils
19
20
21
LOG = logging.getLogger(__name__)
22
23
DONE_STATES = {
24
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
25
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
26
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED
27
}
28
29
ACTIVE_STATES = {
30
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING
31
}
32
33
CANCELED_STATES = [
34
    action_constants.LIVEACTION_STATUS_CANCELED,
35
    action_constants.LIVEACTION_STATUS_CANCELING
36
]
37
38
39
def get_instance():
40
    return MistralResultsQuerier(str(uuid.uuid4()))
41
42
43
class MistralResultsQuerier(Querier):
44
    delete_state_object_on_error = False
45
46
    def __init__(self, id, *args, **kwargs):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
47
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
48
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
49
        self._client = mistral.client(
50
            mistral_url=self._base_url,
51
            username=cfg.CONF.mistral.keystone_username,
52
            api_key=cfg.CONF.mistral.keystone_password,
53
            project_name=cfg.CONF.mistral.keystone_project_name,
54
            auth_url=cfg.CONF.mistral.keystone_auth_url,
55
            cacert=cfg.CONF.mistral.cacert,
56
            insecure=cfg.CONF.mistral.insecure)
57
        self._jitter = cfg.CONF.mistral.jitter_interval
58
59
    @retrying.retry(
60
        retry_on_exception=utils.retry_on_exceptions,
61
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
62
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
63
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
64
    def query(self, execution_id, query_context, last_query_time=None):
65
        """
66
        Queries mistral for workflow results using v2 APIs.
67
        :param execution_id: st2 execution_id (context to be used for logging/audit)
68
        :type execution_id: ``str``
69
        :param query_context: context for the query to be made to mistral. This contains mistral
70
                              execution id.
71
        :type query_context: ``object``
72
        :param last_query_time: Timestamp of last query.
73
        :type last_query_time: ``float``
74
        :rtype: (``str``, ``object``)
75
        """
76
        dt_last_query_time = (
77
            time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(last_query_time))
78
            if last_query_time else None
79
        )
80
81
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)
82
83
        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
84
        if not mistral_exec_id:
85
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s',
86
                            execution_id, query_context)
87
88
        try:
89
            wf_result = self._get_workflow_result(mistral_exec_id)
90
91
            wf_tasks_result = self._get_workflow_tasks(
92
                mistral_exec_id,
93
                last_query_time=dt_last_query_time
94
            )
95
96
            result = self._format_query_result(
97
                liveaction_db.result,
98
                wf_result,
99
                wf_tasks_result
100
            )
101
        except exceptions.ReferenceNotFoundError as exc:
102
            LOG.exception('[%s] Unable to find reference.', execution_id)
103
            return (action_constants.LIVEACTION_STATUS_FAILED, exc.message)
104
        except Exception:
105
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
106
                          execution_id, query_context)
107
            raise
108
109
        status = self._determine_execution_status(
110
            liveaction_db,
111
            result['extra']['state'],
112
            result['tasks']
113
        )
114
115
        LOG.debug('[%s] mistral workflow execution status: %s' % (execution_id, status))
116
        LOG.debug('[%s] mistral workflow execution result: %s' % (execution_id, result))
117
118
        return (status, result)
119
120
    def _get_workflow_result(self, exec_id):
121
        """
122
        Returns the workflow status and output. Mistral workflow status will be converted
123
        to st2 action status.
124
        :param exec_id: Mistral execution ID
125
        :type exec_id: ``str``
126
        :rtype: (``str``, ``dict``)
127
        """
128
        try:
129
            jitter = random.uniform(0, self._jitter)
130
            eventlet.sleep(jitter)
131
            execution = self._client.executions.get(exec_id)
132
        except mistralclient_base.APIException as mistral_exc:
133
            if 'not found' in mistral_exc.message:
134
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
135
            raise mistral_exc
136
137
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}
138
139
        result['extra'] = {
140
            'state': execution.state,
141
            'state_info': execution.state_info
142
        }
143
144
        return result
145
146
    def _get_workflow_tasks(self, exec_id, last_query_time=None):
147
        """
148
        Returns the list of tasks for a workflow execution.
149
        :param exec_id: Mistral execution ID
150
        :type exec_id: ``str``
151
        :param last_query_time: Timestamp to filter tasks
152
        :type last_query_time: ``str``
153
        :rtype: ``list``
154
        """
155
        result = []
156
157
        try:
158
            query_filters = {}
159
160
            if last_query_time:
161
                query_filters['updated_at'] = 'gte:%s' % last_query_time
162
163
            wf_tasks = self._client.tasks.list(workflow_execution_id=exec_id, **query_filters)
164
165
            for wf_task in wf_tasks:
166
                result.append(self._client.tasks.get(wf_task.id))
167
168
                # Lets not blast requests but just space it out for better CPU profile
169
                jitter = random.uniform(0, self._jitter)
170
                eventlet.sleep(jitter)
171
        except mistralclient_base.APIException as mistral_exc:
172
            if 'not found' in mistral_exc.message:
173
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
174
            raise mistral_exc
175
176
        return [self._format_task_result(task=entry.to_dict()) for entry in result]
177
178
    def _format_task_result(self, task):
179
        """
180
        Format task result to follow the unified workflow result format.
181
        """
182
        result = {
183
            'id': task['id'],
184
            'name': task['name'],
185
            'workflow_execution_id': task.get('workflow_execution_id', None),
186
            'workflow_name': task['workflow_name'],
187
            'created_at': task.get('created_at', None),
188
            'updated_at': task.get('updated_at', None),
189
            'state': task.get('state', None),
190
            'state_info': task.get('state_info', None)
191
        }
192
193
        for attr in ['result', 'input', 'published']:
194
            result[attr] = jsonify.try_loads(task.get(attr, None))
195
196
        return result
197
198
    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
199
        result = new_wf_result
200
201
        new_wf_task_ids = [entry['id'] for entry in new_wf_tasks_result]
202
203
        old_wf_tasks_result_to_keep = [
204
            entry for entry in current_result.get('tasks', [])
205
            if entry['id'] not in new_wf_task_ids
206
        ]
207
208
        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result
209
210
        return result
211
212
    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
213
        # Determine if liveaction is canceled or being canceled.
214
        is_action_canceled = liveaction_db.status in CANCELED_STATES
215
216
        # Identify the list of tasks that are not still running.
217
        active_tasks = [t for t in tasks if t['state'] in ACTIVE_STATES]
218
219
        # Keep the execution in running state if there are active tasks.
220
        # In certain use cases, Mistral sets the workflow state to
221
        # completion prior to task completion.
222
        if is_action_canceled and active_tasks:
223
            status = action_constants.LIVEACTION_STATUS_CANCELING
224
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
225
            status = action_constants.LIVEACTION_STATUS_CANCELING
226
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
227
            status = action_constants.LIVEACTION_STATUS_CANCELING
228
        elif wf_state in DONE_STATES and active_tasks:
229
            status = action_constants.LIVEACTION_STATUS_RUNNING
230
        elif wf_state in DONE_STATES and not active_tasks:
231
            status = DONE_STATES[wf_state]
232
        else:
233
            status = action_constants.LIVEACTION_STATUS_RUNNING
234
235
        return status
236