Passed
Push — master ( 61e6a6...28b40d )
by Emmanuel
07:42
created

stakkr.docker_actions   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 278
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 172
dl 0
loc 278
ccs 0
cts 158
cp 0
rs 6.96
c 0
b 0
f 0
wmc 53

22 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_ip_from_networks() 0 7 3
A get_ct_name() 0 7 2
A get_network_name() 0 9 2
A guess_shell() 0 14 3
A get_api_client() 0 11 4
A network_exists() 0 7 2
A get_running_containers() 0 22 3
A _extract_container_info() 0 19 2
A add_container_to_network() 0 9 2
A get_running_containers_names() 0 5 1
A get_switch_ip() 0 15 2
A get_subnet() 0 6 1
A _container_in_network() 0 12 4
A _allow_contact_subnet() 0 14 2
A get_client() 0 7 2
A get_ct_item() 0 10 4
A check_cts_are_running() 0 5 2
A block_ct_ports() 0 25 4
A create_network() 0 6 2
A _get_traefik_host() 0 7 2
A _extract_host_ports() 0 6 2
A container_running() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like stakkr.docker_actions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding: utf-8
2
"""Docker functions to get info about containers."""
3
4
from docker.errors import NotFound, NullResource
5
__st__ = {'cts_info': dict(), 'running_cts': 0}
6
7
8
def add_container_to_network(container: str, network: str):
    """
    Connect a container to a docker network unless it is already a member.

    Returns False when the container is already attached (nothing is done),
    and True once the container has been connected.
    """
    already_attached = _container_in_network(container, network)
    if already_attached is True:
        return False

    get_client().networks.get(network).connect(container)
    return True
17
18
19
def block_ct_ports(service: str, ports: list, project_name: str) -> tuple:
    """
    Reject outgoing TCP traffic to a list of ports from inside a service container.

    Returns a ``(flag, message)`` tuple. As written, the flag is True only
    when iptables cannot be located in the container; it is False both when
    the service is not started and when the rules have been applied.
    """
    try:
        client = get_client()
        container = client.containers.get(get_ct_item(service, 'id'))
    except (LookupError, NullResource):
        return False, '{} is not started, no port to block'.format(service)

    # Locate the iptables binary inside the container
    _, which_output = container.exec_run(['which', 'iptables'])
    iptables_bin = which_output.decode().strip()
    if not iptables_bin:
        return True, "Can't block ports on {}, is iptables installed ?".format(service)

    # Keep the project's own subnet reachable before rejecting anything
    _allow_contact_subnet(project_name, container)

    for port in ports:
        reject_rule = ['OUTPUT', '-p', 'tcp', '--dport', str(port), '-j', 'REJECT']
        # Delete a previous copy of the rule, then append it whatever happened
        try:
            container.exec_run([iptables_bin, '-D', *reject_rule])
        finally:
            container.exec_run([iptables_bin, '-A', *reject_rule])

    blocked = ', '.join(str(port) for port in ports)

    return False, 'Blocked ports {} on container {}'.format(blocked, service)
44
45
46
def check_cts_are_running(project_name: str):
    """
    Refresh the running containers cache and make sure it is not empty.

    Raises SystemError when no container is running for the project.
    """
    get_running_containers(project_name)
    if __st__['running_cts']:
        return

    raise SystemError('Have you started stakkr with the start action ?')
51
52
53
def container_running(container: str):
    """
    Tell whether a container is currently running.

    Returns the ``State.Running`` value reported by docker inspect, or False
    when the container is unknown (NotFound) or the reference is empty
    (NullResource).
    """
    try:
        ct_data = get_api_client().inspect_container(container)
    except (NotFound, NullResource):
        return False

    return ct_data['State']['Running']
59
60
61
def create_network(network: str):
    """
    Create a bridge network if it does not exist yet.

    Returns the id of the new network, or False when a network with that
    name already exists.
    """
    if not network_exists(network):
        created = get_client().networks.create(network, driver='bridge')
        return created.id

    return False
67
68
69
def get_api_client():
    """
    Return the low-level docker API client, building it on first use.

    The client is configured from the environment (base url and TLS
    settings) and cached in the module state for later calls.
    """
    if 'api_client' in __st__:
        return __st__['api_client']

    # Imported lazily: only needed the first time the client is built
    from docker import APIClient, utils
    env_params = utils.kwargs_from_env()
    __st__['api_client'] = APIClient(
        base_url=env_params.get('base_url'),
        tls=env_params.get('tls'))

    return __st__['api_client']
