stakkr.docker_actions   C
last analyzed

Complexity

Total Complexity 55

Size/Duplication

Total Lines 286
Duplicated Lines 0 %

Test Coverage

Coverage 87.27%

Importance

Changes 0
Metric Value
eloc 179
dl 0
loc 286
ccs 144
cts 165
cp 0.8727
rs 6
c 0
b 0
f 0
wmc 55

22 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_ip_from_networks() 0 7 3
A get_ct_name() 0 7 2
A get_network_name() 0 9 2
A guess_shell() 0 14 3
A get_api_client() 0 11 4
A network_exists() 0 7 2
A get_running_containers() 0 22 3
A _extract_container_info() 0 19 2
A add_container_to_network() 0 9 2
A get_running_containers_names() 0 5 1
A get_switch_ip() 0 15 2
A get_subnet() 0 6 1
A _container_in_network() 0 12 4
A _allow_contact_subnet() 0 14 2
A get_client() 0 7 2
A get_ct_item() 0 10 4
A check_cts_are_running() 0 5 2
A block_ct_ports() 0 25 4
A create_network() 0 6 2
A _extract_host_ports() 0 6 2
A container_running() 0 6 2
A _get_traefik_host() 0 14 4

How to fix   Complexity   

Complexity

Complex classes like stakkr.docker_actions often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding: utf-8
2 1
"""Docker functions to get info about containers."""
3
4 1
import re
5 1
from docker.errors import NotFound, NullResource
6 1
__st__ = {'cts_info': dict(), 'running_cts': 0}
7
8
9 1
def add_container_to_network(container: str, network: str):
10
    """Attach a container to a network."""
11 1
    if _container_in_network(container, network) is True:
12 1
        return False
13
14 1
    docker_network = get_client().networks.get(network)
15 1
    docker_network.connect(container)
16
17 1
    return True
18
19
20 1
def block_ct_ports(service: str, ports: list, project_name: str) -> tuple:
21
    """Run iptables commands to block a list of port on a specific container."""
22 1
    try:
23 1
        container = get_client().containers.get(get_ct_item(service, 'id'))
24
    except (LookupError, NullResource):
25
        return False, '{} is not started, no port to block'.format(service)
26
27 1
    _, iptables = container.exec_run(['which', 'iptables'])
28 1
    iptables = iptables.decode().strip()
29 1
    if iptables == '':
30
        return True, "Can't block ports on {}, is iptables installed ?".format(service)
31
32 1
    _allow_contact_subnet(project_name, container)
33
34
    # Now for each port, add an iptable rule
35 1
    for port in ports:
36 1
        rule = ['OUTPUT', '-p', 'tcp', '--dport', str(port), '-j', 'REJECT']
37 1
        try:
38 1
            container.exec_run([iptables, '-D'] + rule)
39
        finally:
40 1
            container.exec_run([iptables, '-A'] + rule)
41
42 1
    ports_list = ', '.join(map(str, ports))
43
44 1
    return False, 'Blocked ports {} on container {}'.format(ports_list, service)
45
46
47 1
def check_cts_are_running(project_name: str):
48
    """Throw an error if cts are not running."""
49 1
    get_running_containers(project_name)
50 1
    if not __st__['running_cts']:
51 1
        raise SystemError('Have you started stakkr with the start action ?')
52
53
54 1
def container_running(container: str):
55
    """Return True if the container is running else False."""
56 1
    try:
57 1
        return get_api_client().inspect_container(container)['State']['Running']
58 1
    except (NotFound, NullResource):
59 1
        return False
60
61
62 1
def create_network(network: str):
63
    """Create a Network."""
64 1
    if network_exists(network):
65 1
        return False
66
67 1
    return get_client().networks.create(network, driver='bridge').id
68
69
70 1
def get_api_client():
71
    """Return the API client or initialize it."""
72 1
    if 'api_client' not in __st__:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable __st__ does not seem to be defined.
Loading history...
73 1
        from docker import APIClient, utils
74 1
        params = utils.kwargs_from_env()
75 1
        base_url = None if 'base_url' not in params else params['base_url']
