GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — plexxi-v2.2.1 ( 00dc5d...9862bf )
by
unknown
04:14
created

MistralResultsQuerier.query()   B

Complexity

Conditions 5

Size

Total Lines 60

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
c 0
b 0
f 0
dl 0
loc 60
rs 8.3554

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
import random
2
import time
3
import uuid
4
5
from mistralclient.api import base as mistralclient_base
6
from mistralclient.api import client as mistral
7
from oslo_config import cfg
8
import eventlet
9
import retrying
10
11
from st2common.query.base import Querier
12
from st2common.constants import action as action_constants
13
from st2common.exceptions import resultstracker as exceptions
14
from st2common import log as logging
15
from st2common.util import action_db as action_utils
16
from st2common.util import jsonify
17
from st2common.util.url import get_url_without_trailing_slash
18
from st2common.util.workflow import mistral as utils
19
20
21
LOG = logging.getLogger(__name__)


# Mistral workflow states treated as finished, mapped to the st2 liveaction
# status that is reported for each of them.
DONE_STATES = {
    'ERROR': action_constants.LIVEACTION_STATUS_FAILED,
    'SUCCESS': action_constants.LIVEACTION_STATUS_SUCCEEDED,
    'CANCELLED': action_constants.LIVEACTION_STATUS_CANCELED,
}


# Mistral states treated as still in progress, mapped to the st2 status.
ACTIVE_STATES = {
    'RUNNING': action_constants.LIVEACTION_STATUS_RUNNING,
}


# st2 liveaction statuses that mark the action as canceled or being canceled.
CANCELED_STATES = [
    action_constants.LIVEACTION_STATUS_CANCELED,
    action_constants.LIVEACTION_STATUS_CANCELING,
]
37
38
39
def get_instance():
    """Build a ``MistralResultsQuerier`` identified by a newly generated UUID4 string."""
    querier_id = str(uuid.uuid4())
    return MistralResultsQuerier(querier_id)
41
42
43
class MistralResultsQuerier(Querier):
44
    delete_state_object_on_error = False
45
46
    def __init__(self, id, *args, **kwargs):
        """
        Create the Mistral client from the ``mistral`` section of the st2 config.

        :param id: Identifier supplied by the caller (``get_instance`` passes a UUID
                   string). It is not used inside this constructor. The name shadows
                   the built-in ``id`` (flagged by static analysis) but is kept as is
                   so that existing callers keep working.
        :type id: ``str``
        """
        super(MistralResultsQuerier, self).__init__(*args, **kwargs)

        mistral_conf = cfg.CONF.mistral

        self._base_url = get_url_without_trailing_slash(mistral_conf.v2_base_url)

        client_options = {
            'mistral_url': self._base_url,
            'username': mistral_conf.keystone_username,
            'api_key': mistral_conf.keystone_password,
            'project_name': mistral_conf.keystone_project_name,
            'auth_url': mistral_conf.keystone_auth_url,
            'cacert': mistral_conf.cacert,
            'insecure': mistral_conf.insecure,
        }
        self._client = mistral.client(**client_options)

        # Upper bound of the random delay inserted between Mistral API calls.
        self._jitter = mistral_conf.jitter_interval
