Completed
Pull Request — master (#327)
by Tomaz
03:46
created

DockerSensorTestCase   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 27
Duplicated Lines 0 %
Metric Value
dl 0
loc 27
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
B test_poll() 0 26 1
1
import mock
2
3
from st2tests.base import BaseSensorTestCase
4
5
from docker_container_sensor import DockerSensor
6
7
# Sample container record used as the trigger payload fixture in the tests
# below; it is handed back by the mocked ``_get_active_containers`` call.
MOCK_CONTAINER_DATA = {
    "Id": "8dfafdbc3a40",
    "Names": ["/boring_feynman"],
    "Image": "ubuntu:latest",
    "Command": "echo 1",
    "Created": 1367854155,
    "Status": "Exit 0",
    "Ports": [{"PrivatePort": 2222, "PublicPort": 3333, "Type": "tcp"}],
    "SizeRw": 12288,
    "SizeRootFs": 0
}
18
19
20
class DockerSensorTestCase(BaseSensorTestCase):
    """Checks the triggers dispatched by ``DockerSensor.poll()``."""

    def test_poll(self):
        """Poll with no containers, then one started, then one stopped.

        ``_get_active_containers`` is replaced with a mock so each poll sees
        exactly the container set configured by the test.
        """
        # TODO: Move to base test class
        sensor = DockerSensor(sensor_service=self.sensor_service)

        # No existing and no running containers (e.g. after initial sensor poll)
        sensor._running_containers = {}
        sensor._get_active_containers = mock.Mock()
        sensor._get_active_containers.return_value = {}

        sensor.poll()
        self.assertEqual(self.get_dispatched_triggers(), [])

        # One container started
        sensor._get_active_containers.return_value = {'1': MOCK_CONTAINER_DATA}

        sensor.poll()
        self.assertEqual(len(self.get_dispatched_triggers()), 1)
        self.assertTriggerDispatched(trigger='docker.container_tracker.started',
                                     payload={'container_info': MOCK_CONTAINER_DATA})

        # One container stopped: two triggers are expected in total at this
        # point (the earlier "started" one plus the new "stopped" one).
        sensor._get_active_containers.return_value = {}
        sensor.poll()
        self.assertEqual(len(self.get_dispatched_triggers()), 2)
        self.assertTriggerDispatched(trigger='docker.container_tracker.stopped',
                                     payload={'container_info': MOCK_CONTAINER_DATA})
47