76 1
        tls = None if 'tls' not in params else params['tls']
77
78 1
        __st__['api_client'] = APIClient(base_url=base_url, tls=tls)
79
80 1
    return __st__['api_client']
81
82
83 1
def get_client():
84
    """Return the client or initialize it."""
85 1
    if 'client' not in __st__:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable __st__ does not seem to be defined.
Loading history...
86 1
        from docker import client
87 1
        __st__['client'] = client.from_env()
88
89 1
    return __st__['client']
90
91
92 1
def get_ct_item(compose_name: str, item_name: str):
93
    """Get a value from a container, such as name or IP."""
94 1
    if 'cts_info' not in __st__:
95
        raise LookupError('Before getting an info from a ct, run check_cts_are_running()')
96
97 1
    for _, ct_data in __st__['cts_info'].items():
98 1
        if ct_data['compose_name'] == compose_name:
99 1
            return ct_data[item_name]
100
101
    return ''
102
103
104 1
def get_ct_name(container: str):
105
    """Return the system name of a container, generated by docker-compose."""
106 1
    ct_name = get_ct_item(container, 'name')
107 1
    if ct_name == '':
108
        raise LookupError('{} does not seem to be started ...'.format(container))
109
110 1
    return ct_name
111
112
113 1
def get_network_name(project_name: str):
114
    """Find the full network name."""
115 1
    try:
116 1
        guessed_network_name = '{}_stakkr'.format(project_name).lower()
117 1
        network = get_client().networks.get(guessed_network_name)
118
    except NotFound:
119
        raise RuntimeError("Couldn't identify network (check your project name)")
120
121 1
    return network.name
122
123
124 1
def get_subnet(project_name: str):
125
    """Find the subnet of the current project."""
126 1
    network_name = get_network_name(project_name)
127 1
    network_info = get_client().networks.get(network_name).attrs
128
129 1
    return network_info['IPAM']['Config'][0]['Subnet'].split('/')[0]
130
131
132 1
def get_switch_ip():
133
    """Find the main docker daemon IP to add routes."""
134
    import socket
135
136
    cmd = r"""/bin/sh -c "ip addr show hvint0 | grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}'" """
137
    res = get_client().containers.run(
138
        'alpine', remove=True, tty=True, privileged=True,
139
        network_mode='host', pid_mode='host', command=cmd)
140
    ip_addr = res.strip().decode()
141
142
    try:
143
        socket.inet_aton(ip_addr)
144
        return ip_addr
145
    except socket.error:
146
        raise ValueError('{} is not a valid ip, check docker is running')
147
148
149 1
def get_running_containers(project_name: str) -> tuple:
150
    """Get the number of running containers and theirs details for the current stakkr instance."""
151 1
    from requests import exceptions
152
153 1
    filters = {
154
        'name': '{}_'.format(project_name),
155
        'status': 'running',
156
        'network': '{}_stakkr'.format(project_name).lower()}
157
158 1
    try:
159 1
        cts = get_client().containers.list(filters=filters)
160
    except exceptions.ConnectionError:
161
        raise exceptions.ConnectionError('Make sure docker is installed and running')
162
163 1
    __st__['cts_info'] = dict()
164 1
    for container in cts:
165 1
        container_info = _extract_container_info(project_name, container.id)
166 1
        __st__['cts_info'][container_info['name']] = container_info
167
168 1
    __st__['running_cts'] = len(cts)
169
170 1
    return __st__['running_cts'], __st__['cts_info']
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable __st__ does not seem to be defined.
Loading history...
171
172
173 1
def get_running_containers_names(project_name: str) -> list:
174
    """Get a list of compose names of running containers for the current stakkr instance."""
175 1
    cts = get_running_containers(project_name)[1]
176
177 1
    return sorted([ct_data['compose_name'] for docker_name, ct_data in cts.items()])
178
179
180 1
def guess_shell(container: str) -> str:
181
    """By searching for binaries, guess what could be the primary shell available."""
182 1
    container = get_client().containers.get(container)
183
184 1
    cmd = 'which -a bash sh'