58
59
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def query(self, execution_id, query_context, last_query_time=None):
        """
        Queries mistral for workflow results using v2 APIs.

        :param execution_id: st2 execution_id (used to look up the liveaction and as
                             context for logging/audit)
        :type execution_id: ``str``
        :param query_context: context for the query to be made to mistral. This contains mistral
                              execution id under ``query_context['mistral']['execution_id']``.
        :type query_context: ``object``
        :param last_query_time: Timestamp of last query. When given, only tasks updated at or
                                after this time are requested from mistral.
        :type last_query_time: ``float``
        :rtype: (``str``, ``object``)
        :raises Exception: if the query context has no mistral execution ID, or if fetching
                           the workflow result/tasks fails for any reason other than a
                           missing reference (the original exception is re-raised).
        """
        # Mistral is given the timestamp as a UTC 'YYYY-MM-DD HH:MM:SS' string.
        dt_last_query_time = (
            time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(last_query_time))
            if last_query_time else None
        )

        liveaction_db = action_utils.get_liveaction_by_id(execution_id)

        mistral_exec_id = query_context.get('mistral', {}).get('execution_id', None)
        if not mistral_exec_id:
            # Interpolate explicitly: Exception() does not apply %-formatting to its
            # arguments, so passing them separately would leave the placeholders unfilled.
            raise Exception(
                '[%s] Missing mistral workflow execution ID in query context. %s' %
                (execution_id, query_context)
            )

        try:
            wf_result = self._get_workflow_result(mistral_exec_id)

            wf_tasks_result = self._get_workflow_tasks(
                mistral_exec_id,
                last_query_time=dt_last_query_time
            )

            result = self._format_query_result(
                liveaction_db.result,
                wf_result,
                wf_tasks_result
            )
        except exceptions.ReferenceNotFoundError as exc:
            # The workflow execution or one of its tasks is not known to mistral:
            # report the action as failed instead of retrying.
            LOG.exception('[%s] Unable to find reference.', execution_id)
            # ``message`` only exists on Python 2 exceptions; fall back to str().
            error_message = getattr(exc, 'message', None) or str(exc)
            return (action_constants.LIVEACTION_STATUS_FAILED, error_message)
        except Exception:
            LOG.exception('[%s] Unable to fetch mistral workflow result and tasks. %s',
                          execution_id, query_context)
            raise

        status = self._determine_execution_status(
            liveaction_db,
            result['extra']['state'],
            result['tasks']
        )

        # Pass arguments to the logger so formatting only happens when DEBUG is enabled.
        LOG.debug('[%s] mistral workflow execution status: %s', execution_id, status)
        LOG.debug('[%s] mistral workflow execution result: %s', execution_id, result)

        return (status, result)
119
120
    def _get_workflow_result(self, exec_id):
        """
        Returns the output of a mistral workflow execution together with its state.

        The workflow output is only loaded once the execution is in one of the
        ``DONE_STATES``; before that an empty result is used. The raw mistral state and
        state info are always attached under the ``extra`` key.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``
        :rtype: ``dict``
        :raises ReferenceNotFoundError: if mistral reports the execution as not found.
        :raises mistralclient.api.base.APIException: for any other mistral API error.
        """
        # Delay the request by a random amount (up to the configured jitter) so that
        # queries are spread out instead of hitting mistral all at once.
        eventlet.sleep(random.uniform(0, self._jitter))

        try:
            execution = self._client.executions.get(exec_id)
        except mistralclient_base.APIException as mistral_exc:
            # ``message`` may be missing (Python 3) or None; always compare against a string.
            error_message = getattr(mistral_exc, 'message', None) or str(mistral_exc)
            if 'not found' in error_message:
                raise exceptions.ReferenceNotFoundError(error_message)
            # Bare raise keeps the original traceback.
            raise

        # NOTE(review): this assumes the loaded output is a dict; a non-dict output
        # would fail on the item assignment below - confirm against jsonify.try_loads.
        result = jsonify.try_loads(execution.output) if execution.state in DONE_STATES else {}

        result['extra'] = {
            'state': execution.state,
            'state_info': execution.state_info
        }

        return result
