1
|
|
|
from st2tests.base import BaseSensorTestCase |
2
|
|
|
|
3
|
|
|
from third_party_resource import ThirdPartyResource |
4
|
|
|
|
5
|
|
|
|
6
|
|
|
class ThirdPartyResourceTestCase(BaseSensorTestCase): |
7
|
|
|
sensor_cls = ThirdPartyResource |
8
|
|
|
|
9
|
|
|
def test_k8s_object_to_st2_trigger_bad_object(self): |
10
|
|
|
k8s_obj = { |
11
|
|
|
'type': 'kanye', |
12
|
|
|
'object': { |
13
|
|
|
'kind': 'president', |
14
|
|
|
'metadata': { |
15
|
|
|
'name': 'west', |
16
|
|
|
'namespace': 'westashians' |
17
|
|
|
# uid missing |
18
|
|
|
# label missing |
19
|
|
|
} |
20
|
|
|
} |
21
|
|
|
} |
22
|
|
|
sensor = self.get_sensor_instance() |
23
|
|
|
self.assertRaises(KeyError, sensor._k8s_object_to_st2_trigger, k8s_obj) |
24
|
|
|
|
25
|
|
|
def test_k8s_object_to_st2_trigger(self): |
26
|
|
|
k8s_obj = { |
27
|
|
|
'type': 'kanye', |
28
|
|
|
'object': { |
29
|
|
|
'kind': 'president', |
30
|
|
|
'metadata': { |
31
|
|
|
'name': 'west', |
32
|
|
|
'namespace': 'westashians', |
33
|
|
|
'uid': 'coinye', |
34
|
|
|
'labels': ['rapper', 'train wrecker'] |
35
|
|
|
} |
36
|
|
|
} |
37
|
|
|
} |
38
|
|
|
sensor = self.get_sensor_instance() |
39
|
|
|
payload = sensor._k8s_object_to_st2_trigger(k8s_obj) |
40
|
|
|
self.assertTrue('resource' in payload) |
41
|
|
|
self.assertEqual(payload['resource'], k8s_obj['type']) |
42
|
|
|
self.assertTrue('object_kind' in payload) |
43
|
|
|
self.assertEqual(payload['object_kind'], k8s_obj['object']['kind']) |
44
|
|
|
self.assertTrue('name' in payload) |
45
|
|
|
self.assertEqual(payload['name'], k8s_obj['object']['metadata']['name']) |
46
|
|
|
self.assertTrue('labels' in payload) |
47
|
|
|
self.assertListEqual(payload['labels'], k8s_obj['object']['metadata']['labels']) |
48
|
|
|
self.assertTrue('namespace' in payload) |
49
|
|
|
self.assertEqual(payload['namespace'], k8s_obj['object']['metadata']['namespace']) |
50
|
|
|
self.assertTrue('uid' in payload) |
51
|
|
|
self.assertEqual(payload['uid'], k8s_obj['object']['metadata']['uid']) |
52
|
|
|
|
53
|
|
|
def test_get_trigger_payload_from_line(self): |
54
|
|
|
line = '{"object": {"kind": "president", ' + \ |
55
|
|
|
'"metadata": {"labels": ["rapper", "train wrecker"], ' + \ |
56
|
|
|
'"namespace": "westashians", ' + \ |
57
|
|
|
'"name": "west", "uid": "coinye"}}, "type": "kanye"}' |
58
|
|
|
sensor = self.get_sensor_instance() |
59
|
|
|
payload = sensor._get_trigger_payload_from_line(line) |
60
|
|
|
self.assertTrue(payload is not None) |
61
|
|
|
self.assertTrue('resource' in payload) |
62
|
|
|
self.assertTrue('object_kind' in payload) |
63
|
|
|
self.assertTrue('name' in payload) |
64
|
|
|
self.assertTrue('labels' in payload) |
65
|
|
|
self.assertTrue('namespace' in payload) |
66
|
|
|
self.assertTrue('uid' in payload) |
67
|
|
|
|