185 1
    _, shells = container.exec_run(cmd, stdout=True, stderr=False)
186 1
    shells = shells.splitlines()
187 1
    if b'/bin/bash' in shells:
188 1
        return '/bin/bash'
189
190 1
    if b'/bin/sh' in shells:
191 1
        return '/bin/sh'
192
193 1
    raise EnvironmentError('Could not find a shell for that container')
194
195
196 1
def network_exists(network: str):
197
    """Return True if a network exists in docker, else False."""
198 1
    try:
199 1
        get_client().networks.get(network)
200 1
        return True
201 1
    except NotFound:
202 1
        return False
203
204
205 1
def _allow_contact_subnet(project_name: str, container: str) -> bool:
206 1
    _, iptables = container.exec_run(['which', 'iptables'])
207 1
    iptables = iptables.decode().strip()
208 1
    if iptables == '':
209
        return False
210
211 1
    subnet = get_subnet(project_name) + '/24'
212
    # Allow internal network
213 1
    try:
214 1
        container.exec_run([iptables, '-D', 'OUTPUT', '-d', subnet, '-j', 'ACCEPT'])
215
    finally:
216 1
        container.exec_run([iptables, '-A', 'OUTPUT', '-d', subnet, '-j', 'ACCEPT'])
217
218 1
    return True
219
220 1
def _extract_container_info(project_name: str, ct_id: str):
221
    """Get a hash of info about a container : name, ports, image, ip ..."""
222 1
    try:
223 1
        ct_data = get_api_client().inspect_container(ct_id)
224 1
    except NotFound:
225 1
        return None
226
227 1
    cts_info = {
228
        'id': ct_id,
229
        'name': ct_data['Name'].lstrip('/'),
230
        'compose_name': ct_data['Config']['Labels']['com.docker.compose.service'],
231
        'ports': _extract_host_ports(ct_data),
232
        'image': ct_data['Config']['Image'],
233
        'traefik_host': _get_traefik_host(ct_data['Config']['Labels']),
234
        'ip': _get_ip_from_networks(project_name, ct_data['NetworkSettings']['Networks']),
235
        'running': ct_data['State']['Running']
236
        }
237
238 1
    return cts_info
239
240
241 1
def _extract_host_ports(config: list):
242 1
    ports = []
243 1
    for _, host_ports in config['HostConfig']['PortBindings'].items():
244
        ports += [host_port['HostPort'] for host_port in host_ports]
245
246 1
    return ports
247
248
249 1
def _get_ip_from_networks(project_name: str, networks: list):
250
    """Get the ip of a network."""
251 1
    network_settings = {}
252 1
    if '{}_stakkr'.format(project_name) in networks:
253 1
        network_settings = networks['{}_stakkr'.format(project_name)]
254
255 1
    return network_settings['IPAddress'] if 'IPAddress' in network_settings else ''
256
257
258 1
def _container_in_network(container: str, expected_network: str):
259
    """Return True if a container is in a network else false. Used by add_container_to_network."""
260 1
    try:
261 1
        ct_data = get_api_client().inspect_container(container)
262 1
    except NotFound:
263 1
        raise LookupError('Container {} does not seem to exist'.format(container))
264
265 1
    for connected_network in ct_data['NetworkSettings']['Networks'].keys():
266 1
        if connected_network == expected_network:
267 1
            return True
268
269 1
    return False
270
271
272 1
def _get_traefik_host(labels: dict):
273 1
    traefik_label = None
274
    # traefik.http.routers.portainer.rule
275 1
    regexp = re.compile('^traefik.http.routers.(.+).rule$')
276 1
    for label in labels.keys():
277 1
        matches = regexp.match(label)
278 1
        if matches:
279 1
            traefik_label = matches.group()
280 1
            break
281
282 1
    if traefik_label is None:
283 1
        return 'No traefik rule'
284
285
    return labels[label].split('`')[1]
0 ignored issues
show
introduced by
The variable label does not seem to be defined in case the for loop on line 276 is not entered. Are you sure this can never be the case?
Loading history...
286