Passed
Pull Request — master (#3163)
by W
05:12
created

MistralCallbackHandler   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 40
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 40
rs 10
c 1
b 0
f 0
wmc 9

2 Methods

Rating   Name   Duplication   Size   Complexity  
A _update_action_execution() 0 14 1
D callback() 0 23 8
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import ast
17
import json
18
import re
19
import retrying
20
21
from oslo_config import cfg
22
23
from st2common.constants import action as action_constants
24
from st2common import log as logging
25
from st2common.callback import base as callback
26
from st2common.persistence.execution import ActionExecution
27
from st2common.util import url as url_utils
28
from st2common.util.workflow import mistral as utils
29
30
31
LOG = logging.getLogger(__name__)
32
33
34
STATUS_MAP = {
35
    action_constants.LIVEACTION_STATUS_REQUESTED: 'RUNNING',
36
    action_constants.LIVEACTION_STATUS_SCHEDULED: 'RUNNING',
37
    action_constants.LIVEACTION_STATUS_DELAYED: 'RUNNING',
38
    action_constants.LIVEACTION_STATUS_RUNNING: 'RUNNING',
39
    action_constants.LIVEACTION_STATUS_SUCCEEDED: 'SUCCESS',
40
    action_constants.LIVEACTION_STATUS_FAILED: 'ERROR',
41
    action_constants.LIVEACTION_STATUS_TIMED_OUT: 'ERROR',
42
    action_constants.LIVEACTION_STATUS_ABANDONED: 'ERROR',
43
    action_constants.LIVEACTION_STATUS_CANCELING: 'RUNNING',
44
    action_constants.LIVEACTION_STATUS_CANCELED: 'CANCELLED'
45
}
46
47
48
def get_instance():
49
    return MistralCallbackHandler
50
51
52
def get_action_execution_id_from_url(url):
53
    match = re.search('(.+)/action_executions/(.+)', url)
54
    if not match or len(match.groups()) != 2:
55
        raise ValueError('Unable to extract the action execution ID '
56
                         'from the callback URL (%s).' % (url))
57
58
    return match.group(2)
59
60
61
class MistralCallbackHandler(callback.AsyncActionExecutionCallbackHandler):
62
63
    @classmethod
64
    @retrying.retry(
65
        retry_on_exception=utils.retry_on_exceptions,
66
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
67
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
68
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
69
    def _update_action_execution(cls, url, data, auth_token):
70
        action_execution_id = get_action_execution_id_from_url(url)
71
72
        LOG.info('Sending callback to %s with data %s.', url, data)
73
74
        base_url = url_utils.get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
75
        client = utils.get_client(base_url, auth_token=auth_token)
76
        client.action_executions.update(action_execution_id, **data)
77
78
    @classmethod
79
    def callback(cls, url, context, status, result):
80
        if status not in action_constants.LIVEACTION_COMPLETED_STATES:
81
            return
82
83
        parent_ex_id = context['parent']['execution_id']
84
        parent_ex = ActionExecution.get_by_id(parent_ex_id)
85
        parent_ex_ctx = parent_ex.context
86
        mistral_ctx = parent_ex_ctx.get('mistral', {})
87
        auth_token = mistral_ctx.get('auth_token', None)
88
89
        try:
90
            if isinstance(result, basestring) and len(result) > 0 and result[0] in ['{', '[']:
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'basestring'
Loading history...
91
                value = ast.literal_eval(result)
92
                if type(value) in [dict, list]:
93
                    result = value
94
95
            output = json.dumps(result) if type(result) in [dict, list] else str(result)
96
            data = {'state': STATUS_MAP[status], 'output': output}
97
98
            cls._update_action_execution(url, data, auth_token)
99
        except Exception as e:
100
            LOG.exception(e)
101