Completed
Pull Request — master (#2333)
by Arma
08:08
created

_get_workflow_result()   A

Complexity

Conditions 2

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 18
rs 9.4286
cc 2
1
import uuid
2
3
from mistralclient.api import client as mistral
4
from mistralclient.api.v2 import tasks
5
from mistralclient.api.v2 import executions
6
from oslo_config import cfg
7
import retrying
8
9
from st2actions.query.base import Querier
10
from st2common.constants import action as action_constants
11
from st2common import log as logging
12
from st2common.services import action as action_service
13
from st2common.util import jsonify
14
from st2common.util.url import get_url_without_trailing_slash
15
from st2common.util.workflow import mistral as utils
16
17
18
LOG = logging.getLogger(__name__)
19
20
DONE_STATES = {
21
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
22
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED
23
}
24
25
26
def get_query_instance():
27
    return MistralResultsQuerier(str(uuid.uuid4()))
28
29
30
class MistralResultsQuerier(Querier):
31
    def __init__(self, id, *args, **kwargs):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
32
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
33
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
34
        self._client = mistral.client(
35
            mistral_url=self._base_url,
36
            username=cfg.CONF.mistral.keystone_username,
37
            api_key=cfg.CONF.mistral.keystone_password,
38
            project_name=cfg.CONF.mistral.keystone_project_name,
39
            auth_url=cfg.CONF.mistral.keystone_auth_url)
40
41
    @retrying.retry(
42
        retry_on_exception=utils.retry_on_exceptions,
43
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
44
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
45
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
46
    def query(self, execution_id, query_context):
47
        """
48
        Queries mistral for workflow results using v2 APIs.
49
        :param execution_id: st2 execution_id (context to be used for logging/audit)
50
        :type execution_id: ``str``
51
        :param query_context: context for the query to be made to mistral. This contains mistral
52
                              execution id.
53
        :type query_context: ``objext``
54
        :rtype: (``str``, ``object``)
55
        """
56
        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
57
        if not mistral_exec_id:
58
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s',
59
                            execution_id, query_context)
60
61
        try:
62
            result = self._get_workflow_result(mistral_exec_id)
63
            result['tasks'] = self._get_workflow_tasks(mistral_exec_id)
64
        except Exception:
65
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
66
                          execution_id, query_context)
67
            raise
68
69
        status = self._determine_execution_status(
70
            execution_id, result['extra']['state'], result['tasks'])
71
72
        LOG.debug('[%s] mistral workflow execution status: %s' % (execution_id, status))
73
        LOG.debug('[%s] mistral workflow execution result: %s' % (execution_id, result))
74
75
        return (status, result)
76
77
    def _get_workflow_result(self, exec_id):
78
        """
79
        Returns the workflow status and output. Mistral workflow status will be converted
80
        to st2 action status.
81
        :param exec_id: Mistral execution ID
82
        :type exec_id: ``str``
83
        :rtype: (``str``, ``dict``)
84
        """
85
        execution = executions.ExecutionManager(self._client).get(exec_id)
86
87
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}
88
89
        result['extra'] = {
90
            'state': execution.state,
91
            'state_info': execution.state_info
92
        }
93
94
        return result
95
96
    def _get_workflow_tasks(self, exec_id):
97
        """
98
        Returns the list of tasks for a workflow execution.
99
        :param exec_id: Mistral execution ID
100
        :type exec_id: ``str``
101
        :rtype: ``list``
102
        """
103
        wf_tasks = tasks.TaskManager(self._client).list(workflow_execution_id=exec_id)
104
105
        return [self._format_task_result(task=wf_task.to_dict()) for wf_task in wf_tasks]
106
107
    def _format_task_result(self, task):
108
        """
109
        Format task result to follow the unified workflow result format.
110
        """
111
        result = {
112
            'id': task['id'],
113
            'name': task['name'],
114
            'workflow_execution_id': task.get('workflow_execution_id', None),
115
            'workflow_name': task['workflow_name'],
116
            'created_at': task.get('created_at', None),
117
            'updated_at': task.get('updated_at', None),
118
            'state': task.get('state', None),
119
            'state_info': task.get('state_info', None)
120
        }
121
122
        for attr in ['result', 'input', 'published']:
123
            result[attr] = jsonify.try_loads(task.get(attr, None))
124
125
        return result
126
127
    def _determine_execution_status(self, execution_id, wf_state, tasks):
0 ignored issues
show
Comprehensibility Bug introduced by
tasks is re-defining a name which is already available in the outer-scope (previously defined on line 4).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
128
        # Get the liveaction object to compare state.
129
        is_action_canceled = action_service.is_action_canceled_or_canceling(execution_id)
130
131
        # Identify the list of tasks that are not in completed states.
132
        active_tasks = [t for t in tasks if t['state'] not in DONE_STATES]
133
134
        # On cancellation, mistral workflow executions are paused so that tasks can
135
        # gracefully reach completion. If any task is not completed, do not mark st2
136
        # action execution for the workflow complete. By marking the st2 action execution
137
        # as running, this will keep the query for this mistral workflow execution active.
138
        if wf_state not in DONE_STATES and not active_tasks and is_action_canceled:
139
            status = action_constants.LIVEACTION_STATUS_CANCELED
140
        elif wf_state in DONE_STATES and active_tasks:
141
            status = action_constants.LIVEACTION_STATUS_RUNNING
142
        elif wf_state not in DONE_STATES:
143
            status = action_constants.LIVEACTION_STATUS_RUNNING
144
        else:
145
            status = DONE_STATES[wf_state]
146
147
        return status
148
149
150
def get_instance():
151
    return MistralResultsQuerier(str(uuid.uuid4()))
152