Completed
Pull Request — master (#376)
by Tomaz
05:40
created

St2BaseAction._get_cacert()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
import os
2
3
from st2actions.runners.pythonrunner import Action
4
from st2client.client import Client
5
from st2client.models.keyvalue import KeyValuePair  # pylint: disable=no-name-in-module
6
7
from lib.utils import filter_none_values
8
9
10
__all__ = [
11
    'St2BaseAction'
12
]
13
14
15
class St2BaseAction(Action):
16
    def __init__(self, config):
17
        super(St2BaseAction, self).__init__(config)
18
        self._client = Client
19
        self._kvp = KeyValuePair
20
        self.client = self._get_client()
21
22
    def _get_client(self):
23
        base_url, api_url = self._get_st2_urls()
24
        token = self._get_auth_token()
25
        cacert = self._get_cacert()
26
27
        client_kwargs = {}
28
        if cacert:
29
            client_kwargs['cacert'] = cacert
30
31
        return self._client(base_url=base_url, api_url=api_url, token=token,
32
                            **client_kwargs)
33
34
    def _get_st2_urls(self):
35
        # First try to use base_url from config.
36
        base_url = self.config.get('base_url', None)
37
        api_url = None
38
39
        # not found look up from env vars. Assuming the pack is
40
        # configuered to work with current StackStorm instance.
41
        if not base_url:
42
            api_url = os.environ.get('ST2_ACTION_API_URL', None)
43
44
        return base_url, api_url
45
46
    def _get_auth_token(self):
47
        # First try to use auth_token from config.
48
        token = self.config.get('auth_token', None)
49
50
        # not found look up from env vars. Assuming the pack is
51
        # configuered to work with current StackStorm instance.
52
        if not token:
53
            token = os.environ.get('ST2_ACTION_AUTH_TOKEN', None)
54
55
        return token
56
57
    def _get_cacert(self):
58
        cacert = self.config.get('cacert', None)
59
        return cacert
60
61
    def _run_client_method(self, method, method_kwargs, format_func, format_kwargs=None):
62
        """
63
        Run the provided client method and format the result.
64
65
        :param method: Client method to run.
66
        :type method: ``func``
67
68
        :param method_kwargs: Keyword arguments passed to the client method.
69
        :type method_kwargs: ``dict``
70
71
        :param format_func: Function for formatting the result.
72
        :type format_func: ``func``
73
74
        :rtype: ``list`` of ``dict``
75
        """
76
        # Filter out parameters with string value of "None"
77
        # This is a work around since the default values can only be strings
78
        method_kwargs = filter_none_values(method_kwargs)
79
        method_name = method.__name__
80
        self.logger.debug('Calling client method "%s" with kwargs "%s"' % (method_name,
81
                                                                           method_kwargs))
82
83
        result = method(**method_kwargs)
84
        result = format_func(result, **format_kwargs or {})
85
        return result
86