145
146
    def _get_workflow_tasks(self, exec_id, last_query_time=None):
        """
        Returns the list of tasks for a workflow execution.

        Tasks are first listed and then fetched one by one; each fetched task is
        converted with ``_format_task_result``.

        :param exec_id: Mistral execution ID
        :type exec_id: ``str``
        :param last_query_time: Timestamp to filter tasks. When given, only tasks with
                                ``updated_at`` greater than or equal to it are listed.
        :type last_query_time: ``str``
        :rtype: ``list``
        :raises ReferenceNotFoundError: if mistral reports the execution or a task as not found.
        :raises mistralclient.api.base.APIException: for any other mistral API error.
        """
        query_filters = {}

        if last_query_time:
            query_filters['updated_at'] = 'gte:%s' % last_query_time

        fetched_tasks = []

        try:
            wf_tasks = self._client.tasks.list(workflow_execution_id=exec_id, **query_filters)

            for wf_task in wf_tasks:
                fetched_tasks.append(self._client.tasks.get(wf_task.id))

                # Lets not blast requests but just space it out for better CPU profile
                eventlet.sleep(random.uniform(0, self._jitter))
        except mistralclient_base.APIException as mistral_exc:
            # ``message`` may be missing (Python 3) or None; always compare against a string.
            error_message = getattr(mistral_exc, 'message', None) or str(mistral_exc)
            if 'not found' in error_message:
                raise exceptions.ReferenceNotFoundError(error_message)
            # Bare raise keeps the original traceback.
            raise

        return [self._format_task_result(task=entry.to_dict()) for entry in fetched_tasks]
177
178
    def _format_task_result(self, task):
        """
        Convert a mistral task dict into the unified workflow result format.

        ``id``, ``name`` and ``workflow_name`` must be present in ``task``; the other
        fields default to ``None`` when absent. ``result``, ``input`` and ``published``
        are passed through ``jsonify.try_loads``.

        :param task: Task attributes as a dict.
        :type task: ``dict``
        :rtype: ``dict``
        """
        formatted = {
            'id': task['id'],
            'name': task['name'],
            'workflow_execution_id': task.get('workflow_execution_id'),
            'workflow_name': task['workflow_name'],
            'created_at': task.get('created_at'),
            'updated_at': task.get('updated_at'),
            'state': task.get('state'),
            'state_info': task.get('state_info')
        }

        formatted.update(
            (field, jsonify.try_loads(task.get(field)))
            for field in ('result', 'input', 'published')
        )

        return formatted
197
198
    def _format_query_result(self, current_result, new_wf_result, new_wf_tasks_result):
199
        result = new_wf_result
200
201
        new_wf_task_ids = [entry['id'] for entry in new_wf_tasks_result]
202
203
        old_wf_tasks_result_to_keep = [
204
            entry for entry in current_result.get('tasks', [])
205
            if entry['id'] not in new_wf_task_ids
206
        ]
207
208
        result['tasks'] = old_wf_tasks_result_to_keep + new_wf_tasks_result
209
210
        return result
211
212
    def _determine_execution_status(self, liveaction_db, wf_state, tasks):
        """
        Derive the st2 liveaction status from the liveaction status, the mistral
        workflow state and the states of the workflow tasks.

        :param liveaction_db: Liveaction whose ``status`` tells whether a cancel is in effect.
        :param wf_state: Mistral workflow state.
        :type wf_state: ``str``
        :param tasks: Formatted task results, each carrying a ``state`` key.
        :type tasks: ``list``
        :rtype: ``str``
        """
        # Is the liveaction canceled or in the process of being canceled?
        cancel_in_effect = liveaction_db.status in CANCELED_STATES

        # Tasks that mistral still reports as active.
        running_tasks = [task for task in tasks if task['state'] in ACTIVE_STATES]

        if cancel_in_effect:
            # Stay in canceling until every task has stopped and the workflow
            # itself has reached a done state.
            if running_tasks or wf_state not in DONE_STATES:
                return action_constants.LIVEACTION_STATUS_CANCELING
            return DONE_STATES[wf_state]

        if running_tasks:
            # In certain use cases, Mistral sets the workflow state to completion
            # prior to task completion, so active tasks keep the execution going.
            if wf_state == 'CANCELLED':
                return action_constants.LIVEACTION_STATUS_CANCELING
            return action_constants.LIVEACTION_STATUS_RUNNING

        if wf_state in DONE_STATES:
            return DONE_STATES[wf_state]

        return action_constants.LIVEACTION_STATUS_RUNNING
236