Passed
Pull Request — master (#3182)
by W
04:52
created

MistralResultsQuerier._get_workflow_tasks()   B

Complexity

Conditions 5

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 18
rs 8.5454
1
import uuid
2
3
from mistralclient.api import base as mistralclient_base
4
from mistralclient.api import client as mistral
5
from oslo_config import cfg
6
import retrying
7
8
from st2common.query.base import Querier
9
from st2common.constants import action as action_constants
10
from st2common.exceptions import resultstracker as exceptions
11
from st2common import log as logging
12
from st2common.services import action as action_service
13
from st2common.util import jsonify
14
from st2common.util.url import get_url_without_trailing_slash
15
from st2common.util.workflow import mistral as utils
16
17
18
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Mistral workflow states treated as finished, mapped to the st2 liveaction
# status reported for them. Note that Mistral spells the state 'CANCELLED'.
DONE_STATES = {
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED,
}


# Mistral states treated as still in progress, mapped to the st2 liveaction
# status reported for them.
ACTIVE_STATES = {
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING,
}
29
30
31
def get_instance():
    """
    Creates a new ``MistralResultsQuerier`` whose id is a freshly generated
    random UUID (version 4) rendered as a string.

    :rtype: ``MistralResultsQuerier``
    """
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
33
34
35
class MistralResultsQuerier(Querier):
    """
    Querier which fetches the result and the tasks of a Mistral workflow execution
    using the Mistral v2 API and converts the Mistral state to an st2 liveaction status.
    """

    # NOTE(review): presumably read by the base Querier to decide whether the query
    # state object is removed when a query fails -- confirm against ``Querier``.
    delete_state_object_on_error = False

    def __init__(self, id, *args, **kwargs):
        """
        Sets up the Mistral client from the ``mistral`` section of the configuration.

        :param id: Identifier of this querier. It is not used by this method. The name
                   shadows the ``id`` built-in, but it is kept as is so that existing
                   callers (including keyword callers) keep working.
        :type id: ``str``
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)

    @staticmethod
    def _get_exception_message(exc):
        """
        Returns the message of an exception.

        The ``message`` attribute only exists on Python 2 exceptions, so fall back to
        the string representation of the exception when it is missing or empty.

        :param exc: Exception to get the message from.
        :type exc: ``Exception``

        :rtype: ``str``
        """
        return getattr(exc, 'message', None) or str(exc)

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``

        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id.
        :type query_context: ``object``

        :raises Exception: If the query context has no mistral execution ID, or if fetching
                           the workflow result or tasks fails for a reason other than a
                           missing reference.

        :rtype: (``str``, ``object``)
        """
        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Interpolate explicitly. Exception() does not apply %-formatting to its args.
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s' %
                            (execution_id, query_context))

        try:
            result = self._get_workflow_result(mistral_exec_id)
            result['tasks'] = self._get_workflow_tasks(mistral_exec_id)
        except exceptions.ReferenceNotFoundError as exc:
            # A missing reference is reported as a failed execution instead of being raised.
            LOG.exception('[%s] Unable to find reference.', execution_id)
            return (action_constants.LIVEACTION_STATUS_FAILED, self._get_exception_message(exc))
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        status = self._determine_execution_status(
            execution_id, result['extra']['state'], result['tasks'])

        # Pass the values as logger args so that they are only formatted when debug is enabled.
        LOG.debug('[%s] mistral workflow execution status: %s', execution_id, status)
        LOG.debug('[%s] mistral workflow execution result: %s', execution_id, result)

        return (status, result)

    def _get_workflow_result(self, exec_id):
        """
        Returns the workflow output along with the Mistral state of the execution. The output
        is only loaded when the workflow is in one of the ``DONE_STATES``. The Mistral state
        and state info are returned under the ``extra`` key.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :raises ReferenceNotFoundError: If Mistral reports that the execution is not found.

        :rtype: ``dict``
        """
        try:
            execution = self._client.executions.get(exec_id)
        except mistralclient_base.APIException as mistral_exc:
            message = self._get_exception_message(mistral_exc)
            if 'not found' in message:
                raise exceptions.ReferenceNotFoundError(message)
            # Bare raise keeps the original traceback.
            raise

        # NOTE(review): this assumes the loaded output is a dict -- the item assignment
        # below fails otherwise. Confirm what jsonify.try_loads returns for other outputs.
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        return result

    def _get_workflow_tasks(self, exec_id):
        """
        Returns the list of tasks for a workflow execution. Each task in the listing is
        fetched individually and then formatted with ``_format_task_result``.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :raises ReferenceNotFoundError: If Mistral reports that a reference is not found.

        :rtype: ``list``
        """
        try:
            wf_tasks = [
                self._client.tasks.get(task.id)
                for task in self._client.tasks.list(workflow_execution_id=exec_id)
            ]
        except mistralclient_base.APIException as mistral_exc:
            message = self._get_exception_message(mistral_exc)
            if 'not found' in message:
                raise exceptions.ReferenceNotFoundError(message)
            # Bare raise keeps the original traceback.
            raise

        return [self._format_task_result(task=wf_task.to_dict()) for wf_task in wf_tasks]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        :param task: Task attributes. The ``id``, ``name`` and ``workflow_name`` keys are
                     required. Other keys default to None when missing.
        :type task: ``dict``

        :rtype: ``dict``
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id', None),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at', None),
            'updated_at': task.get('updated_at', None),
            'state': task.get('state', None),
            'state_info': task.get('state_info', None)
        }

        # These attributes are passed through jsonify.try_loads before being returned.
        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr, None))

        return result

    def _determine_execution_status(self, execution_id, wf_state, tasks):
        """
        Determines the st2 liveaction status from the Mistral workflow state, the state of
        the workflow tasks and whether the st2 action is canceled or being canceled.

        :param execution_id: st2 execution_id
        :type execution_id: ``str``

        :param wf_state: Mistral workflow execution state
        :type wf_state: ``str``

        :param tasks: Formatted workflow tasks. Each task requires a ``state`` key.
        :type tasks: ``list``

        :rtype: ``str``
        """
        # Check whether the action is canceled or in the process of being canceled.
        is_action_canceled = action_service.is_action_canceled_or_canceling(execution_id)

        # Identify the list of tasks that are still running.
        active_tasks = [t for t in tasks if t['state'] in ACTIVE_STATES]

        # Keep the execution in running state if there are active tasks.
        # In certain use cases, Mistral sets the workflow state to
        # completion prior to task completion.
        if is_action_canceled and active_tasks:
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
            status = action_constants.LIVEACTION_STATUS_CANCELING
        elif wf_state in DONE_STATES and active_tasks:
            status = action_constants.LIVEACTION_STATUS_RUNNING
        elif wf_state in DONE_STATES and not active_tasks:
            status = DONE_STATES[wf_state]
        else:
            status = action_constants.LIVEACTION_STATUS_RUNNING

        return status
176