80
81
82
def get_client():
    """Return the high-level docker client, creating and caching it on first use."""
    if 'client' in __st__:
        return __st__['client']

    # Imported lazily: only needed the first time the client is built
    from docker import client
    __st__['client'] = client.from_env()

    return __st__['client']
89
90
91
def get_ct_item(compose_name: str, item_name: str):
    """
    Read one field (name, id, ip ...) of a container from the cached info.

    The container is looked up by its compose service name. Returns an empty
    string when no cached container matches. Raises LookupError when the
    cache holds no 'cts_info' entry at all.
    """
    if 'cts_info' not in __st__:
        raise LookupError('Before getting an info from a ct, run check_cts_are_running()')

    # Lazy: the requested item is only read on the first matching container
    matching_items = (
        ct_data[item_name]
        for ct_data in __st__['cts_info'].values()
        if ct_data['compose_name'] == compose_name)

    return next(matching_items, '')
101
102
103
def get_ct_name(container: str):
    """
    Return the docker name of a container from its compose service name.

    Raises LookupError when the cached info has no name for that service.
    """
    ct_name = get_ct_item(container, 'name')
    if ct_name != '':
        return ct_name

    raise LookupError('{} does not seem to be started ...'.format(container))
110
111
112
def get_network_name(project_name: str):
    """
    Return the name of the project's network as known by docker.

    The expected name is ``<project_name>_stakkr`` in lower case. Raises
    RuntimeError when docker has no such network.
    """
    guessed_network_name = '{}_stakkr'.format(project_name).lower()
    try:
        return get_client().networks.get(guessed_network_name).name
    except NotFound:
        raise RuntimeError("Couldn't identify network (check your project name)")
121
122
123
def get_subnet(project_name: str):
    """
    Return the address part of the project's network subnet.

    Reads the first IPAM configuration of the network and drops the
    ``/prefix`` suffix of its subnet.
    """
    name = get_network_name(project_name)
    ipam_config = get_client().networks.get(name).attrs['IPAM']['Config']
    subnet_cidr = ipam_config[0]['Subnet']

    return subnet_cidr.partition('/')[0]
129
130
131
def get_switch_ip():
    """
    Find the main docker daemon IP to add routes.

    Runs a throw-away privileged alpine container on the host network and
    reads the IPv4 address shown for the ``hvint0`` interface.

    Returns the address as a string. Raises ValueError, naming the value
    that was read, when the output is not a valid IPv4 address.
    """
    import socket

    cmd = r"""/bin/sh -c "ip addr show hvint0 | grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}'" """
    res = get_client().containers.run(
        'alpine', remove=True, tty=True, privileged=True,
        network_mode='host', pid_mode='host', command=cmd)
    ip_addr = res.strip().decode()

    try:
        socket.inet_aton(ip_addr)
    except socket.error:
        # The placeholder was never filled in before: show what was actually read
        raise ValueError('{} is not a valid ip, check docker is running'.format(ip_addr))

    return ip_addr
146
147
148
def get_running_containers(project_name: str) -> tuple:
    """
    Get the number of running containers and their details for the current stakkr instance.

    Lists the running containers whose name contains ``<project_name>_`` and
    that are attached to the ``<project_name>_stakkr`` network (lower case),
    then stores their details in the module state.

    Returns a ``(count, info)`` tuple where ``info`` maps each container
    name to its details. Raises requests' ConnectionError when docker
    cannot be reached.
    """
    from requests import exceptions

    filters = {
        'name': '{}_'.format(project_name),
        'status': 'running',
        'network': '{}_stakkr'.format(project_name).lower()}

    try:
        cts = get_client().containers.list(filters=filters)
    except exceptions.ConnectionError:
        raise exceptions.ConnectionError('Make sure docker is installed and running')

    cts_info = {}
    for container in cts:
        container_info = _extract_container_info(project_name, container.id)
        if container_info is None:
            # The container vanished between the listing and the inspection
            continue

        cts_info[container_info['name']] = container_info

    __st__['cts_info'] = cts_info
    # Count what was really inspected so the count and the details always agree
    __st__['running_cts'] = len(cts_info)

    return __st__['running_cts'], __st__['cts_info']
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable __st__ does not seem to be defined.
Loading history...
170
171
172
def get_running_containers_names(project_name: str) -> list:
    """Return the sorted compose service names of the project's running containers."""
    _, cts_info = get_running_containers(project_name)
    compose_names = [ct_data['compose_name'] for ct_data in cts_info.values()]
    compose_names.sort()

    return compose_names
