Completed
Pull Request — master (#400)
by
unknown
01:45
created

ThirdPartyResource.run()   B

Complexity

Conditions 5

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 18
rs 8.5454
1
#!/usr/bin/env python
2
import json
3
import requests
4
import sys
5
from requests.auth import HTTPBasicAuth
6
import ast
7
# from [email protected]:mward29/python-k8sclient.git
8
9
from st2reactor.sensor.base import Sensor
10
11
12
class ThirdPartyResource(Sensor):
13
    def __init__(self, sensor_service, config=None):
14
        super(ThirdPartyResource, self).__init__(sensor_service=sensor_service,
15
                                                 config=config)
16
        self._log = self._sensor_service.get_logger(__name__)
17
        self.TRIGGER_REF = 'kubernetes.thirdpartyobject'
18
        self.client = None
19
20
    def setup(self):
21
        try:
22
            extension = self._config['extension_url']
23
            KUBERNETES_API_URL = self._config['kubernetes_api_url'] + extension
24
            user = self._config['user']
25
            password = self._config['password']
26
            verify = self._config['verify']
27
        except KeyError:
28
            self._log.exception('Configuration file does not contain required fields.')
29
            raise
30
        self._log.debug('Connecting to Kubernetes endpoint %s via api_client.' %
31
                        KUBERNETES_API_URL)
32
        self.client = requests.get(KUBERNETES_API_URL, auth=HTTPBasicAuth(user, password),
33
                                   verify=verify, stream=True)
34
35
    def run(self):
36
        self._log.debug('Watch Kubernetes for thirdpartyresource information')
37
        r = self.client
38
        lines = r.iter_lines()
39
        # Save the first line for later or just skip it
40
        # first_line = next(lines)
41
42
        for line in lines:
43
            try:
44
                trigger_payload = self._get_trigger_payload_from_line(line)
45
            except:
46
                msg = ('Failed generating trigger payload from line %s. Aborting sensor!!!' %
47
                    line)
48
                self._log.exception(msg)
49
                sys.exit(1)
50
            else:
51
                self._log.debug('Triggering Dispatch Now')
52
                self._sensor_service.dispatch(trigger=self.TRIGGER_REF, payload=trigger_payload)
53
54
    def _get_trigger_payload_from_line(self, line):
55
        k8s_object = self._fix_utf8_enconding_and_eval(line)
56
        self._log.debug('Incoming k8s object (from API response): %s', k8s_object)
57
        payload = self._k8s_object_to_st2_trigger(k8s_object)
58
        return payload
59
60
    def _fix_utf8_enconding_and_eval(self, line):
61
        # need to perform a json dump due to uft8 error prior to performing a json.load
62
        io = json.dumps(line)
63
        n = json.loads(io)
64
        line = ast.literal_eval(n)
65
        return line
66
67
    def _k8s_object_to_st2_trigger(self, k8s_object):
68
        # Define some variables
69
        try:
70
            resource_type = k8s_object['type']
71
            object_kind = k8s_object['object']['kind']
72
            name = k8s_object['object']['metadata']['name']
73
            namespace = k8s_object['object']['metadata']['namespace']
74
            uid = k8s_object['object']['metadata']['uid']
75
            labels_data = k8s_object['object']['metadata']['labels']
76
        except KeyError:
77
            msg = 'One of "type", "kind", "name", "namespace" or "uid" or "labels" ' + \
78
                  'do not exist in the object. Incoming object=%s' % k8s_object
79
            self._log.exception(msg)
80
            raise
81
        else:
82
            payload = self._build_a_trigger(resource_type=resource_type, name=name,
83
                                    labels=labels_data, namespace=namespace,
84
                                    object_kind=object_kind, uid=uid)
85
            self._log.debug('Trigger payload: %s.' % payload)
86
            return payload
87
88
    def _build_a_trigger(self, resource_type, name, labels, namespace, object_kind, uid):
89
        payload = {
90
            'resource': resource_type,
91
            'name': name,
92
            'labels': labels,
93
            'namespace': namespace,
94
            'object_kind': object_kind,
95
            'uid': uid
96
        }
97
98
        return payload
99
100
    def cleanup(self):
101
        pass
102
103
    def add_trigger(self, trigger):
104
        pass
105
106
    def update_trigger(self, trigger):
107
        pass
108
109
    def remove_trigger(self, trigger):
110
        pass
111