Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

MistralCallbackHandler._encode()   C

Complexity

Conditions 8

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
dl 0
loc 20
rs 6.6666
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import ast
17
import copy
18
import json
19
import re
20
import retrying
21
import six
22
23
from oslo_config import cfg
24
from mistralclient.api import client as mistral
25
26
from st2common.constants import action as action_constants
27
from st2common import log as logging
28
from st2common.callback import base as callback
29
from st2common.util.workflow import mistral as utils
30
31
32
LOG = logging.getLogger(__name__)
33
34
35
# Mistral state reported for each StackStorm liveaction status.  The statuses
# are grouped by the Mistral state they translate to; the groups are listed in
# the order the individual entries are inserted into the mapping.
STATUS_MAP = dict(
    (st2_status, mistral_state)
    for mistral_state, st2_statuses in (
        ('RUNNING', (
            action_constants.LIVEACTION_STATUS_REQUESTED,
            action_constants.LIVEACTION_STATUS_SCHEDULED,
            action_constants.LIVEACTION_STATUS_DELAYED,
            action_constants.LIVEACTION_STATUS_RUNNING,
        )),
        ('SUCCESS', (
            action_constants.LIVEACTION_STATUS_SUCCEEDED,
        )),
        ('ERROR', (
            action_constants.LIVEACTION_STATUS_FAILED,
            action_constants.LIVEACTION_STATUS_TIMED_OUT,
            action_constants.LIVEACTION_STATUS_ABANDONED,
        )),
        ('CANCELLED', (
            action_constants.LIVEACTION_STATUS_CANCELING,
            action_constants.LIVEACTION_STATUS_CANCELED,
        )),
        ('PAUSED', (
            action_constants.LIVEACTION_STATUS_PAUSING,
            action_constants.LIVEACTION_STATUS_PAUSED,
        )),
        ('RUNNING', (
            action_constants.LIVEACTION_STATUS_RESUMING,
        )),
    )
    for st2_status in st2_statuses
)
50
51
# Liveaction statuses that MistralCallbackHandler.callback() accepts: every
# completed state plus PAUSED.  The shared constant is deep-copied first so
# that adding PAUSED never mutates action_constants itself.
MISTRAL_ACCEPTED_STATES = copy.deepcopy(action_constants.LIVEACTION_COMPLETED_STATES)
MISTRAL_ACCEPTED_STATES.append(action_constants.LIVEACTION_STATUS_PAUSED)
53
54
55
def get_instance():
    """Return the callback handler class provided by this module.

    Note that the class object itself is returned, not an instance of it.
    """
    handler_cls = MistralCallbackHandler
    return handler_cls
57
58
59
def get_action_execution_id_from_url(url):
    """Extract the action execution ID from a Mistral callback URL.

    :param url: Callback URL of the form ``<base>/action_executions/<id>``.
    :returns: The text that follows ``/action_executions/`` in ``url``.
    :raises ValueError: If ``url`` does not contain a non-empty prefix,
                        ``/action_executions/`` and a non-empty ID.
    """
    match = re.search(r'(.+)/action_executions/(.+)', url)

    # A successful match always carries both groups of the pattern, so a
    # missing match is the only failure that has to be handled here.
    if match is None:
        raise ValueError('Unable to extract the action execution ID '
                         'from the callback URL (%s).' % url)

    return match.group(2)
66
67
68
class MistralCallbackHandler(callback.AsyncActionExecutionCallbackHandler):
    """Callback handler that pushes a liveaction's status and result to Mistral.

    Every method is a classmethod.  ``callback`` is the entry point; the
    underscore-prefixed methods are helpers used by it.
    """

    @classmethod
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def _update_action_execution(cls, url, data):
        """Update the Mistral action execution referenced by ``url``.

        The call is retried with exponential back-off (timings are read from
        the ``mistral`` config group) whenever ``utils.retry_on_exceptions``
        accepts the raised exception.

        :param url: Callback URL containing ``/action_executions/<id>``.
        :param data: Keyword arguments forwarded to
                     ``action_executions.update`` (``callback`` sends
                     ``state`` and ``output``).
        :raises ValueError: If no action execution ID can be extracted from
                            ``url``.
        """
        action_execution_id = get_action_execution_id_from_url(url)

        LOG.info('Sending callback to %s with data %s.', url, data)

        client = mistral.client(
            mistral_url=cfg.CONF.mistral.v2_base_url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)

        client.action_executions.update(action_execution_id, **data)

    @classmethod
    def _encode(cls, value):
        """Recursively unicode-escape the strings found in ``value``.

        Dicts and lists are rebuilt as plain ``dict`` / ``list`` objects with
        their values / items encoded.  Strings are decoded from UTF-8 and
        unicode-escaped on Python 2 only.  On Python 3 they are returned
        unchanged: ``str`` has no ``decode`` method and
        ``encode('unicode_escape')`` would produce ``bytes``, which
        ``json.dumps`` cannot serialize.  Any other value is returned as is.

        :param value: Arbitrary action result (or part of one).
        :returns: The encoded copy of ``value``.
        """
        if isinstance(value, dict):
            return {k: cls._encode(v) for k, v in six.iteritems(value)}

        if isinstance(value, list):
            return [cls._encode(item) for item in value]

        if six.PY2 and isinstance(value, six.string_types):
            # Both steps are best effort: a failure is logged and the value is
            # carried forward in whatever form it currently has.
            try:
                value = value.decode('utf-8')
            except Exception:
                LOG.exception('Unable to decode value to utf-8.')

            try:
                value = value.encode('unicode_escape')
            except Exception:
                LOG.exception('Unable to unicode escape value.')

        return value

    @classmethod
    def callback(cls, url, context, status, result):
        """Report the status and result of an action execution to Mistral.

        Nothing is sent when ``status`` is not in ``MISTRAL_ACCEPTED_STATES``.
        Any error raised while preparing or sending the update is logged and
        swallowed, so this method never raises to its caller.

        :param url: Mistral callback URL of the action execution.
        :param context: Not used by this handler.
        :param status: Liveaction status; translated through ``STATUS_MAP``.
        :param result: Action result.  A string that looks like a dict or list
                       literal is parsed into that structure first.
        """
        if status not in MISTRAL_ACCEPTED_STATES:
            LOG.warning('Unable to callback %s because status "%s" is not supported.', url, status)
            return

        try:
            # six.string_types instead of basestring, which does not exist on
            # Python 3.
            if isinstance(result, six.string_types) and result.startswith(('{', '[')):
                try:
                    value = ast.literal_eval(result)
                except (ValueError, TypeError, SyntaxError):
                    # The string merely starts like a literal (e.g. JSON using
                    # true/null).  Send it unparsed rather than dropping the
                    # whole callback.
                    value = None

                if isinstance(value, (dict, list)):
                    result = value

            result = cls._encode(result)
            output = json.dumps(result) if isinstance(result, (dict, list)) else str(result)
            # Collapse the doubled escaping of unicode sequences.
            output = output.replace('\\\\\\\\u', '\\\\u')
            data = {'state': STATUS_MAP[status], 'output': output}

            cls._update_action_execution(url, data)
        except Exception as e:
            LOG.exception(e)
133