Test Failed
Pull Request — master (#4068)
by W
04:27
created

MistralCallbackHandler   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 72
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 72
rs 10
wmc 20

3 Methods

Rating   Name   Duplication   Size   Complexity  
A _update_action_execution() 0 21 1
C _encode() 0 20 9
F callback() 0 27 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import ast
18
import copy
19
import json
20
import re
21
import retrying
22
import six
23
24
from oslo_config import cfg
25
from mistralclient.api import client as mistral
26
27
from st2common.constants import action as action_constants
28
from st2common import log as logging
29
from st2common.callback import base as callback
30
from st2common.util.workflow import mistral as utils
31
32
33
LOG = logging.getLogger(__name__)
34
35
36
STATUS_MAP = {
37
    action_constants.LIVEACTION_STATUS_REQUESTED: 'RUNNING',
38
    action_constants.LIVEACTION_STATUS_SCHEDULED: 'RUNNING',
39
    action_constants.LIVEACTION_STATUS_DELAYED: 'RUNNING',
40
    action_constants.LIVEACTION_STATUS_RUNNING: 'RUNNING',
41
    action_constants.LIVEACTION_STATUS_SUCCEEDED: 'SUCCESS',
42
    action_constants.LIVEACTION_STATUS_FAILED: 'ERROR',
43
    action_constants.LIVEACTION_STATUS_TIMED_OUT: 'ERROR',
44
    action_constants.LIVEACTION_STATUS_ABANDONED: 'ERROR',
45
    action_constants.LIVEACTION_STATUS_PENDING: 'PAUSED',
46
    action_constants.LIVEACTION_STATUS_CANCELING: 'CANCELLED',
47
    action_constants.LIVEACTION_STATUS_CANCELED: 'CANCELLED',
48
    action_constants.LIVEACTION_STATUS_PAUSING: 'PAUSED',
49
    action_constants.LIVEACTION_STATUS_PAUSED: 'PAUSED',
50
    action_constants.LIVEACTION_STATUS_RESUMING: 'RUNNING'
51
}
52
53
MISTRAL_ACCEPTED_STATES = copy.deepcopy(action_constants.LIVEACTION_COMPLETED_STATES)
54
MISTRAL_ACCEPTED_STATES += [action_constants.LIVEACTION_STATUS_PAUSED]
55
56
57
def get_instance():
58
    return MistralCallbackHandler
59
60
61
def get_action_execution_id_from_url(url):
62
    match = re.search('(.+)/action_executions/(.+)', url)
63
    if not match or len(match.groups()) != 2:
64
        raise ValueError('Unable to extract the action execution ID '
65
                         'from the callback URL (%s).' % (url))
66
67
    return match.group(2)
68
69
70
class MistralCallbackHandler(callback.AsyncActionExecutionCallbackHandler):
71
72
    @classmethod
73
    @retrying.retry(
74
        retry_on_exception=utils.retry_on_exceptions,
75
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
76
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
77
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
78
    def _update_action_execution(cls, url, data):
79
        action_execution_id = get_action_execution_id_from_url(url)
80
81
        LOG.info('Sending callback to %s with data %s.', url, data)
82
83
        client = mistral.client(
84
            mistral_url=cfg.CONF.mistral.v2_base_url,
85
            username=cfg.CONF.mistral.keystone_username,
86
            api_key=cfg.CONF.mistral.keystone_password,
87
            project_name=cfg.CONF.mistral.keystone_project_name,
88
            auth_url=cfg.CONF.mistral.keystone_auth_url,
89
            cacert=cfg.CONF.mistral.cacert,
90
            insecure=cfg.CONF.mistral.insecure)
91
92
        client.action_executions.update(action_execution_id, **data)
93
94
    @classmethod
95
    def _encode(cls, value):
96
        if isinstance(value, dict):
97
            return {k: cls._encode(v) for k, v in six.iteritems(value)}
98
        elif isinstance(value, list):
99
            return [cls._encode(item) for item in value]
100
        elif isinstance(value, six.string_types) and not six.PY3:
101
            try:
102
                value = value.decode('utf-8')
103
            except Exception:
104
                LOG.exception('Unable to decode value to utf-8.')
105
106
            try:
107
                value = value.encode('unicode_escape')
108
            except Exception:
109
                LOG.exception('Unable to unicode escape value.')
110
111
            return value
112
        else:
113
            return value
114
115
    @classmethod
116
    def callback(cls, liveaction):
117
        assert isinstance(liveaction.callback, dict)
118
        assert 'url' in liveaction.callback
119
120
        url = liveaction.callback['url']
121
        status = liveaction.status
122
        result = liveaction.result
123
124
        if status not in MISTRAL_ACCEPTED_STATES:
125
            LOG.warning('Unable to callback %s because status "%s" is not supported.', url, status)
126
            return
127
128
        try:
129
            if isinstance(result, six.string_types) and len(result) > 0 and result[0] in ['{', '[']:
130
                value = ast.literal_eval(result)
131
                if type(value) in [dict, list]:
132
                    result = value
133
134
            result = cls._encode(result)
135
            output = json.dumps(result) if type(result) in [dict, list] else str(result)
136
            output = output.replace('\\\\\\\\u', '\\\\u')
137
            data = {'state': STATUS_MAP[status], 'output': output}
138
139
            cls._update_action_execution(url, data)
140
        except Exception as e:
141
            LOG.exception(e)
142