Passed
Pull Request — master (#3507)
by W
05:41
created

MistralResultsQuerier.query()   B

Complexity

Conditions 5

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
dl 0
loc 65
rs 8.2662
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

1
import random
2
import time
3
import uuid
4
5
from mistralclient.api import base as mistralclient_base
6
from mistralclient.api import client as mistral
7
from oslo_config import cfg
8
import eventlet
9
import retrying
10
11
from st2common.query.base import Querier
12
from st2common.constants import action as action_constants
13
from st2common.exceptions import resultstracker as exceptions
14
from st2common import log as logging
15
from st2common.persistence.execution import ActionExecution
16
from st2common.util import action_db as action_utils
17
from st2common.util import jsonify
18
from st2common.util.url import get_url_without_trailing_slash
19
from st2common.util.workflow import mistral as utils
20
21
22
LOG = logging.getLogger(__name__)

# Mistral workflow states for which the workflow output is loaded, mapped to the st2
# liveaction status that is reported once no task is active any more.
DONE_STATES = {
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED,
    'PAUSED': action_constants.LIVEACTION_STATUS_PAUSED,
}

# Mistral task states that count as "still active" when scanning workflow tasks.
ACTIVE_STATES = {
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING,
}

# st2 liveaction statuses meaning the action is canceled or being canceled.
CANCELED_STATES = [
    action_constants.LIVEACTION_STATUS_CANCELED,
    action_constants.LIVEACTION_STATUS_CANCELING,
]

# st2 liveaction statuses meaning the action is paused or being paused.
PAUSED_STATES = [
    action_constants.LIVEACTION_STATUS_PAUSED,
    action_constants.LIVEACTION_STATUS_PAUSING,
]

# st2 liveaction statuses meaning the action is being resumed.
RESUMING_STATES = [
    action_constants.LIVEACTION_STATUS_RESUMING,
]
48
49
50
def get_instance():
    """
    Build a MistralResultsQuerier, passing a freshly generated UUID4 string as its id.

    :rtype: ``MistralResultsQuerier``
    """
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
52
53
54
class MistralResultsQuerier(Querier):
55
    delete_state_object_on_error = False
56
57
    def __init__(self, id, *args, **kwargs):
        """
        Set up the Mistral API client and the request jitter from the ``mistral`` config group.

        :param id: Identifier supplied by the caller. It is accepted but not used by this
                   constructor. NOTE: the name shadows the built-in ``id``; it is kept as is
                   so the signature stays compatible with existing callers.
        :type id: ``str``
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)

        mistral_conf = cfg.CONF.mistral

        self._base_url = get_url_without_trailing_slash(mistral_conf.v2_base_url)

        # Credentials and TLS options for the client all come from the same config group.
        client_options = {
            'mistral_url': self._base_url,
            'username': mistral_conf.keystone_username,
            'api_key': mistral_conf.keystone_password,
            'project_name': mistral_conf.keystone_project_name,
            'auth_url': mistral_conf.keystone_auth_url,
            'cacert': mistral_conf.cacert,
            'insecure': mistral_conf.insecure,
        }
        self._client = mistral.client(**client_options)

        # Upper bound for the random sleep used to space out requests to Mistral.
        self._jitter = mistral_conf.jitter_interval
69
70
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``

        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id.
        :type query_context: ``object``

        :param last_query_time: Timestamp of last query.
        :type last_query_time: ``float``

        :raises: ``Exception`` when the query context carries no mistral execution ID. Errors
                 raised while fetching the workflow result or tasks are logged and re-raised,
                 except ``ReferenceNotFoundError`` which is turned into a failed status.

        :return: Tuple of the st2 status and the merged workflow result. When a reference
                 cannot be found, the tuple holds the failed status and the error message.
        :rtype: (``str``, ``object``)
        """
        # The task filter works on a formatted UTC timestamp rather than the raw epoch value.
        dt_last_query_time = None
        if last_query_time:
            dt_last_query_time = time.strftime(
                '%Y-%m-%d %H:%M:%S',
                time.gmtime(last_query_time)
            )

        # Retrieve liveaction_db to append new result to existing result.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Interpolate the message explicitly; passing the values as extra exception
            # arguments would leave the placeholders unformatted.
            raise Exception(
                '[%s] Missing mistral workflow execution ID in query context. %s' %
                (execution_id, query_context)
            )

        try:
            wf_result = self._get_workflow_result(mistral_exec_id)

            wf_tasks_result = self._get_workflow_tasks(
                mistral_exec_id,
                last_query_time=dt_last_query_time
            )

            result = self._format_query_result(
                liveaction_db.result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            LOG.exception('[%s] Unable to find reference.', execution_id)
            # NOTE(review): ``.message`` is only available on Python 2 exceptions.
            return (action_constants.LIVEACTION_STATUS_FAILED, exc.message)
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        # Retrieve liveaction_db again in case state has changed
        # while the querier get results from mistral API above.
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        # Pass the values as logger arguments so formatting only happens when debug is enabled.
        LOG.debug('[%s] mistral workflow execution status: %s', execution_id, status)
        LOG.debug('[%s] mistral workflow execution result: %s', execution_id, result)

        return (status, result)
135
136
    def _get_workflow_result(self, exec_id):
        """
        Fetch a Mistral workflow execution and return its output together with its state.

        The workflow output is only loaded when the execution state is one of DONE_STATES;
        otherwise the result starts out empty. The Mistral state and state info are always
        stored, unconverted, under the ``extra`` key.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :raises: ``ReferenceNotFoundError`` when the Mistral API error message contains
                 'not found'; any other ``APIException`` is re-raised unchanged.

        :rtype: ``dict``
        """
        # Sleep for a random interval of up to the configured jitter before calling Mistral.
        eventlet.sleep(random.uniform(0, self._jitter))

        try:
            execution = self._client.executions.get(exec_id)
        except mistralclient_base.APIException as mistral_exc:
            if 'not found' in mistral_exc.message:
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
            # Bare raise re-raises the same exception without resetting its traceback.
            raise

        if execution.state in DONE_STATES:
            # NOTE(review): assumes try_loads yields a dict for the workflow output - confirm.
            result = jsonify.try_loads(execution.output)
        else:
            result = {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        return result
161
162
    def _get_workflow_tasks(self, exec_id, last_query_time=None):
        """
        Returns the list of tasks for a workflow execution.

        Tasks are listed first and then fetched one by one. When ``last_query_time`` is given,
        only tasks whose ``created_at`` or ``updated_at`` is at or after it are fetched.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :param last_query_time: Timestamp to filter tasks
        :type last_query_time: ``str``

        :raises: ``ReferenceNotFoundError`` when the Mistral API error message contains
                 'not found'; any other ``APIException`` is re-raised unchanged.

        :return: Task results in the format produced by ``_format_task_result``.
        :rtype: ``list``
        """
        fetched_tasks = []

        try:
            wf_tasks = self._client.tasks.list(workflow_execution_id=exec_id)

            if last_query_time:
                # NOTE(review): the timestamps are compared directly; presumably both sides
                # are 'YYYY-MM-DD HH:MM:SS' strings so the ordering is chronological - confirm.
                wf_tasks = [
                    t for t in wf_tasks
                    if ((t.created_at is not None and t.created_at >= last_query_time) or
                        (t.updated_at is not None and t.updated_at >= last_query_time))
                ]

            for wf_task in wf_tasks:
                fetched_tasks.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                eventlet.sleep(random.uniform(0, self._jitter))
        except mistralclient_base.APIException as mistral_exc:
            if 'not found' in mistral_exc.message:
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
            # Bare raise re-raises the same exception without resetting its traceback.
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in fetched_tasks]
195
196
    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        ``id``, ``name`` and ``workflow_name`` must be present in ``task``; the other copied
        fields default to ``None`` when absent. ``result``, ``input`` and ``published`` are
        passed through ``jsonify.try_loads``.

        :param task: Task attributes as a dictionary.
        :type task: ``dict``

        :rtype: ``dict``
        """
        formatted = {'id': task['id'], 'name': task['name']}
        formatted['workflow_execution_id'] = task.get('workflow_execution_id')
        formatted['workflow_name'] = task['workflow_name']

        # Optional fields copied as they are.
        for key in ('created_at', 'updated_at', 'state', 'state_info'):
            formatted[key] = task.get(key)

        # Fields that go through jsonify.try_loads before being stored.
        for key in ('result', 'input', 'published'):
            formatted[key] = jsonify.try_loads(task.get(key))

        return formatted
215
216
    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
217
        result = new_wf_result
218
219
        new_wf_task_ids = [entry['id'] for entry in new_wf_tasks_result]
220
221
        old_wf_tasks_result_to_keep = [
222
            entry for entry in current_result.get('tasks', [])
223
            if entry['id'] not in new_wf_task_ids
224
        ]
225
226
        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result
227
228
        return result
229
230
    def _has_active_tasks(self, liveaction_db, mistral_tasks):
        """
        Tell whether the workflow still has active tasks in Mistral or in st2.

        A Mistral task is active when its state is in ACTIVE_STATES. A child st2 execution is
        active when its status is neither a completed state nor paused.

        :param liveaction_db: Liveaction whose execution children are inspected.
        :type liveaction_db: ``object``

        :param mistral_tasks: Task results, each carrying a ``state`` key.
        :type mistral_tasks: ``list``

        :rtype: ``bool``
        """
        # Identify if there are any active tasks in Mistral.
        active_mistral_tasks = any(t['state'] in ACTIVE_STATES for t in mistral_tasks)

        execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))

        # Children are looked up one at a time; the scan stops at the first active child.
        child_statuses = (
            ActionExecution.get(id=child_exec_id).status
            for child_exec_id in execution.children
        )
        active_st2_tasks = any(
            status not in action_constants.LIVEACTION_COMPLETED_STATES and
            status != action_constants.LIVEACTION_STATUS_PAUSED
            for status in child_statuses
        )

        if active_mistral_tasks:
            LOG.info('There are active mistral tasks for %s.', str(liveaction_db.id))

        if active_st2_tasks:
            LOG.info('There are active st2 tasks for %s.', str(liveaction_db.id))

        return active_mistral_tasks or active_st2_tasks
252
253
    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Work out the st2 status to report for the workflow execution.

        The rules are checked in order: cancellation first, then pausing, then resuming, and
        finally the Mistral workflow state itself. The order matters because several rules
        can match at the same time.

        :param liveaction_db: Liveaction whose current status is taken into account.
        :type liveaction_db: ``object``

        :param wf_state: Workflow state as reported by Mistral.
        :type wf_state: ``str``

        :param tasks: Task results used to detect active tasks.
        :type tasks: ``list``

        :rtype: ``str``
        """
        # Determine if liveaction is being canceled, paused, or resumed.
        current_status = liveaction_db.status
        cancel_requested = current_status in CANCELED_STATES
        pause_requested = current_status in PAUSED_STATES
        resume_requested = current_status in RESUMING_STATES

        # Identify whether any task is still running or pausing.
        has_active_tasks = self._has_active_tasks(liveaction_db, tasks)

        # Keep the execution in running state if there are active tasks.
        # In certain use cases, Mistral sets the workflow state to
        # completion prior to task completion.

        # Cancel requested in st2: still canceling while tasks are active or the workflow
        # has not reached a done state.
        if cancel_requested and (has_active_tasks or wf_state not in DONE_STATES):
            return action_constants.LIVEACTION_STATUS_CANCELING

        # Workflow cancelled on the Mistral side while tasks are still active.
        if not cancel_requested and has_active_tasks and wf_state == 'CANCELLED':
            return action_constants.LIVEACTION_STATUS_CANCELING

        # Pause requested in st2: still pausing while tasks are active or the workflow
        # has not reached a done state.
        if pause_requested and (has_active_tasks or wf_state not in DONE_STATES):
            return action_constants.LIVEACTION_STATUS_PAUSING

        # Workflow paused on the Mistral side while tasks are still active.
        if not pause_requested and has_active_tasks and wf_state == 'PAUSED':
            return action_constants.LIVEACTION_STATUS_PAUSING

        if resume_requested and wf_state == 'PAUSED':
            return action_constants.LIVEACTION_STATUS_RESUMING

        # A done workflow is only reported as done once nothing is active any more.
        if wf_state in DONE_STATES and not has_active_tasks:
            return DONE_STATES[wf_state]

        return action_constants.LIVEACTION_STATUS_RUNNING
287