Completed
Pull Request — master (#395) by unknown
01:51
created

ThirdPartyResource   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 82
Duplicated Lines 0 %
Metric Value
dl 0
loc 82
rs 10
wmc 12

10 Methods

Rating   Name                 Duplication   Size   Complexity
A        remove_trigger()     0             2      1
A        run()                0             14     2
A        cleanup()            0             2      1
A        setup()              0             10     1
A        _build_a_trigger()   0             16     1
A        __init__()           0             3      1
A        add_trigger()        0             2      1
A        _k8s_object()        0             17     2
A        update_trigger()     0             2      1
A        _process_message()   0             3      1

#!/usr/bin/env python
import json
import requests
from requests.auth import HTTPBasicAuth
import ast
# from [email protected]:mward29/python-k8sclient.git

from st2reactor.sensor.base import Sensor


class ThirdPartyResource(Sensor):
    def __init__(self, sensor_service, config=None):
        super(ThirdPartyResource, self).__init__(sensor_service=sensor_service,
                                                 config=config)
#        self._labels = self._config['labels'].get('thirdpartyresource', [])\

    def setup(self):
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
Coding Style: The attribute _logger was defined outside __init__.

It is generally a good practice to initialize all attributes to default values in the __init__ method:

class Foo:
    def __init__(self, x=None):
        self.x = x

        self._logger.debug('Connecting to Kubernetes via api_client')
        extension = self._config['extension_url']
        KUBERNETES_API_URL = self._config['kubernetes_api_url'] + extension
        user = self._config['user']
        password = self._config['password']
        self.client = requests.get(KUBERNETES_API_URL, auth=HTTPBasicAuth(user, password),
                                   verify=False, stream=True)
Coding Style: The attribute client was defined outside __init__.
        pass
Unused Code: Unnecessary pass statement

    def run(self):
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
Coding Style: The attribute _logger was defined outside __init__.
        self._logger.debug('Watch Kubernetes for thirdpartyresource information')
        r = self.client
        lines = r.iter_lines()
        # Save the first line for later or just skip it
#        first_line = next(lines)
        for line in lines:
            # Skip empty keep-alive lines in the stream
            if not line:
                continue
            # Each line is one JSON document, so decode it directly
            d_list = json.loads(line)
            self._k8s_object(d_list=d_list)
#            self._resource_version(d_list=d_list)
        pass
Unused Code: Unnecessary pass statement

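Each line of the watch response is expected to be one JSON document. A minimal sketch of decoding such a stream follows. The helper name `iter_watch_events` and the sample data are hypothetical and not part of the sensor; the sketch assumes the input is an iterable of bytes or str, such as the result of `iter_lines()`.

```python
import json


def iter_watch_events(lines):
    """Yield one dict per non-empty line of a watch stream.

    Hypothetical helper, not part of the sensor under review.
    """
    for line in lines:
        if not line:
            # Streams can contain empty keep-alive lines; skip them.
            continue
        if isinstance(line, bytes):
            line = line.decode('utf-8')
        # json.loads accepts true/false/null, which ast.literal_eval rejects.
        yield json.loads(line)


# Made-up sample data shaped like the events that _k8s_object() reads.
sample = [
    b'{"type": "ADDED", "object": {"kind": "Example", '
    b'"metadata": {"name": "a", "deletionTimestamp": null}}}',
    b'',
    b'{"type": "DELETED", "object": {"kind": "Example", '
    b'"metadata": {"name": "b"}}}',
]
events = list(iter_watch_events(sample))
```

Decoding with `json.loads` alone keeps the loop to a single parsing step and tolerates JSON literals such as `null` that are not valid Python literals.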
    def _k8s_object(self, d_list):
        # Define some variables
        resource_type = d_list['type']
        object_kind = d_list['object']['kind']
        name = d_list['object']['metadata']['name']
        namespace = d_list['object']['metadata']['namespace']
        uid = d_list['object']['metadata']['uid']
        # Now let's see if labels exist; if so, build a trigger
        if 'labels' in d_list['object']['metadata']:
            labels_data = d_list['object']['metadata']['labels']
            self._build_a_trigger(resource_type=resource_type, name=name, labels=labels_data,
                                  namespace=namespace, object_kind=object_kind, uid=uid)
        else:
            # Adjacent string literals keep the continuation indentation out of the message
            print("No Labels for the resource below. Tough to proceed without knowing how "
                  "to work with this object.")
            print(name, namespace, uid)
        pass
Unused Code: Unnecessary pass statement

    def _build_a_trigger(self, resource_type, name, labels, namespace, object_kind, uid):
        trigger = 'kubernetes.thirdpartyobject'
        payload = {
            'resource': resource_type,
            'name': name,
            'labels': labels,
            'namespace': namespace,
            'object_kind': object_kind,
            'uid': uid
        }
        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)
Coding Style: The attribute _logger was defined outside __init__.
        self._logger.debug('Triggering Dispatch Now')

        # Create dispatch trigger
        self._sensor_service.dispatch(trigger=trigger, payload=payload)
        pass
Unused Code: Unnecessary pass statement

    def cleanup(self):
        pass

    def add_trigger(self, trigger):
        pass

    def update_trigger(self, trigger):
        pass

    def remove_trigger(self, trigger):
        pass

    def _process_message(self, message):
        pass
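
The review notes about `_logger` and `client` being defined outside `__init__` could be addressed along the following lines. This is a sketch under stated assumptions, not the author's code: `SensorSketch` and `FakeSensorService` are hypothetical stand-ins so the example runs without st2reactor installed, and only the `get_logger(name=...)` call is taken from the listing.

```python
import logging


class FakeSensorService(object):
    """Hypothetical stand-in for the StackStorm sensor service."""

    def get_logger(self, name):
        return logging.getLogger(name)


class SensorSketch(object):
    """Illustrative only; the reviewed class derives from st2reactor's Sensor."""

    def __init__(self, sensor_service, config=None):
        self._sensor_service = sensor_service
        self._config = config or {}
        # Declare every attribute up front so no method has to create it.
        self._logger = None
        self.client = None

    def setup(self):
        # Assign the logger once here instead of in every method.
        self._logger = self._sensor_service.get_logger(
            name=self.__class__.__name__)
        self._logger.debug('Connecting to Kubernetes via api_client')
        # The real setup() would open the streaming request at this point.


sensor = SensorSketch(FakeSensorService(), config={'user': 'demo'})
attrs_before_setup = (sensor._logger, sensor.client)
sensor.setup()
```

With the attributes declared in `__init__`, the repeated `get_logger` calls in `run()` and `_build_a_trigger()` would no longer be needed, since `setup()` has already assigned `self._logger`.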