Passed
Pull Request — master (#3507)
by W
04:31
created

MistralResultsQuerier._get_workflow_tasks()   F

Complexity

Conditions 11

Size

Total Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
dl 0
loc 33
rs 3.1764
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like MistralResultsQuerier._get_workflow_tasks() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import random
2
import time
3
import uuid
4
5
from mistralclient.api import base as mistralclient_base
6
from mistralclient.api import client as mistral
7
from oslo_config import cfg
8
import eventlet
9
import retrying
10
11
from st2common.query.base import Querier
12
from st2common.constants import action as action_constants
13
from st2common.exceptions import resultstracker as exceptions
14
from st2common import log as logging
15
from st2common.util import action_db as action_utils
16
from st2common.util import jsonify
17
from st2common.util.url import get_url_without_trailing_slash
18
from st2common.util.workflow import mistral as utils
19
20
21
LOG = logging.getLogger(__name__)
22
23
DONE_STATES = {
24
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
25
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
26
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED,
27
    'PAUSED': action_constants.LIVEACTION_STATUS_PAUSED
28
}
29
30
ACTIVE_STATES = {
31
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING
32
}
33
34
CANCELED_STATES = [
35
    action_constants.LIVEACTION_STATUS_CANCELED,
36
    action_constants.LIVEACTION_STATUS_CANCELING
37
]
38
39
PAUSED_STATES = [
40
    action_constants.LIVEACTION_STATUS_PAUSED,
41
    action_constants.LIVEACTION_STATUS_PAUSING
42
]
43
44
RESUMING_STATES = [
45
    action_constants.LIVEACTION_STATUS_RESUMING
46
]
47
48
49
def get_instance():
50
    return MistralResultsQuerier(str(uuid.uuid4()))
51
52
53
class MistralResultsQuerier(Querier):
54
    delete_state_object_on_error = False
55
56
    def __init__(self, id, *args, **kwargs):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
57
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
58
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
59
        self._client = mistral.client(
60
            mistral_url=self._base_url,
61
            username=cfg.CONF.mistral.keystone_username,
62
            api_key=cfg.CONF.mistral.keystone_password,
63
            project_name=cfg.CONF.mistral.keystone_project_name,
64
            auth_url=cfg.CONF.mistral.keystone_auth_url,
65
            cacert=cfg.CONF.mistral.cacert,
66
            insecure=cfg.CONF.mistral.insecure)
67
        self._jitter = cfg.CONF.mistral.jitter_interval
68
69
    @retrying.retry(
70
        retry_on_exception=utils.retry_on_exceptions,
71
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
72
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
73
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
74
    def query(self, execution_id, query_context, last_query_time=None):
75
        """
76
        Queries mistral for workflow results using v2 APIs.
77
        :param execution_id: st2 execution_id (context to be used for logging/audit)
78
        :type execution_id: ``str``
79
        :param query_context: context for the query to be made to mistral. This contains mistral
80
                              execution id.
81
        :type query_context: ``object``
82
        :param last_query_time: Timestamp of last query.
83
        :type last_query_time: ``float``
84
        :rtype: (``str``, ``object``)
85
        """
86
        dt_last_query_time = (
87
            time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(last_query_time))
88
            if last_query_time else None
89
        )
90
91
        # Retrieve liveaction_db to append new result to existing result.
92
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)
93
94
        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
95
        if not mistral_exec_id:
96
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s',
97
                            execution_id, query_context)
98
99
        try:
100
            wf_result = self._get_workflow_result(mistral_exec_id)
101
102
            wf_tasks_result = self._get_workflow_tasks(
103
                mistral_exec_id,
104
                last_query_time=dt_last_query_time
105
            )
106
107
            result = self._format_query_result(
108
                liveaction_db.result,
109
                wf_result,
110
                wf_tasks_result
111
            )
112
        except exceptions.ReferenceNotFoundError as exc:
113
            LOG.exception('[%s] Unable to find reference.', execution_id)
114
            return (action_constants.LIVEACTION_STATUS_FAILED, exc.message)
115
        except Exception:
116
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
117
                          execution_id, query_context)
118
            raise
119
120
        # Retrieve liveaction_db again in case state has changed
121
        # while the querier get results from mistral API above.
122
        liveaction_db = action_utils.get_liveaction_by_id(execution_id)
123
124
        status = self._determine_execution_status(
125
            liveaction_db,
126
            result['extra']['state'],
127
            result['tasks']
128
        )
129
130
        LOG.debug('[%s] mistral workflow execution status: %s' % (execution_id, status))
131
        LOG.debug('[%s] mistral workflow execution result: %s' % (execution_id, result))
132
133
        return (status, result)
134
135
    def _get_workflow_result(self, exec_id):
