Completed
Pull Request — master (#395)
by
unknown
01:42
created

ThirdPartyResource   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 83
Duplicated Lines 0 %
Metric Value
dl 0
loc 83
rs 10
wmc 12

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 3 1
A remove_trigger() 0 2 1
A run() 0 14 2
A cleanup() 0 2 1
A setup() 0 11 1
A _build_a_trigger() 0 16 1
A add_trigger() 0 2 1
A _k8s_object() 0 17 2
A update_trigger() 0 2 1
A _process_message() 0 3 1
1
#!/usr/bin/env python
2
import json
3
import requests
4
from requests.auth import HTTPBasicAuth
5
import ast
6
# from [email protected]:mward29/python-k8sclient.git
7
8
from st2reactor.sensor.base import Sensor
9
10
11
class ThirdPartyResource(Sensor):
12
    def __init__(self, sensor_service, config=None):
13
        super(ThirdPartyResource, self).__init__(sensor_service=sensor_service,
14
                                                 config=config)
15
#        self._labels = self._config['labels'].get('thirdpartyresource', [])\
16
17
    def setup(self):
18
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
0 ignored issues
show
Coding Style introduced by
The attribute _logger was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x
Loading history...
19
        self._logger.debug('Connecting to Kubernetes via api_client')
20
        extension = self._config['extension_url']
21
        KUBERNETES_API_URL = self._config['kubernetes_api_url'] + extension
22
        user = self._config['user']
23
        password = self._config['password']
24
        verify = self._config['verify']
25
        self.client = requests.get(KUBERNETES_API_URL, auth=HTTPBasicAuth(user, password),
0 ignored issues
show
Coding Style introduced by
The attribute client was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x
Loading history...
26
                                   verify=verify, stream=True)
27
        pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
28
29
    def run(self):
30
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
0 ignored issues
show
Coding Style introduced by
The attribute _logger was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x
Loading history...
31
        self._logger.debug('Watch Kubernetes for thirdpartyresource information')
32
        r = self.client
33
        lines = r.iter_lines()
34
        # Save the first line for later or just skip it
35
#        first_line = next(lines)
36
        for line in lines:
37
            io = json.dumps(line)
38
            n = json.loads(io)
39
            d_list = ast.literal_eval(n)
40
            self._k8s_object(d_list=d_list)
41
#            self._resource_version(d_list=d_list)
42
        pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
43
44
    def _k8s_object(self, d_list):
45
        # Define some variables
46
        resource_type = d_list['type']
47
        object_kind = d_list['object']['kind']
48
        name = d_list['object']['metadata']['name']
49
        namespace = d_list['object']['metadata']['namespace']
50
        uid = d_list['object']['metadata']['uid']
51
        # Now lets see if labels exist, if so build a trigger
52
        if 'labels' in d_list['object']['metadata']:
53
            labels_data = d_list['object']['metadata']['labels']
54
            self._build_a_trigger(resource_type=resource_type, name=name, labels=labels_data,
55
                                  namespace=namespace, object_kind=object_kind, uid=uid)
56
        else:
57
            print("No Labels for the resource below. Tough to proceed without knowing how \
58
                  to work with this object.")
59
            print(name, namespace, uid)
60
        pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
61
62
    def _build_a_trigger(self, resource_type, name, labels, namespace, object_kind, uid):
63
        trigger = 'kubernetes.thirdpartyobject'
64
        payload = {
65
            'resource': resource_type,
66
            'name': name,
67
            'labels': labels,
68
            'namespace': namespace,
69
            'object_kind': object_kind,
70
            'uid': uid
71
        }
72
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
0 ignored issues
show
Coding Style introduced by
The attribute _logger was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x
Loading history...
73
        self._logger.debug('Triggering Dispatch Now')
74
75
        # Create dispatch trigger
76
        self._sensor_service.dispatch(trigger=trigger, payload=payload)
77
        pass
0 ignored issues
show
Unused Code introduced by
Unnecessary pass statement
Loading history...
78
79
    def cleanup(self):
80
        pass
81
82
    def add_trigger(self, trigger):
83
        pass
84
85
    def update_trigger(self, trigger):
86
        pass
87
88
    def remove_trigger(self, trigger):
89
        pass
90
91
    def _process_message(self, message):
92
93
        pass
94