Passed
Push — master ( 0b1d4e...594593 )
by W
04:59
created

_determine_execution_status()   F

Complexity

Conditions 15

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 15
c 0
b 0
f 0
dl 0
loc 24
rs 2.8197

How to fix   Complexity   

Complexity

Complex classes like MistralResultsQuerier._determine_execution_status() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import uuid
2
3
from mistralclient.api import client as mistral
4
from oslo_config import cfg
5
import retrying
6
7
from st2common.query.base import Querier
8
from st2common.constants import action as action_constants
9
from st2common import log as logging
10
from st2common.services import action as action_service
11
from st2common.util import jsonify
12
from st2common.util.url import get_url_without_trailing_slash
13
from st2common.util.workflow import mistral as utils
14
15
16
LOG = logging.getLogger(__name__)
17
18
DONE_STATES = {
19
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
20
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
21
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED
22
}
23
24
ACTIVE_STATES = {
25
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING
26
}
27
28
29
def get_instance():
30
    return MistralResultsQuerier(str(uuid.uuid4()))
31
32
33
class MistralResultsQuerier(Querier):
34
    def __init__(self, id, *args, **kwargs):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in id.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
35
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)
36
        self._base_url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
37
        self._client = mistral.client(
38
            mistral_url=self._base_url,
39
            username=cfg.CONF.mistral.keystone_username,
40
            api_key=cfg.CONF.mistral.keystone_password,
41
            project_name=cfg.CONF.mistral.keystone_project_name,
42
            auth_url=cfg.CONF.mistral.keystone_auth_url,
43
            cacert=cfg.CONF.mistral.cacert,
44
            insecure=cfg.CONF.mistral.insecure)
45
46
    @retrying.retry(
47
        retry_on_exception=utils.retry_on_exceptions,
48
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
49
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
50
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
51
    def query(self, execution_id, query_context):
52
        """
53
        Queries mistral for workflow results using v2 APIs.
54
        :param execution_id: st2 execution_id (context to be used for logging/audit)
55
        :type execution_id: ``str``
56
        :param query_context: context for the query to be made to mistral. This contains mistral
57
                              execution id.
58
        :type query_context: ``objext``
59
        :rtype: (``str``, ``object``)
60
        """
61
        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
62
        if not mistral_exec_id:
63
            raise Exception('[%s] Missing mistral workflow execution ID in query context. %s',
64
                            execution_id, query_context)
65
66
        try:
67
            result = self._get_workflow_result(mistral_exec_id)
68
            result['tasks'] = self._get_workflow_tasks(mistral_exec_id)
69
        except Exception:
70
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
71
                          execution_id, query_context)
72
            raise
73
74
        status = self._determine_execution_status(
75
            execution_id, result['extra']['state'], result['tasks'])
76
77
        LOG.debug('[%s] mistral workflow execution status: %s' % (execution_id, status))
78
        LOG.debug('[%s] mistral workflow execution result: %s' % (execution_id, result))
79
80
        return (status, result)
81
82
    def _get_workflow_result(self, exec_id):
83
        """
84
        Returns the workflow status and output. Mistral workflow status will be converted
85
        to st2 action status.
86
        :param exec_id: Mistral execution ID
87
        :type exec_id: ``str``
88
        :rtype: (``str``, ``dict``)
89
        """
90
        execution = self._client.executions.get(exec_id)
91
92
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}
93
94
        result['extra'] = {
95
            'state': execution.state,
96
            'state_info': execution.state_info
97
        }
98
99
        return result
100
101
    def _get_workflow_tasks(self, exec_id):
102
        """
103
        Returns the list of tasks for a workflow execution.
104
        :param exec_id: Mistral execution ID
105
        :type exec_id: ``str``
106
        :rtype: ``list``
107
        """
108
        wf_tasks = [
109
            self._client.tasks.get(task.id)
110
            for task in self._client.tasks.list(workflow_execution_id=exec_id)
111
        ]
112
113
        return [self._format_task_result(task=wf_task.to_dict()) for wf_task in wf_tasks]
114
115
    def _format_task_result(self, task):
116
        """
117
        Format task result to follow the unified workflow result format.
118
        """
119
        result = {
120
            'id': task['id'],
121
            'name': task['name'],
122
            'workflow_execution_id': task.get('workflow_execution_id', None),
123
            'workflow_name': task['workflow_name'],
124
            'created_at': task.get('created_at', None),
125
            'updated_at': task.get('updated_at', None),
126
            'state': task.get('state', None),
127
            'state_info': task.get('state_info', None)
128
        }
129
130
        for attr in ['result', 'input', 'published']:
131
            result[attr] = jsonify.try_loads(task.get(attr, None))
132
133
        return result
134
135
    def _determine_execution_status(self, execution_id, wf_state, tasks):
136
        # Get the liveaction object to compare state.
137
        is_action_canceled = action_service.is_action_canceled_or_canceling(execution_id)
138
139
        # Identify the list of tasks that are not still running.
140
        active_tasks = [t for t in tasks if t['state'] in ACTIVE_STATES]
141
142
        # Keep the execution in running state if there are active tasks.
143
        # In certain use cases, Mistral sets the workflow state to
144
        # completion prior to task completion.
145
        if is_action_canceled and active_tasks:
146
            status = action_constants.LIVEACTION_STATUS_CANCELING
147
        elif is_action_canceled and not active_tasks and wf_state not in DONE_STATES:
148
            status = action_constants.LIVEACTION_STATUS_CANCELING
149
        elif not is_action_canceled and active_tasks and wf_state == 'CANCELLED':
150
            status = action_constants.LIVEACTION_STATUS_CANCELING
151
        elif wf_state in DONE_STATES and active_tasks:
152
            status = action_constants.LIVEACTION_STATUS_RUNNING
153
        elif wf_state in DONE_STATES and not active_tasks:
154
            status = DONE_STATES[wf_state]
155
        else:
156
            status = action_constants.LIVEACTION_STATUS_RUNNING
157
158
        return status
159