Completed
Pull Request — master (#327)
by Tomaz
01:49
created

DockerSensorTestCase   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 29
Duplicated Lines 0 %
Metric Value
dl 0
loc 29
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
B test_poll() 0 26 1
1
import mock
2
3
from st2tests.base import BaseSensorTestCase
4
5
from docker_container_sensor import DockerSensor
6
7
# Sample container record fed to the sensor through the mocked
# ``_get_active_containers`` call and expected back verbatim as the
# ``container_info`` of dispatched triggers.
# NOTE(review): presumably mirrors one entry of a Docker container listing --
# confirm against the Docker client output the sensor consumes.
MOCK_CONTAINER_DATA = {
    "Id": "8dfafdbc3a40",
    "Names": ["/boring_feynman"],
    "Image": "ubuntu:latest",
    "Command": "echo 1",
    "Created": 1367854155,
    "Status": "Exit 0",
    "Ports": [{"PrivatePort": 2222, "PublicPort": 3333, "Type": "tcp"}],
    "SizeRw": 12288,
    "SizeRootFs": 0,
}
18
19
20
class DockerSensorTestCase(BaseSensorTestCase):
    """Tests for ``DockerSensor.poll()`` driven by a mocked active-container list."""

    sensor_cls = DockerSensor

    def test_poll(self):
        """Poll three times: no containers, one container appears, it disappears.

        Checks that the first poll dispatches nothing, the second dispatches a
        ``docker.container_tracker.started`` trigger and the third a
        ``docker.container_tracker.stopped`` trigger, each carrying the mock
        container record as ``container_info``.
        """
        sensor = self.get_sensor_instance()

        # No existing and no running containers (e.g. after initial sensor poll)
        sensor._running_containers = {}
        # Replace the lookup with a mock so the test decides which containers
        # the sensor sees as active on each poll.
        sensor._get_active_containers = mock.Mock()
        sensor._get_active_containers.return_value = {}

        # Initial poll, no containers
        sensor.poll()
        self.assertEqual(self.get_dispatched_triggers(), [])

        # One container started
        sensor._get_active_containers.return_value = {'1': MOCK_CONTAINER_DATA}

        sensor.poll()
        self.assertEqual(len(self.get_dispatched_triggers()), 1)
        self.assertTriggerDispatched(trigger='docker.container_tracker.started',
                                     payload={'container_info': MOCK_CONTAINER_DATA})

        # One container stopped
        sensor._get_active_containers.return_value = {}
        sensor.poll()
        # The dispatched-trigger count is cumulative across polls: started + stopped.
        self.assertEqual(len(self.get_dispatched_triggers()), 2)
        self.assertTriggerDispatched(trigger='docker.container_tracker.stopped',
                                     payload={'container_info': MOCK_CONTAINER_DATA})
49