Test Failed
Push — develop (d9d450...bdd66c) by Nicolas, created 02:39

DockerExtension.generate_stats() (grade: D)

Complexity
Conditions: 12

Size
Total Lines: 76
Code Lines: 56

Duplication
Lines: 0
Ratio: 0%

Importance
Changes: 0
Metric  Value
cc      12    (cyclomatic complexity, the 12 conditions above)
eloc    56    (effective lines of code)
nop     2     (number of parameters)
dl      0     (duplicated lines)
loc     76    (total lines)
rs      4.8
c       0
b       0
f       0

How to fix: Long Method, Complexity

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include:

Extract Method
Replace Temp with Query
Introduce Parameter Object
Preserve Whole Object
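As an illustration of Extract Method, the ports-formatting expression at the end of generate_stats() (see the listing below) could become its own well-named helper. The sketch below is hypothetical, not the project's actual refactoring; it assumes a mapping shaped like docker-py's Container.ports, e.g. {'80/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '8080'}], '443/tcp': None}:

def format_ports(ports: dict) -> str:
    """Render docker-py style port mappings as 'HostPort->ContainerPort' pairs."""
    return ','.join(
        # Published ports map to a list of host bindings; unpublished ports map to None
        f'{ports[cp][0]["HostPort"]}->{cp}' if ports[cp] else f'{cp}'
        for cp in ports
    )

# format_ports({'80/tcp': [{'HostIp': '0.0.0.0', 'HostPort': '8080'}], '443/tcp': None})
# returns '8080->80/tcp,443/tcp'

With such a helper, the call site shrinks to a single line (stats['ports'] = format_ports(container.ports)), and the helper's name documents the intent that previously needed a comment.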

Complexity

Complex methods like glances.plugins.containers.engines.docker.DockerExtension.generate_stats() often do a lot of different things. To break such a method (or the class around it) down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
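Applied to generate_stats(), one cohesive group is the set of derived per-second rate fields (io_rx, io_wx, network_rx, network_tx), which all follow the same "delta divided by time_since_update" pattern. Below is a minimal Extract Class sketch under that assumption; the class and method names are hypothetical, not part of Glances:

from typing import Optional

class RateView:
    """Wrap one activity-stats sub-dict and derive per-second rates from it."""

    def __init__(self, stats: dict):
        self._stats = stats

    def rate(self, field: str) -> Optional[int]:
        """Return field // time_since_update, or None if the data is missing."""
        if field in self._stats and 'time_since_update' in self._stats:
            return self._stats[field] // self._stats['time_since_update']
        return None

# generate_stats() would then read:
#     io, net = RateView(stats['io']), RateView(stats['network'])
#     stats['io_rx'], stats['io_wx'] = io.rate('ior'), io.rate('iow')
#     stats['network_rx'], stats['network_tx'] = net.rate('rx'), net.rate('tx')

Each extraction like this removes a few conditions from generate_stats(), which is what brings the method's cyclomatic complexity (currently 12) down. The flagged source file follows in full.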

#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""Docker Extension unit for Glances' Containers plugin."""

import time
from typing import Any, Optional

from glances.globals import nativestr, pretty_date, replace_special_chars
from glances.logger import logger
from glances.stats_streamer import ThreadedIterableStreamer

# Docker-py library (optional and Linux-only)
# https://github.com/docker/docker-py
try:
    import docker
    import requests
    from dateutil import parser, tz
except Exception as e:
    disable_plugin_docker = True
    # Log a warning if the dependencies cannot be imported
    logger.warning(f"Error loading Docker deps Lib. Docker plugin is disabled ({e})")
else:
    disable_plugin_docker = False


class DockerStatsFetcher:
    MANDATORY_MEMORY_FIELDS = ['usage', 'limit']

    def __init__(self, container):
        self._container = container

        # Previously computed stats are stored in self._old_computed_stats
        # Time data is stored as well, so IoR/s & IoW/s rates can be computed
        # without pushing that complexity onto consumers of the exposed APIs.
        self._old_computed_stats = {}

        # Last time the output stats (results) were computed
        self._last_stats_computed_time = 0

        # Threaded Streamer
        stats_iterable = container.stats(decode=True)
        self._streamer = ThreadedIterableStreamer(stats_iterable, initial_stream_value={})

    def _log_debug(self, msg, exception=None):
        logger.debug(f"containers (Docker) ID: {self._container.id} - {msg} ({exception})")
        logger.debug(self._streamer.stats)

    def stop(self):
        self._streamer.stop()

    @property
    def activity_stats(self) -> dict[str, dict[str, Any]]:
        """Activity Stats

        Each access recomputes the activity stats and refreshes the cached values.
        """
        computed_activity_stats = self._compute_activity_stats()
        self._old_computed_stats = computed_activity_stats
        self._last_stats_computed_time = time.time()
        return computed_activity_stats

    def _compute_activity_stats(self) -> dict[str, dict[str, Any]]:
        with self._streamer.result_lock:
            io_stats = self._get_io_stats()
            cpu_stats = self._get_cpu_stats()
            memory_stats = self._get_memory_stats()
            network_stats = self._get_network_stats()

        return {
            "io": io_stats or {},
            "memory": memory_stats or {},
            "network": network_stats or {},
            "cpu": cpu_stats or {"total": 0.0},
        }

    @property
    def time_since_update(self) -> float:
        # In case of no update, default to 1
        return max(1, self._streamer.last_update_time - self._last_stats_computed_time)

    def _get_cpu_stats(self) -> Optional[dict[str, float]]:
        """Return the container CPU usage.

        Output: a dict {'total': 1.49}
        """
        stats = {'total': 0.0}

        try:
            cpu_stats = self._streamer.stats['cpu_stats']
            precpu_stats = self._streamer.stats['precpu_stats']
            cpu = {'system': cpu_stats['system_cpu_usage'], 'total': cpu_stats['cpu_usage']['total_usage']}
            precpu = {'system': precpu_stats['system_cpu_usage'], 'total': precpu_stats['cpu_usage']['total_usage']}

            # Issue #1857
            # If either precpu_stats.online_cpus or cpu_stats.online_cpus is nil
            # then for compatibility with older daemons the length of
            # the corresponding cpu_usage.percpu_usage array should be used.
            cpu['nb_core'] = cpu_stats.get('online_cpus') or len(cpu_stats['cpu_usage']['percpu_usage'] or [])
        except KeyError as e:
            self._log_debug("Can't grab CPU stats", e)
            return None

        try:
            cpu_delta = cpu['total'] - precpu['total']
            system_cpu_delta = cpu['system'] - precpu['system']
            # CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0
            stats['total'] = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0
        except TypeError as e:
            self._log_debug("Can't compute CPU usage", e)
            return None

        # Return the stats
        return stats

    def _get_memory_stats(self) -> Optional[dict[str, float]]:
        """Return the container MEMORY.

        Output: a dict {'usage': ..., 'limit': ..., 'inactive_file': ...}

        Note: the displayed memory usage is 'usage - inactive_file'
        """
        memory_stats = self._streamer.stats.get('memory_stats')

        # Check for memory_stats & mandatory fields
        if not memory_stats or any(field not in memory_stats for field in self.MANDATORY_MEMORY_FIELDS):
            self._log_debug("Missing MEM usage fields")
            return None

        stats = {field: memory_stats[field] for field in self.MANDATORY_MEMORY_FIELDS}

        # Optional field stats: inactive_file
        if memory_stats.get('stats', {}).get('inactive_file') is not None:
            stats['inactive_file'] = memory_stats['stats']['inactive_file']

        # Return the stats
        return stats

    def _get_network_stats(self) -> Optional[dict[str, float]]:
        """Return the container network usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}.
        with:
            time_since_update: number of seconds elapsed since the previous grab
            rx: Number of bytes received
            tx: Number of bytes transmitted
        """
        eth0_stats = self._streamer.stats.get('networks', {}).get('eth0')

        # Check for net_stats & mandatory fields
        if not eth0_stats or any(field not in eth0_stats for field in ['rx_bytes', 'tx_bytes']):
            self._log_debug("Missing Network usage fields")
            return None

        # Read the rx/tx stats (in bytes)
        stats = {'cumulative_rx': eth0_stats["rx_bytes"], 'cumulative_tx': eth0_stats["tx_bytes"]}

        # Use previous stats to calculate rates
        old_network_stats = self._old_computed_stats.get("network")
        if old_network_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['rx'] = stats['cumulative_rx'] - old_network_stats["cumulative_rx"]
            stats['tx'] = stats['cumulative_tx'] - old_network_stats['cumulative_tx']

        # Return the stats
        return stats

    def _get_io_stats(self) -> Optional[dict[str, float]]:
        """Return the container IO usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}.
        with:
            time_since_update: number of seconds elapsed since the previous grab
            ior: Number of bytes read
            iow: Number of bytes written
        """
        io_service_bytes_recursive = self._streamer.stats.get('blkio_stats', {}).get('io_service_bytes_recursive')

        # Check for blkio_stats
        if not io_service_bytes_recursive:
            self._log_debug("Missing blockIO usage fields")
            return None

        # Read the ior/iow stats (in bytes)
        try:
            # Read the IOR and IOW values from the list-of-dicts structure
            cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() == 'read'][0]['value']
            cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value']
        except (TypeError, IndexError, KeyError, AttributeError) as e:
            self._log_debug("Can't grab blockIO usage", e)  # stats do not have io information
            return None

        stats = {'cumulative_ior': cumulative_ior, 'cumulative_iow': cumulative_iow}

        # Use previous stats to calculate the difference
        old_io_stats = self._old_computed_stats.get("io")
        if old_io_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['ior'] = stats['cumulative_ior'] - old_io_stats["cumulative_ior"]
            stats['iow'] = stats['cumulative_iow'] - old_io_stats["cumulative_iow"]

        # Return the stats
        return stats


class DockerExtension:
    """Glances' Containers Plugin's Docker Extension unit"""

    CONTAINER_ACTIVE_STATUS = ['running', 'paused']

    def __init__(self):
        self.disable = disable_plugin_docker
        if self.disable:
            raise Exception("Missing libs required to run Docker Extension (Containers)")

        self.display_error = True

        self.client = None
        self.ext_name = "containers (Docker)"
        self.stats_fetchers = {}

        self.connect()

    def connect(self) -> None:
        """Connect to the Docker server."""
        # Init the Docker API Client
        try:
            # Do not use the timeout option (see issue #1878)
            self.client = docker.from_env()
        except Exception as e:
            logger.error(f"{self.ext_name} plugin - Can't connect to Docker ({e})")
            self.client = None

    def update_version(self):
        # Slow, and no longer useful because the information is no longer displayed in UIs
        # return self.client.version()
        return {}

    def stop(self) -> None:
        # Stop all streaming threads
        for t in self.stats_fetchers.values():
            t.stop()

    def update(self, all_tag) -> tuple[dict, list[dict]]:
        """Update Docker stats using the input method."""

        if not self.client or self.disable:
            return {}, []

        version_stats = self.update_version()

        # Update the current containers list
        try:
            # Issue #1152: Docker module doesn't export details about stopped containers
            # The Containers/all key of the configuration file should be set to True
            containers = self.client.containers.list(all=all_tag)
            self.display_error = True
        except Exception as e:
            if self.display_error:
                logger.error(f"{self.ext_name} plugin - Can't get containers list ({e})")
                self.display_error = False
            else:
                logger.debug(f"{self.ext_name} plugin - Can't get containers list ({e})")
            return version_stats, []

        # Start a new thread for each new container
        for container in containers:
            if container.id not in self.stats_fetchers:
                # The StatsFetcher does not exist in the internal dict yet:
                # create it and add it to the internal dict
                logger.debug(f"{self.ext_name} plugin - Create thread for container {container.id[:12]}")
                self.stats_fetchers[container.id] = DockerStatsFetcher(container)

        # Stop threads for containers that no longer exist
        absent_containers = set(self.stats_fetchers.keys()) - {c.id for c in containers}
        for container_id in absent_containers:
            # Stop the StatsFetcher
            logger.debug(f"{self.ext_name} plugin - Stop thread for old container {container_id[:12]}")
            self.stats_fetchers[container_id].stop()
            # Delete the StatsFetcher from the dict
            del self.stats_fetchers[container_id]

        # Get stats for all containers
        container_stats = [self.generate_stats(container) for container in containers]
        return version_stats, container_stats

    @property
    def key(self) -> str:
        """Return the key of the list."""
        return 'name'

    def generate_stats(self, container) -> dict[str, Any]:
        # Init the stats for the current container
        stats = {
            'key': self.key,
            'name': nativestr(container.name),
            'id': container.id,
            'status': container.attrs['State']['Status'],
            'created': container.attrs['Created'],
            'command': [],
            'io': {},
            'cpu': {},
            'memory': {},
            'network': {},
            'io_rx': None,
            'io_wx': None,
            'cpu_percent': None,
            'memory_percent': None,
            'network_rx': None,
            'network_tx': None,
            'ports': '',
            'uptime': None,
        }

        # Container Image
        try:
            # API fails on Unraid - See issue 2233
            stats['image'] = ','.join(container.image.tags if container.image.tags else [])
        except requests.exceptions.HTTPError:
            stats['image'] = ''

        if container.attrs['Config'].get('Entrypoint', None):
            stats['command'].extend(container.attrs['Config'].get('Entrypoint', []))
        if container.attrs['Config'].get('Cmd', None):
            stats['command'].extend(container.attrs['Config'].get('Cmd', []))
        if not stats['command']:
            stats['command'] = None

        if stats['status'] not in self.CONTAINER_ACTIVE_STATUS:
            return stats

        stats_fetcher = self.stats_fetchers[container.id]
        activity_stats = stats_fetcher.activity_stats
        stats.update(activity_stats)

        # Additional fields
        stats['cpu_percent'] = stats['cpu']['total']
        stats['memory_usage'] = stats['memory'].get('usage')
        if stats['memory'].get('cache') is not None:
            stats['memory_usage'] -= stats['memory']['cache']
        stats['memory_inactive_file'] = stats['memory'].get('inactive_file')
        stats['memory_limit'] = stats['memory'].get('limit')

        if all(k in stats['io'] for k in ('ior', 'iow', 'time_since_update')):
            stats['io_rx'] = stats['io']['ior'] // stats['io']['time_since_update']
            stats['io_wx'] = stats['io']['iow'] // stats['io']['time_since_update']

        if all(k in stats['network'] for k in ('rx', 'tx', 'time_since_update')):
            stats['network_rx'] = stats['network']['rx'] // stats['network']['time_since_update']
            stats['network_tx'] = stats['network']['tx'] // stats['network']['time_since_update']

        started_at = container.attrs['State']['StartedAt']
        stats['uptime'] = pretty_date(parser.parse(started_at).astimezone(tz.tzlocal()).replace(tzinfo=None))

        # Manage special chars in command (see issue#2733)
        # Guard against 'command' being None (neither Entrypoint nor Cmd set)
        if stats['command']:
            stats['command'] = replace_special_chars(' '.join(stats['command']))

        # Manage ports (see issue#2054)
        if hasattr(container, 'ports'):
            stats['ports'] = ','.join(
                [
                    f'{container.ports[cp][0]["HostPort"]}->{cp}' if container.ports[cp] else f'{cp}'
                    for cp in container.ports
                ]
            )

        return stats