Completed
Pull Request — master (#395)
by
unknown
01:36
created

ThirdPartyResource   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 92
Duplicated Lines 0 %
Metric Value
dl 0
loc 92
rs 10
wmc 15

10 Methods

Rating   Name   Duplication   Size   Complexity  
A setup() 0 10 1
A remove_trigger() 0 2 1
A run() 0 16 4
A cleanup() 0 2 1
A _build_a_trigger() 0 15 1
A add_trigger() 0 2 1
A _k8s_object() 0 23 3
A update_trigger() 0 2 1
A _get_trigger_payload_from_line() 0 7 1
A _process_message() 0 3 1
1
#!/usr/bin/env python
2
import json
3
import requests
4
import sys
5
from requests.auth import HTTPBasicAuth
6
import ast
7
# from git@github.com:mward29/python-k8sclient.git
8
9
from st2reactor.sensor.base import Sensor
10
11
12
class ThirdPartyResource(Sensor):
    """Watch a Kubernetes third-party-resource endpoint and dispatch triggers.

    ``setup`` opens a single streaming HTTP GET against the configured
    endpoint and ``run`` consumes it line by line. Each line is expected to be
    a JSON object carrying ``type`` and ``object`` keys. Every event whose
    object has the required metadata and a ``labels`` entry is dispatched
    exactly once as a ``kubernetes.thirdpartyobject`` trigger; anything else
    is logged and skipped.
    """

    # Trigger reference used for every event dispatched by this sensor.
    TRIGGER_REF = 'kubernetes.thirdpartyobject'

    # Defaults so both attributes exist before setup() has been called.
    _log = None
    client = None

    def setup(self):
        """Create the logger and open the streaming connection.

        Reads ``kubernetes_api_url``, ``extension_url``, ``user`` and
        ``password`` from the sensor config.

        :raises KeyError: if one of those config keys is missing.
        """
        self._log = self._sensor_service.get_logger(__name__)
        self._log.debug('Connecting to Kubernetes via api_client')

        api_url = self._config['kubernetes_api_url'] + self._config['extension_url']
        credentials = HTTPBasicAuth(self._config['user'], self._config['password'])

        # NOTE(review): TLS certificate verification is switched off here. The
        # original code had a config-driven ``verify`` commented out; confirm
        # whether it can be re-enabled before relying on this in production.
        # stream=True keeps the body unread so run() can iterate it lazily.
        self.client = requests.get(api_url, auth=credentials, verify=False, stream=True)

    def run(self):
        """Consume the event stream until the server closes it.

        The trigger itself is dispatched further down the call chain (in
        ``_build_a_trigger``), so this loop must not dispatch a second time.
        Unusable lines are skipped instead of terminating the sensor process.
        """
        self._log.debug('Watch Kubernetes for thirdpartyresource information')

        for line in self.client.iter_lines():
            if not line:
                # iter_lines() may yield empty keep-alive lines; nothing to parse.
                continue
            self._get_trigger_payload_from_line(line)

    def _get_trigger_payload_from_line(self, line):
        """Decode one raw stream line and process the event it contains.

        :param line: one line read from the response stream (bytes or text).
        :return: the payload dict that was dispatched, or ``None`` when the
            line could not be parsed or the event was skipped.
        """
        try:
            if isinstance(line, bytes):
                line = line.decode('utf-8')
            # The stream carries JSON, so it must go through the JSON parser:
            # a Python-literal parser rejects true/false/null.
            event = json.loads(line)
        except ValueError as exc:
            # Covers both invalid JSON and invalid UTF-8 (UnicodeDecodeError).
            self._log.warning('Skipping line that is not valid JSON: %s', exc)
            return None

        return self._k8s_object(event)

    def _k8s_object(self, line):
        """Extract the fields of interest from a parsed event and dispatch it.

        :param line: the parsed event; expected to be a dict with ``type`` and
            ``object`` keys, where ``object`` has ``kind`` and ``metadata``.
        :return: the dispatched payload dict, or ``None`` when required fields
            or labels are missing.
        """
        try:
            resource_type = line['type']
            k8s_object = line['object']
            metadata = k8s_object['metadata']
            object_kind = k8s_object['kind']
            name = metadata['name']
            namespace = metadata['namespace']
            uid = metadata['uid']
            has_labels = 'labels' in metadata
        except (KeyError, TypeError):
            # TypeError covers events whose nesting is not dict-shaped.
            self._log.warning('type, kind, name, namespace or uid do not exist in the '
                              'object; skipping event')
            return None

        if not has_labels:
            self._log.warning('No Labels for the resource below. Tough to proceed without '
                              'knowing how to work with this object.')
            self._log.warning('name=%s namespace=%s uid=%s', name, namespace, uid)
            return None

        return self._build_a_trigger(resource_type=resource_type, name=name,
                                     labels=metadata['labels'], namespace=namespace,
                                     object_kind=object_kind, uid=uid)

    def _build_a_trigger(self, resource_type, name, labels, namespace, object_kind, uid):
        """Assemble the trigger payload, dispatch it, and return it.

        :return: the payload dict handed to the sensor service.
        """
        payload = {
            'resource': resource_type,
            'name': name,
            'labels': labels,
            'namespace': namespace,
            'object_kind': object_kind,
            'uid': uid,
        }

        self._log.debug('Triggering Dispatch Now')
        self._sensor_service.dispatch(trigger=self.TRIGGER_REF, payload=payload)
        return payload

    def cleanup(self):
        """Close the streaming response opened by ``setup``, if there is one."""
        if self.client is not None:
            self.client.close()

    def add_trigger(self, trigger):
        """No-op: this sensor keeps no per-trigger state."""
        pass

    def update_trigger(self, trigger):
        """No-op: this sensor keeps no per-trigger state."""
        pass

    def remove_trigger(self, trigger):
        """No-op: this sensor keeps no per-trigger state."""
        pass

    def _process_message(self, message):
        """Unused placeholder; intentionally does nothing."""
        pass
104