136
        """
137
        Returns the workflow status and output. Mistral workflow status will be converted
138
        to st2 action status.
139
        :param exec_id: Mistral execution ID
140
        :type exec_id: ``str``
141
        :rtype: (``str``, ``dict``)
142
        """
143
        try:
144
            jitter = random.uniform(0, self._jitter)
145
            eventlet.sleep(jitter)
146
            execution = self._client.executions.get(exec_id)
147
        except mistralclient_base.APIException as mistral_exc:
148
            if 'not found' in mistral_exc.message:
149
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
150
            raise mistral_exc
151
152
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}
153
154
        result['extra'] = {
155
            'state': execution.state,
156
            'state_info': execution.state_info
157
        }
158
159
        return result
160
161
    def _get_workflow_tasks(self, exec_id, last_query_time=None):
162
        """
163
        Returns the list of tasks for a workflow execution.
164
        :param exec_id: Mistral execution ID
165
        :type exec_id: ``str``
166
        :param last_query_time: Timestamp to filter tasks
167
        :type last_query_time: ``str``
168
        :rtype: ``list``
169
        """
170
        result = []
171
172
        try:
173
            wf_tasks = self._client.tasks.list(workflow_execution_id=exec_id)
174
175
            if last_query_time:
176
                wf_tasks = [
177
                    t for t in wf_tasks
178
                    if ((t.created_at is not None and t.created_at >= last_query_time) or
179
                        (t.updated_at is not None and t.updated_at >= last_query_time))
180
                ]
181
182
            for wf_task in wf_tasks:
183
                result.append(self._client.tasks.get(wf_task.id))
184
185
                # Lets not blast requests but just space it out for better CPU profile
186
                jitter = random.uniform(0, self._jitter)
187
                eventlet.sleep(jitter)
188
        except mistralclient_base.APIException as mistral_exc:
189
            if 'not found' in mistral_exc.message:
190
                raise exceptions.ReferenceNotFoundError(mistral_exc.message)
191
            raise mistral_exc
192
193
        return [self._format_task_result(task=entry.to_dict()) for entry in result]
194
195
    def _format_task_result(self, task):
196
        """
197
        Format task result to follow the unified workflow result format.
198
        """
199
        result = {
200
            'id': task['id'],
201
            'name': task['name'],
202
            'workflow_execution_id': task.get('workflow_execution_id', None),
203
            'workflow_name': task['workflow_name'],
204
            'created_at': task.get('created_at', None),
205
            'updated_at': task.get('updated_at', None),
206
            'state': task.get('state', None),
207
            'state_info': task.get('state_info', None)
208
        }
209
210
        for attr in ['result', 'input', 'published']:
211
            result[attr] = jsonify.try_loads(task.get(attr, None))
212
213
        return result
214
215
    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
216
        result = new_wf_result
217
218
        new_wf_task_ids = [entry['id'] for entry in new_wf_tasks_result]
219
220
        old_wf_tasks_result_to_keep = [
221
            entry for entry in current_result.get('tasks', [])
222
            if entry['id'] not in new_wf_task_ids
223
        ]
224
225
        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result
226
227
        return result
228
229
    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
230
        # Determine if liveaction is being canceled, paused, or resumed.
231
        is_action_canceled = liveaction_db.status in CANCELED_STATES
232
        is_action_paused = liveaction_db.status in PAUSED_STATES
233
        is_action_resuming = liveaction_db.status in RESUMING_STATES
234
235
        # Identify the list of tasks that are not still running.
236
        active_tasks = [t for t in tasks if t['state'] in ACTIVE_STATES]
237
238
        # Keep the execution in running state if there are active tasks.
239
        # In certain use cases, Mistral sets the workflow state to
240
        # completion prior to task completion.
241
        if is_action_canceled and active_tasks:
242
            status = action_constants.LIVEACTION_STATUS_CANCELING
243
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
244
            status = action_constants.LIVEACTION_STATUS_CANCELING
245
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
246
            status = action_constants.LIVEACTION_STATUS_CANCELING
247
        elif is_action_paused and active_tasks:
248
            status = action_constants.LIVEACTION_STATUS_PAUSING
249
        elif is_action_paused and not active_tasks and wf_state not in DONE_STATES:
250
            status = action_constants.LIVEACTION_STATUS_PAUSING
251
        elif not is_action_paused and active_tasks and wf_state == 'PAUSED':
252
            status = action_constants.LIVEACTION_STATUS_PAUSING
253
        elif is_action_resuming and wf_state == 'PAUSED':
254
            status = action_constants.LIVEACTION_STATUS_RESUMING
255
        elif wf_state in DONE_STATES and active_tasks:
256
            status = action_constants.LIVEACTION_STATUS_RUNNING
257
        elif wf_state in DONE_STATES and not active_tasks:
258
            status = DONE_STATES[wf_state]
259
        else:
260
            status = action_constants.LIVEACTION_STATUS_RUNNING
261
262
        return status
263