177
178
179
def guess_shell(container: str) -> str:
    """
    Pick a shell available in a container, preferring bash over sh.

    Runs ``which -a bash sh`` in the container and returns '/bin/bash' or
    '/bin/sh' depending on what is listed. Raises EnvironmentError when
    neither path is reported.
    """
    docker_ct = get_client().containers.get(container)
    _, output = docker_ct.exec_run('which -a bash sh', stdout=True, stderr=False)
    found = output.splitlines()

    # Order matters: bash wins when both are available
    for shell in ('/bin/bash', '/bin/sh'):
        if shell.encode() in found:
            return shell

    raise EnvironmentError('Could not find a shell for that container')
193
194
195
def network_exists(network: str):
    """Tell whether docker knows a network with that name or id."""
    try:
        get_client().networks.get(network)
    except NotFound:
        return False

    return True
202
203
204
def _allow_contact_subnet(project_name: str, container: str) -> bool:
    """
    Accept outgoing traffic from a container to the project's subnet.

    ``container`` is used as a container object here (``exec_run`` is called
    on it), despite its annotation. Returns False when iptables cannot be
    located inside the container, else True once the rule is in place.
    """
    _, which_output = container.exec_run(['which', 'iptables'])
    iptables_bin = which_output.decode().strip()
    if not iptables_bin:
        return False

    accept_rule = ['OUTPUT', '-d', get_subnet(project_name) + '/24', '-j', 'ACCEPT']
    # Delete a previous copy of the rule, then append it whatever happened
    try:
        container.exec_run([iptables_bin, '-D'] + accept_rule)
    finally:
        container.exec_run([iptables_bin, '-A'] + accept_rule)

    return True
218
219
def _extract_container_info(project_name: str, ct_id: str):
    """
    Build a summary of a container from its docker inspect data.

    The summary holds the id, name (without leading slash), compose service
    name, host ports, image, traefik host, ip in the project network and
    running state. Returns None when docker does not know the container.
    """
    try:
        ct_data = get_api_client().inspect_container(ct_id)
    except NotFound:
        return None

    config = ct_data['Config']
    labels = config['Labels']
    networks = ct_data['NetworkSettings']['Networks']

    return {
        'id': ct_id,
        'name': ct_data['Name'].lstrip('/'),
        'compose_name': labels['com.docker.compose.service'],
        'ports': _extract_host_ports(ct_data),
        'image': config['Image'],
        'traefik_host': _get_traefik_host(labels),
        'ip': _get_ip_from_networks(project_name, networks),
        'running': ct_data['State']['Running'],
    }
238
239
240
def _extract_host_ports(config: list):
241
    ports = []
242
    for _, host_ports in config['HostConfig']['PortBindings'].items():
243
        ports += [host_port['HostPort'] for host_port in host_ports]
244
245
    return ports
246
247
248
def _get_ip_from_networks(project_name: str, networks: list):
249
    """Get the ip of a network."""
250
    network_settings = {}
251
    if '{}_stakkr'.format(project_name) in networks:
252
        network_settings = networks['{}_stakkr'.format(project_name)]
253
254
    return network_settings['IPAddress'] if 'IPAddress' in network_settings else ''
255
256
257
def _container_in_network(container: str, expected_network: str):
    """
    Tell whether a container is attached to a given network.

    Used by add_container_to_network. Raises LookupError when docker does
    not know the container.
    """
    try:
        ct_data = get_api_client().inspect_container(container)
    except NotFound:
        raise LookupError('Container {} does not seem to exist'.format(container))

    connected_networks = ct_data['NetworkSettings']['Networks']

    return expected_network in connected_networks
269
270
271
def _get_traefik_host(labels: list):
272
    if 'traefik.frontend.rule' not in labels:
273
        return 'No traefik rule'
274
275
    rules = labels['traefik.frontend.rule'].split(':')
276
277
    return rules[1]
278