Completed
Pull Request — master (#392)
by
unknown
02:13
created

ThirdPartyResource   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 49
Duplicated Lines 0 %
Metric Value
dl 0
loc 49
rs 10
wmc 9

8 Methods

Rating   Name   Duplication   Size   Complexity  
A remove_trigger() 0 2 1
A cleanup() 0 2 1
A setup() 0 7 1
A __init__() 0 4 1
A add_trigger() 0 2 1
A update_trigger() 0 2 1
A poll() 0 16 2
A _process_message() 0 2 1
1
#!/usr/bin/env python
2
import base64
3
import json
4
import requests
5
import yaml
6
import argparse
7
import os, sys
8
import pprint
9
import ast
10
from labels import Labels
11
from swagger_client import *
12
from swagger_client_beta1 import *
13
# from [email protected]:mward29/python-k8sclient.git
14
15
from st2reactor.sensor.base import PollingSensor
16
17
18
class ThirdPartyResource(PollingSensor):
19
    def __init__(self, sensor_service, config=None, poll_interval=None):
20
        super(ThirdPartyResource, self).__init__(sensor_service=sensor_service,
21
                                                  config=config,
22
                                                  poll_interval=poll_interval)
23
#        self._labels = self._config['labels'].get('thirdpartyresource', [])
24
25
26
    def setup(self):
27
        KUBERNETES_API_URL = self._config['kubernetes_api_url']
28
        auth = self._config['auth']
29
        encoded_auth = base64.b64encode(auth)
30
        Configuration().verify_ssl = False
31
        self.client = api_client.ApiClient(KUBERNETES_API_URL, 'Authorization', "Basic %s" % encoded_auth)
32
        pass
33
34
    def poll(self):
35
        client = self.client
36
        api_beta1 = apis.ApisextensionsvbetaApi(client)
37
        d = api_beta1.list_third_party_resource().to_dict()
38
        d_list = d['items']
39
        for i in xrange(len(d_list)):
40
            name =  d_list[i]['metadata']['name']
41
            uid = d_list[i]['metadata']['uid']
42
            metadata = d_list[i]['metadata']
43
            versions = d_list[i]['versions']
44
#            pprint.pprint(name)
45
#            pprint.pprint(uid)
46
            self._sensor_service.set_value(name=(name + uid), value=(metadata))
47
        kvp = self._sensor_service.list_values(prefix='mysql')
48
        pprint.pprint(kvp)
49
        pass
50
51
52
53
    def cleanup(self):
54
        pass
55
56
    def add_trigger(self, trigger):
57
        pass
58
59
    def update_trigger(self, trigger):
60
        pass
61
62
    def remove_trigger(self, trigger):
63
        pass
64
65
    def _process_message(self, message):
66
        pass
67
68
#        l = d_list[i]['metadata']['labels']
69
#        a = d_list[i]['metadata']['annotations']
70
71
        #            if l is not None:
72
        #                labels = ast.literal_eval(l)
73
        #            if a is not None:
74
        #                annotations = ast.literal_eval(a)
75
76
            #    labels_config = self._config['labels']['thirdpartyresource']
77
            #    pprint.pprint(labels_config)
78