Passed
Push — master ( e8584f...303446 )
by Emmanuel
04:55
created

docker_clean_test   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 156
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 113
dl 0
loc 156
rs 10
c 0
b 0
f 0
wmc 9

2 Functions

Rating   Name   Duplication   Size   Complexity  
A clean_cts() 0 8 3
A clean_images() 0 7 3

3 Methods

Rating   Name   Duplication   Size   Complexity  
B DockerCleanTest.test_full_clean() 0 100 1
A DockerCleanTest.test_bad_arg() 0 4 1
A DockerCleanTest.test_no_arg() 0 11 1
1
import os
2
import sys
3
import unittest
4
5
from click.testing import CliRunner
6
from docker.errors import NotFound
7
from stakkr.docker_clean import clean
8
from stakkr.docker_actions import get_client as get_docker_client
9
from subprocess import Popen, DEVNULL, PIPE
10
11
base_dir = os.path.abspath(os.path.dirname(__file__))
12
sys.path.insert(0, base_dir + '/../')
13
14
15
class DockerCleanTest(unittest.TestCase):
16
    def test_no_arg(self):
17
        result = CliRunner().invoke(clean)
18
        self.assertEqual(0, result.exit_code)
19
        regex = r'.*Cleaning Docker stopped containers.*'
20
        self.assertRegex(result.output, regex)
21
        regex = r'.*Cleaning Docker unused images.*'
22
        self.assertRegex(result.output, regex)
23
        regex = r'.*Cleaning Docker unused volumes.*'
24
        self.assertRegex(result.output, regex)
25
        regex = r'.*Cleaning Docker unused networks.*'
26
        self.assertRegex(result.output, regex)
27
28
    def test_bad_arg(self):
29
        result = CliRunner().invoke(clean, ['hello-world'])
30
        self.assertEqual(2, result.exit_code)
31
        self.assertRegex(result.output, r'Usage: docker-clean \[OPTIONS\].*')
32
33
    def test_full_clean(self):
34
        # Remove data that could create conflicts
35
        # Stop all containers
36
        clean_cts()
37
        # Remove all networks
38
        get_docker_client().networks.prune()
39
        # Remove all volumes
40
        get_docker_client().volumes.prune()
41
        # Remove all images
42
        clean_images()
43
44
        # Standard info
45
        num_default_nets = len(get_docker_client().networks.list())
46
        num_default_cts = len(get_docker_client().containers.list())
47
        num_default_images = len(get_docker_client().images.list())
48
        num_default_vols = len(get_docker_client().volumes.list())
49
50
        # Start specific networks
51
        get_docker_client().networks.create('test_delete')
52
        net_pytest = get_docker_client().networks.create('network_pytest')
53
        nets = get_docker_client().networks.list()
54
        # 5 because by default I have already 3
55
        self.assertIs(len(nets), (2 + num_default_nets))
56
57
        # We should have volumes also stored
58
        get_docker_client().volumes.create('hello')
59
        self.assertIs(len(get_docker_client().volumes.list()), (1 + num_default_vols))
60
61
        # Create 2 ct that'll be off but present
62
        # don't remove the first
63
        get_docker_client().containers.run('alpine:latest', name='hello_world_test')
64
        ct_test = get_docker_client().containers.run('edyan/adminer:latest',
65
                                                     remove=False, detach=True, name='ct_test')
66
67
        net_pytest.connect(ct_test)
68
69
        # Make sure we have two new image : hello-world + adminer
70
        num_images = len(get_docker_client().images.list())
71
        self.assertIs(num_images, (2 + num_default_images))
72
73
        # Make sure we have two new containers
74
        cts = get_docker_client().containers.list(all=True)
75
        self.assertIs(len(cts), (2 + num_default_cts))
76
77
        # CLEAN
78
        result = CliRunner().invoke(clean, ['--force', '--verbose'])
79
        self.assertEqual(0, result.exit_code)
80
        regex = r'.*Cleaning Docker stopped containers.*'
81
        self.assertRegex(result.output, regex)
82
        regex = r'.*Removed 1 exited container\(s\), saved 0 bytes.*'
83
        self.assertRegex(result.output, regex)
84
        regex = r'.*Cleaning Docker unused images.*'
85
        self.assertRegex(result.output, regex)
86
        regex = r'.*Removed [0-9]+ images\(s\).*'
87
        self.assertRegex(result.output, regex)
88
        regex = r'.*Cleaning Docker unused volumes.*'
89
        self.assertRegex(result.output, regex)
90
        regex = r'.*Removed 1 volume\(s\), saved 0 bytes.*'
91
        self.assertRegex(result.output, regex)
92
        regex = r'.*Cleaning Docker unused networks.*'
93
        self.assertRegex(result.output, regex)
94
        regex = r'.*Removed 1 network\(s\).*'
95
        self.assertRegex(result.output, regex)
96
97
        # Make sure it has been cleaned
98
        # Except ct_test that is running so : 1 image, 1 network, 1 container
99
        self.assertIs(len(get_docker_client().networks.list()), num_default_nets + 1)
100
        self.assertIs(len(get_docker_client().volumes.list()), num_default_vols)
101
        debug = 'Total Images: {} / Images by default: {}'.format(len(get_docker_client().images.list()), num_default_images)
102
        self.assertIs(len(get_docker_client().images.list()), num_default_images + 1)
103
        self.assertIs(len(get_docker_client().networks.list()), num_default_nets + 1)
104
105
        ct_test.stop()
106
107
        # Stop adminer and clean again
108
        result = CliRunner().invoke(clean, ['--force', '--verbose'])
109
        self.assertEqual(0, result.exit_code, 'Error: {}'.format(result.output))
110
        regex = r'.*Cleaning Docker stopped containers.*'
111
        self.assertRegex(result.output, regex)
112
        regex = r'.*Removed 1 exited container\(s\), saved 0 bytes.*'
113
        self.assertRegex(result.output, regex)
114
        regex = r'.*Cleaning Docker unused images.*'
115
        self.assertRegex(result.output, regex)
116
        regex = r'.*Removed [0-9]+ images\(s\).*'
117
        self.assertRegex(result.output, regex)
118
        regex = r'.*Cleaning Docker unused volumes.*'
119
        self.assertRegex(result.output, regex)
120
        regex = r'.*No volume to remove*'
121
        self.assertRegex(result.output, regex)
122
        regex = r'.*Cleaning Docker unused networks.*'
123
        self.assertRegex(result.output, regex)
124
        regex = r'.*Removed 1 network\(s\).*'
125
        self.assertRegex(result.output, regex)
126
127
        # Make sure it has been cleaned
128
        # Except ct_test so : 1 image, 1 network, 1 container
129
        self.assertIs(len(get_docker_client().networks.list()), num_default_nets)
130
        self.assertIs(len(get_docker_client().volumes.list()), num_default_vols)
131
        self.assertIs(len(get_docker_client().images.list()), num_default_images)
132
        self.assertIs(len(get_docker_client().networks.list()), num_default_nets)
133
134
135
def clean_cts():
136
    cts = get_docker_client().containers.list(all=True)
137
    for ct in cts:
138
        try:
139
            ct.stop()
140
            ct.remove(v=True, force=True)
141
        except NotFound:
142
            pass
143
144
145
def clean_images():
146
    images = get_docker_client().images.list()
147
    for image in images:
148
        try:
149
            get_docker_client().images.remove(image.id)
150
        except NotFound:
151
            pass
152
153
154
if __name__ == "__main__":
155
    unittest.main()
156