Test Failed
Push — master (1826f0...f8aa98) by Nicolas
Duration: 02:28 (queued 15s)

DockerExtension.generate_stats() (rated C)

Complexity
  Conditions: 10

Size
  Total Lines: 64
  Code Lines: 48

Duplication
  Lines: 0
  Ratio: 0%

Importance
  Changes: 0

Metric  Value
cc      10
eloc    48
nop     2
dl      0
loc     64
rs      5.9018
c       0
b       0
f       0

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. Conversely, when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, sketched below.
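
For instance, here is a minimal, hypothetical sketch of Extract Method applied to the command-building block of generate_stats() shown in the listing below; the helper name _build_command is illustrative and not part of Glances:

    def _build_command(self, container):
        # Merge the container's Entrypoint and Cmd into one command list;
        # return None when neither is set, mirroring the original inline logic.
        command = []
        command.extend(container.attrs['Config'].get('Entrypoint') or [])
        command.extend(container.attrs['Config'].get('Cmd') or [])
        return command or None

generate_stats() would then replace its three inline conditionals with a single, self-describing call: stats['command'] = self._build_command(container).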

Complexity

Complex methods like glances.plugins.containers.engines.docker.DockerExtension.generate_stats() often do a lot of different things. To break such a method or class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
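
As a minimal sketch under that approach (class and method names are illustrative, not Glances API): the io_rx/io_wx and network_rx/network_tx fields computed in generate_stats() share suffixes and near-identical logic, so the rate computation is one candidate component:

    class RateComputer:
        """Derives per-second rates from cumulative counters and time_since_update."""

        @staticmethod
        def per_second(section, value_key):
            # Return value // time_since_update when both fields are present, else None
            if all(k in section for k in (value_key, 'time_since_update')):
                return section[value_key] // section['time_since_update']
            return None

    # In generate_stats(), the four near-identical blocks would collapse to:
    #   stats['io_rx'] = RateComputer.per_second(stats['io'], 'ior')
    #   stats['io_wx'] = RateComputer.per_second(stats['io'], 'iow')
    #   stats['network_rx'] = RateComputer.per_second(stats['network'], 'rx')
    #   stats['network_tx'] = RateComputer.per_second(stats['network'], 'tx')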

#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""Docker Extension unit for Glances' Containers plugin."""

import time
from typing import Any, Dict, List, Optional, Tuple

from glances.globals import iterkeys, itervalues, nativestr, pretty_date, replace_special_chars
from glances.logger import logger
from glances.plugins.containers.stats_streamer import ThreadedIterableStreamer

# Docker-py library (optional and Linux-only)
# https://github.com/docker/docker-py
try:
    import docker
    import requests
    from dateutil import parser, tz
except Exception as e:
    import_docker_error_tag = True
    # Display a debug message if the import fails
    logger.warning(f"Error loading Docker deps Lib. Docker plugin is disabled ({e})")
else:
    import_docker_error_tag = False


class DockerStatsFetcher:
    MANDATORY_MEMORY_FIELDS = ['usage', 'limit']

    def __init__(self, container):
        self._container = container

        # Previously computed stats are stored in the self._old_computed_stats variable
        # We store time data to enable IoR/s & IoW/s calculations to avoid complexity for consumers of the APIs exposed.
        self._old_computed_stats = {}

        # Last time when output stats (results) were computed
        self._last_stats_computed_time = 0

        # Threaded Streamer
        stats_iterable = container.stats(decode=True)
        self._streamer = ThreadedIterableStreamer(stats_iterable, initial_stream_value={})

    def _log_debug(self, msg, exception=None):
        logger.debug(f"containers (Docker) ID: {self._container.id} - {msg} ({exception}) ")
        logger.debug(self._streamer.stats)

    def stop(self):
        self._streamer.stop()

    @property
    def activity_stats(self) -> Dict[str, Dict[str, Any]]:
        """Activity Stats

        Each successive access of activity_stats will cause computation of activity_stats
        """
        computed_activity_stats = self._compute_activity_stats()
        self._old_computed_stats = computed_activity_stats
        self._last_stats_computed_time = time.time()
        return computed_activity_stats

    def _compute_activity_stats(self) -> Dict[str, Dict[str, Any]]:
        with self._streamer.result_lock:
            io_stats = self._get_io_stats()
            cpu_stats = self._get_cpu_stats()
            memory_stats = self._get_memory_stats()
            network_stats = self._get_network_stats()

        return {
            "io": io_stats or {},
            "memory": memory_stats or {},
            "network": network_stats or {},
            "cpu": cpu_stats or {"total": 0.0},
        }

    @property
    def time_since_update(self) -> float:
        # In case no update, default to 1
        return max(1, self._streamer.last_update_time - self._last_stats_computed_time)

    def _get_cpu_stats(self) -> Optional[Dict[str, float]]:
        """Return the container CPU usage.

        Output: a dict {'total': 1.49}
        """
        stats = {'total': 0.0}

        try:
            cpu_stats = self._streamer.stats['cpu_stats']
            precpu_stats = self._streamer.stats['precpu_stats']
            cpu = {'system': cpu_stats['system_cpu_usage'], 'total': cpu_stats['cpu_usage']['total_usage']}
            precpu = {'system': precpu_stats['system_cpu_usage'], 'total': precpu_stats['cpu_usage']['total_usage']}

            # Issue #1857
            # If either precpu_stats.online_cpus or cpu_stats.online_cpus is nil
            # then for compatibility with older daemons the length of
            # the corresponding cpu_usage.percpu_usage array should be used.
            cpu['nb_core'] = cpu_stats.get('online_cpus') or len(cpu_stats['cpu_usage']['percpu_usage'] or [])
        except KeyError as e:
            self._log_debug("Can't grab CPU stats", e)
            return None

        try:
            cpu_delta = cpu['total'] - precpu['total']
            system_cpu_delta = cpu['system'] - precpu['system']
            # CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0
            stats['total'] = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0
        except TypeError as e:
            self._log_debug("Can't compute CPU usage", e)
            return None

        # Return the stats
        return stats

    def _get_memory_stats(self) -> Optional[Dict[str, float]]:
        """Return the container MEMORY.

        Output: a dict {'usage': ..., 'limit': ..., 'inactive_file': ...}

        Note: the displayed memory usage is 'usage - inactive_file'
        """
        memory_stats = self._streamer.stats.get('memory_stats')

        # Checks for memory_stats & mandatory fields
        if not memory_stats or any(field not in memory_stats for field in self.MANDATORY_MEMORY_FIELDS):
            self._log_debug("Missing MEM usage fields")
            return None

        stats = {field: memory_stats[field] for field in self.MANDATORY_MEMORY_FIELDS}

        # Optional field stats: inactive_file
        if memory_stats.get('stats', {}).get('inactive_file') is not None:
            stats['inactive_file'] = memory_stats['stats']['inactive_file']

        # Return the stats
        return stats

    def _get_network_stats(self) -> Optional[Dict[str, float]]:
        """Return the container network usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            rx: Number of bytes received
            tx: Number of bytes transmitted
        """
        eth0_stats = self._streamer.stats.get('networks', {}).get('eth0')

        # Checks for net_stats & mandatory fields
        if not eth0_stats or any(field not in eth0_stats for field in ['rx_bytes', 'tx_bytes']):
            self._log_debug("Missing Network usage fields")
            return None

        # Read the rx/tx stats (in bytes)
        stats = {'cumulative_rx': eth0_stats["rx_bytes"], 'cumulative_tx': eth0_stats["tx_bytes"]}

        # Using previous stats to calculate rates
        old_network_stats = self._old_computed_stats.get("network")
        if old_network_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['rx'] = stats['cumulative_rx'] - old_network_stats["cumulative_rx"]
            stats['tx'] = stats['cumulative_tx'] - old_network_stats['cumulative_tx']

        # Return the stats
        return stats

    def _get_io_stats(self) -> Optional[Dict[str, float]]:
        """Return the container IO usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            ior: Number of bytes read
            iow: Number of bytes written
        """
        io_service_bytes_recursive = self._streamer.stats.get('blkio_stats', {}).get('io_service_bytes_recursive')

        # Checks for blkio stats
        if not io_service_bytes_recursive:
            self._log_debug("Missing blockIO usage fields")
            return None

        # Read the ior/iow stats (in bytes)
        try:
            # Read IOR and IOW value in the structure list of dict
            cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() == 'read'][0]['value']
            cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value']
        except (TypeError, IndexError, KeyError, AttributeError) as e:
            self._log_debug("Can't grab blockIO usage", e)  # stats do not have io information
            return None

        stats = {'cumulative_ior': cumulative_ior, 'cumulative_iow': cumulative_iow}

        # Using previous stats to calculate difference
        old_io_stats = self._old_computed_stats.get("io")
        if old_io_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['ior'] = stats['cumulative_ior'] - old_io_stats["cumulative_ior"]
            stats['iow'] = stats['cumulative_iow'] - old_io_stats["cumulative_iow"]

        # Return the stats
        return stats


class DockerExtension:
    """Glances' Containers Plugin's Docker Extension unit"""

    CONTAINER_ACTIVE_STATUS = ['running', 'paused']

    def __init__(self):
        if import_docker_error_tag:
            raise Exception("Missing libs required to run Docker Extension (Containers) ")

        self.client = None
        self.ext_name = "containers (Docker)"
        self.stats_fetchers = {}

        self.connect()

    def connect(self) -> None:
        """Connect to the Docker server."""
        # Init the Docker API Client
        try:
            # Do not use the timeout option (see issue #1878)
            self.client = docker.from_env()
        except Exception as e:
            logger.error(f"{self.ext_name} plugin - Can't connect to Docker ({e})")
            self.client = None

    def update_version(self):
        # Slow and no longer useful because the information is no longer displayed in the UIs
        # return self.client.version()
        return {}

    def stop(self) -> None:
        # Stop all streaming threads
        for t in itervalues(self.stats_fetchers):
            t.stop()

    def update(self, all_tag) -> Tuple[Dict, List[Dict]]:
        """Update Docker stats using the input method."""

        if not self.client:
            return {}, []

        version_stats = self.update_version()

        # Update current containers list
        try:
            # Issue #1152: Docker module doesn't export details about stopped containers
            # The Containers/all key of the configuration file should be set to True
            containers = self.client.containers.list(all=all_tag)
        except Exception as e:
            logger.error(f"{self.ext_name} plugin - Can't get containers list ({e})")
            return version_stats, []

        # Start new thread for new container
        for container in containers:
            if container.id not in self.stats_fetchers:
                # StatsFetcher did not exist in the internal dict
                # Create it, add it to the internal dict
                logger.debug(f"{self.ext_name} plugin - Create thread for container {container.id[:12]}")
                self.stats_fetchers[container.id] = DockerStatsFetcher(container)

        # Stop threads for non-existing containers
        absent_containers = set(iterkeys(self.stats_fetchers)) - {c.id for c in containers}
        for container_id in absent_containers:
            # Stop the StatsFetcher
            logger.debug(f"{self.ext_name} plugin - Stop thread for old container {container_id[:12]}")
            self.stats_fetchers[container_id].stop()
            # Delete the StatsFetcher from the dict
            del self.stats_fetchers[container_id]

        # Get stats for all containers
        container_stats = [self.generate_stats(container) for container in containers]
        return version_stats, container_stats

    @property
    def key(self) -> str:
        """Return the key of the list."""
        return 'name'

    def generate_stats(self, container) -> Dict[str, Any]:
        # Init the stats for the current container
        stats = {
            'key': self.key,
            'name': nativestr(container.name),
            'id': container.id,
            'status': container.attrs['State']['Status'],
            'created': container.attrs['Created'],
            'command': [],
            'io': {},
            'cpu': {},
            'memory': {},
            'network': {},
            'io_rx': None,
            'io_wx': None,
            'cpu_percent': None,
            'memory_percent': None,
            'network_rx': None,
            'network_tx': None,
            'uptime': None,
        }

        # Container Image
        try:
            # API fails on Unraid - See issue 2233
            # Note: the trailing comma makes this a one-element tuple, not a string
            stats['image'] = (','.join(container.image.tags if container.image.tags else []),)
        except requests.exceptions.HTTPError:
            stats['image'] = ''

        if container.attrs['Config'].get('Entrypoint', None):
            stats['command'].extend(container.attrs['Config'].get('Entrypoint', []))
        if container.attrs['Config'].get('Cmd', None):
            stats['command'].extend(container.attrs['Config'].get('Cmd', []))
        if not stats['command']:
            stats['command'] = None

        if stats['status'] not in self.CONTAINER_ACTIVE_STATUS:
            return stats

        stats_fetcher = self.stats_fetchers[container.id]
        activity_stats = stats_fetcher.activity_stats
        stats.update(activity_stats)

        # Additional fields
        stats['cpu_percent'] = stats['cpu']['total']
        stats['memory_usage'] = stats['memory'].get('usage')
        if stats['memory'].get('cache') is not None:
            stats['memory_usage'] -= stats['memory']['cache']

        if all(k in stats['io'] for k in ('ior', 'iow', 'time_since_update')):
            stats['io_rx'] = stats['io']['ior'] // stats['io']['time_since_update']
            stats['io_wx'] = stats['io']['iow'] // stats['io']['time_since_update']

        if all(k in stats['network'] for k in ('rx', 'tx', 'time_since_update')):
            stats['network_rx'] = stats['network']['rx'] // stats['network']['time_since_update']
            stats['network_tx'] = stats['network']['tx'] // stats['network']['time_since_update']

        started_at = container.attrs['State']['StartedAt']
        stats['uptime'] = pretty_date(parser.parse(started_at).astimezone(tz.tzlocal()).replace(tzinfo=None))

        # Manage special chars in command (see issue#2733)
        stats['command'] = replace_special_chars(' '.join(stats['command']))

        return stats