Completed
Pull Request — master (#2989)
by W
06:37
created

get_query_instance()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
import uuid
2
3
from mistralclient.api import client as mistral
4
from mistralclient.api.v2 import tasks
5
from mistralclient.api.v2 import executions
6
from oslo_config import cfg
7
import retrying
8
9
from st2common.query.base import Querier
10
from st2common.constants import action as action_constants
11
from st2common import log as logging
12
from st2common.services import action as action_service
13
from st2common.util import jsonify
14
from st2common.util.url import get_url_without_trailing_slash
15
from st2common.util.workflow import mistral as utils
16
17
18
# Module-level logger used by the querier below.
LOG = logging.getLogger(__name__)

# Mistral states that are treated as finished, mapped to the st2 liveaction
# status reported for each of them. Any state that is not a key here is
# considered still in progress by the code in this module.
DONE_STATES = {
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
}
24
25
26
def get_instance():
    """
    Build a new mistral results querier.

    A random UUID, rendered as a string, is generated and handed to the
    querier as its id.

    :rtype: ``MistralResultsQuerier``
    """
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
28
29
30
class MistralResultsQuerier(Querier):
    """
    Querier that asks the mistral v2 API for the state, output and tasks of a
    workflow execution and derives the matching st2 liveaction status.
    """

    def __init__(self, id, *args, **kwargs):
        """
        Create the mistral client from the ``mistral`` section of the config.

        :param id: Identifier supplied by the caller. It is not used by this
                   constructor. NOTE(review): the name shadows the built-in ``id``;
                   it is left unchanged so existing callers keep working.
        :param args: Positional arguments forwarded to the base ``Querier``.
        :param kwargs: Keyword arguments forwarded to the base ``Querier``.
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
        self._client = mistral.client(
            mistral_url=self._base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)

    # The decorator arguments are evaluated once, when the class body is
    # executed, so the retry settings are read from cfg.CONF at import time.
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (context to be used for logging/audit)
        :type execution_id: ``str``

        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id.
        :type query_context: ``object``

        :raises Exception: if the query context carries no mistral execution id. Errors
                           raised while fetching the result or tasks are logged and
                           re-raised unchanged.

        :rtype: (``str``, ``object``)
        """
        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Interpolate the message here: Exception() does not apply a format
            # string to its extra arguments the way the logging calls do.
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s' %
                            (execution_id, query_context))

        try:
            result = self._get_workflow_result(mistral_exec_id)
            result['tasks'] = self._get_workflow_tasks(mistral_exec_id)
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        status = self._determine_execution_status(
            execution_id, result['extra']['state'], result['tasks'])

        # Pass the values as logging arguments so the (possibly large) result is
        # only rendered when debug logging is actually enabled.
        LOG.debug('[%s] mistral workflow execution status: %s', execution_id, status)
        LOG.debug('[%s] mistral workflow execution result: %s', execution_id, result)

        return (status, result)

    def _get_workflow_result(self, exec_id):
        """
        Returns the workflow output together with the raw mistral state.

        The output is only loaded when the execution is in one of DONE_STATES;
        otherwise an empty dict is used. The mistral state and state info are
        stored under the ``extra`` key. No conversion to an st2 status happens here.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :rtype: ``dict``
        """
        execution = executions.ExecutionManager(self._client).get(exec_id)

        # NOTE(review): the assignment below assumes try_loads yields a dict for a
        # finished execution -- TODO confirm what happens for empty or non-JSON output.
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        return result

    def _get_workflow_tasks(self, exec_id):
        """
        Returns the list of tasks for a workflow execution.

        Each task returned by mistral is converted to a dict and passed through
        ``_format_task_result``.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``

        :rtype: ``list``
        """
        wf_tasks = tasks.TaskManager(self._client).list(workflow_execution_id=exec_id)

        return [self._format_task_result(task=wf_task.to_dict()) for wf_task in wf_tasks]

    def _format_task_result(self, task):
        """
        Format task result to follow the unified workflow result format.

        ``id``, ``name`` and ``workflow_name`` are required keys; the remaining
        fields default to ``None`` when absent.

        :param task: Task attributes as a dict.
        :type task: ``dict``

        :raises KeyError: if ``id``, ``name`` or ``workflow_name`` is missing.

        :rtype: ``dict``
        """
        result = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id', None),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at', None),
            'updated_at': task.get('updated_at', None),
            'state': task.get('state', None),
            'state_info': task.get('state_info', None)
        }

        # These fields are run through jsonify.try_loads before being stored.
        for attr in ['result', 'input', 'published']:
            result[attr] = jsonify.try_loads(task.get(attr, None))

        return result

    def _determine_execution_status(self, execution_id, wf_state, tasks):
        """
        Work out the st2 liveaction status for a mistral workflow execution.

        NOTE(review): the ``tasks`` parameter shadows the ``tasks`` module imported
        at the top of the file. The module is not needed inside this method and
        the name is kept so existing callers are unaffected.

        :param execution_id: st2 execution_id, used to check for cancellation.
        :type execution_id: ``str``

        :param wf_state: State reported by mistral for the workflow execution.
        :type wf_state: ``str``

        :param tasks: Formatted task results, each carrying a ``state`` key.
        :type tasks: ``list``

        :rtype: ``str``
        """
        # Get the liveaction object to compare state.
        is_action_canceled = action_service.is_action_canceled_or_canceling(execution_id)

        # Identify the list of tasks that are not in completed states.
        active_tasks = [t for t in tasks if t['state'] not in DONE_STATES]

        # On cancellation, mistral workflow executions are paused so that tasks
        # can gracefully reach completion. This is only temporary until a canceled
        # status is added to mistral.
        if (wf_state in DONE_STATES or wf_state == 'PAUSED') and is_action_canceled:
            status = action_constants.LIVEACTION_STATUS_CANCELED
        elif wf_state in DONE_STATES and not is_action_canceled and active_tasks:
            # The workflow reports a done state but some tasks have not finished
            # yet, so keep reporting the action as running.
            status = action_constants.LIVEACTION_STATUS_RUNNING
        elif wf_state not in DONE_STATES:
            status = action_constants.LIVEACTION_STATUS_RUNNING
        else:
            status = DONE_STATES[wf_state]

        return status
149