Test Failed
Push — master (05aaee...10b5c2) by Nicolas
04:12 (queued 14s)

DockerContainersExtension.generate_stats()   B

Complexity

    Conditions    7

Size

    Total Lines   58
    Code Lines    44

Duplication

    Lines         0
    Ratio         0 %

Importance

    Changes       0

Metric    Value
cc        7
eloc      44
nop       2
dl        0
loc       58
rs        7.424
c         0
b         0
f         0

How to fix: Long Method

Small methods make your code easier to understand, particularly when combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include:

- Extract Method
- Replace Temp with Query
- Introduce Parameter Object
- Replace Method with Method Object
- Decompose Conditional

A sketch of the most common of these, Extract Method, follows.
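As a minimal, hypothetical illustration of Extract Method (the function and field names below are invented, not taken from the code under review), the commented block inside a long function becomes a small helper whose name is derived from the comment:

# Before: one long function with an explanatory comment in the middle.
def print_owing(invoice):
    print("*** Customer Owes ***")

    # Calculate the outstanding balance from the invoice's line items.
    outstanding = 0.0
    for item in invoice["items"]:
        outstanding += item["amount"]

    print("name: {}".format(invoice["customer"]))
    print("amount: {}".format(outstanding))


# After: the commented block is extracted and the comment becomes the name.
def calculate_outstanding(invoice):
    """Return the outstanding balance computed from the invoice's line items."""
    return sum(item["amount"] for item in invoice["items"])


def print_owing_refactored(invoice):
    print("*** Customer Owes ***")
    outstanding = calculate_outstanding(invoice)
    print("name: {}".format(invoice["customer"]))
    print("amount: {}".format(outstanding))

The top-level flow now reads as a sequence of named steps, and the extracted helper can be tested on its own.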

# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""Docker Extension unit for Glances' Containers plugin."""
import time

import requests

from glances.compat import iterkeys, itervalues, nativestr, pretty_date
from glances.logger import logger
from glances.plugins.containers.stats_streamer import StatsStreamer

# Docker-py library (optional and Linux-only)
# https://github.com/docker/docker-py
try:
    import docker
    from dateutil import parser, tz
except Exception as e:
    import_docker_error_tag = True
    # Display a debug message if the import fails
    logger.debug("Error loading Docker deps Lib. Docker plugin is disabled ({})".format(e))
else:
    import_docker_error_tag = False


class DockerStatsFetcher:
    MANDATORY_MEMORY_FIELDS = ['usage', 'limit']

    def __init__(self, container):
        self._container = container

        # Previously computed stats are stored in the self._old_computed_stats variable
        # We store time data to enable IoR/s & IoW/s calculations without adding complexity for consumers of the exposed APIs
        self._old_computed_stats = {}

        # Last time the output stats (results) were computed
        self._last_stats_computed_time = 0

        # Threaded Streamer
        stats_iterable = container.stats(decode=True)
        self._streamer = StatsStreamer(stats_iterable, initial_stream_value={})

    def _log_debug(self, msg, exception=None):
        logger.debug("containers (Docker) ID: {} - {} ({})".format(self._container.id, msg, exception))
        logger.debug(self._streamer.stats)

    def stop(self):
        self._streamer.stop()

    @property
    def activity_stats(self):
        """Activity Stats

        Each successive access of activity_stats recomputes the stats and refreshes the cached values
        """
        computed_activity_stats = self._compute_activity_stats()
        self._old_computed_stats = computed_activity_stats
        self._last_stats_computed_time = time.time()
        return computed_activity_stats

    def _compute_activity_stats(self):
        with self._streamer.result_lock:
            io_stats = self._get_io_stats()
            cpu_stats = self._get_cpu_stats()
            memory_stats = self._get_memory_stats()
            network_stats = self._get_network_stats()

        computed_stats = {
            "io": io_stats or {},
            "memory": memory_stats or {},
            "network": network_stats or {},
            "cpu": cpu_stats or {"total": 0.0},
        }
        return computed_stats

    @property
    def time_since_update(self):
        # In case of no update, default to 1
        return max(1, self._streamer.last_update_time - self._last_stats_computed_time)

    def _get_cpu_stats(self):
        """Return the container CPU usage.

        Output: a dict {'total': 1.49}
        """
        stats = {'total': 0.0}

        try:
            cpu_stats = self._streamer.stats['cpu_stats']
            precpu_stats = self._streamer.stats['precpu_stats']
            cpu = {'system': cpu_stats['system_cpu_usage'], 'total': cpu_stats['cpu_usage']['total_usage']}
            precpu = {'system': precpu_stats['system_cpu_usage'], 'total': precpu_stats['cpu_usage']['total_usage']}

            # Issue #1857
            # If either precpu_stats.online_cpus or cpu_stats.online_cpus is nil
            # then for compatibility with older daemons the length of
            # the corresponding cpu_usage.percpu_usage array should be used.
            cpu['nb_core'] = cpu_stats.get('online_cpus') or len(cpu_stats['cpu_usage']['percpu_usage'] or [])
        except KeyError as e:
            self._log_debug("Can't grab CPU stats", e)
            return None

        try:
            cpu_delta = cpu['total'] - precpu['total']
            system_cpu_delta = cpu['system'] - precpu['system']
            # CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0
            stats['total'] = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0
        except TypeError as e:
            self._log_debug("Can't compute CPU usage", e)
            return None

        # Return the stats
        return stats

    def _get_memory_stats(self):
        """Return the container MEMORY usage.

        Output: a dict {'rss': 1015808, 'cache': 356352, 'usage': ..., 'max_usage': ...}
        """
        memory_stats = self._streamer.stats.get('memory_stats')

        # Check for memory_stats & the mandatory fields
        if not memory_stats or any(field not in memory_stats for field in self.MANDATORY_MEMORY_FIELDS):
            self._log_debug("Missing MEM usage fields")
            return None

        stats = {field: memory_stats[field] for field in self.MANDATORY_MEMORY_FIELDS}
        try:
            # Issue #1857 - Some stats are not always available in ['memory_stats']['stats']
            detailed_stats = memory_stats['stats']
            stats['rss'] = detailed_stats.get('rss') or detailed_stats.get('total_rss')
            stats['max_usage'] = detailed_stats.get('max_usage')
            stats['cache'] = detailed_stats.get('cache')
        except (KeyError, TypeError) as e:
            self._log_debug("Can't grab MEM usage", e)  # stats do not have MEM information
            return None

        # Return the stats
        return stats

    def _get_network_stats(self):
        """Return the container network usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}.
        with:
            time_since_update: number of seconds elapsed since the previous grab
            rx: Number of bytes received
            tx: Number of bytes transmitted
        """
        eth0_stats = self._streamer.stats.get('networks', {}).get('eth0')

        # Check for net_stats & the mandatory fields
        if not eth0_stats or any(field not in eth0_stats for field in ['rx_bytes', 'tx_bytes']):
            self._log_debug("Missing Network usage fields")
            return None

        # Read the rx/tx stats (in bytes)
        stats = {'cumulative_rx': eth0_stats["rx_bytes"], 'cumulative_tx': eth0_stats["tx_bytes"]}

        # Using previous stats to calculate rates
        old_network_stats = self._old_computed_stats.get("network")
        if old_network_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['rx'] = stats['cumulative_rx'] - old_network_stats["cumulative_rx"]
            stats['tx'] = stats['cumulative_tx'] - old_network_stats['cumulative_tx']

        # Return the stats
        return stats

    def _get_io_stats(self):
        """Return the container IO usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}.
        with:
            time_since_update: number of seconds elapsed since the previous grab
            ior: Number of bytes read
            iow: Number of bytes written
        """
        io_service_bytes_recursive = self._streamer.stats.get('blkio_stats', {}).get('io_service_bytes_recursive')

        # Check for io_stats
        if not io_service_bytes_recursive:
            self._log_debug("Missing blockIO usage fields")
            return None

        # Read the ior/iow stats (in bytes)
        try:
            # Read the IOR and IOW values from the list-of-dicts structure
            cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() == 'read'][0]['value']
            cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value']
        except (TypeError, IndexError, KeyError, AttributeError) as e:
            self._log_debug("Can't grab blockIO usage", e)  # stats do not have io information
            return None

        stats = {'cumulative_ior': cumulative_ior, 'cumulative_iow': cumulative_iow}

        # Using previous stats to calculate the difference
        old_io_stats = self._old_computed_stats.get("io")
        if old_io_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['ior'] = stats['cumulative_ior'] - old_io_stats["cumulative_ior"]
            stats['iow'] = stats['cumulative_iow'] - old_io_stats["cumulative_iow"]

        # Return the stats
        return stats


class DockerContainersExtension:
    """Glances' Containers Plugin's Docker Extension unit"""

    CONTAINER_ACTIVE_STATUS = ['running', 'paused']

    def __init__(self):
        if import_docker_error_tag:
            raise Exception("Missing libs required to run Docker Extension (Containers)")

        self.client = None
        self.ext_name = "containers (Docker)"
        self.stats_fetchers = {}

        self.connect()

    def connect(self):
        """Connect to the Docker server."""
        # Init the Docker API Client
        try:
            # Do not use the timeout option (see issue #1878)
            self.client = docker.from_env()
        except Exception as e:
            logger.error("{} plugin - Can't connect to Docker ({})".format(self.ext_name, e))
            self.client = None

    def update_version(self):
        # Long-running and no longer useful because the information is no longer displayed in the UIs
        # return self.client.version()
        return {}

    def stop(self):
        # Stop all streaming threads
        for t in itervalues(self.stats_fetchers):
            t.stop()

    def update(self, all_tag):
        """Update Docker stats using the input method."""

        if not self.client:
            return {}, []

        version_stats = self.update_version()

        # Update the current containers list
        try:
            # Issue #1152: Docker module doesn't export details about stopped containers
            # The Containers/all key of the configuration file should be set to True
            containers = self.client.containers.list(all=all_tag)
        except Exception as e:
            logger.error("{} plugin - Can't get containers list ({})".format(self.ext_name, e))
            return version_stats, []

        # Start a new thread for each new container
        for container in containers:
            if container.id not in self.stats_fetchers:
                # StatsFetcher did not exist in the internal dict
                # Create it and add it to the internal dict
                logger.debug("{} plugin - Create thread for container {}".format(self.ext_name, container.id[:12]))
                self.stats_fetchers[container.id] = DockerStatsFetcher(container)

        # Stop threads for non-existing containers
        absent_containers = set(iterkeys(self.stats_fetchers)) - set(c.id for c in containers)
        for container_id in absent_containers:
            # Stop the StatsFetcher
            logger.debug("{} plugin - Stop thread for old container {}".format(self.ext_name, container_id[:12]))
            self.stats_fetchers[container_id].stop()
            # Delete the StatsFetcher from the dict
            del self.stats_fetchers[container_id]

        # Get stats for all containers
        container_stats = [self.generate_stats(container) for container in containers]
        return version_stats, container_stats

    @property
    def key(self):
        """Return the key of the list."""
        return 'name'

    def generate_stats(self, container):
        # Init the stats for the current container
        stats = {
            'key': self.key,
            # Export name
            'name': nativestr(container.name),
            # Container Id
            'Id': container.id,
            # Container Status (from attrs)
            'Status': container.attrs['State']['Status'],
            'Created': container.attrs['Created'],
            'Command': [],
        }

        # Container Image
        try:
            # API fails on Unraid - See issue 2233
            stats['Image'] = container.image.tags
        except requests.exceptions.HTTPError:
            stats['Image'] = '-'

        if container.attrs['Config'].get('Entrypoint', None):
            stats['Command'].extend(container.attrs['Config'].get('Entrypoint', []))
        if container.attrs['Config'].get('Cmd', None):
            stats['Command'].extend(container.attrs['Config'].get('Cmd', []))
        if not stats['Command']:
            stats['Command'] = None

        if stats['Status'] in self.CONTAINER_ACTIVE_STATUS:
            started_at = container.attrs['State']['StartedAt']
            stats_fetcher = self.stats_fetchers[container.id]
            activity_stats = stats_fetcher.activity_stats
            stats.update(activity_stats)

            # Additional fields
            stats['cpu_percent'] = stats["cpu"]['total']
            stats['memory_usage'] = stats["memory"].get('usage')
            if stats['memory'].get('cache') is not None:
                stats['memory_usage'] -= stats['memory']['cache']
            stats['io_r'] = stats['io'].get('ior')
            stats['io_w'] = stats['io'].get('iow')
            stats['network_rx'] = stats['network'].get('rx')
            stats['network_tx'] = stats['network'].get('tx')
            stats['Uptime'] = pretty_date(parser.parse(started_at).astimezone(tz.tzlocal()).replace(tzinfo=None))
        else:
            stats['io'] = {}
            stats['cpu'] = {}
            stats['memory'] = {}
            stats['network'] = {}
            stats['io_r'] = None
            stats['io_w'] = None
            stats['cpu_percent'] = None
            stats['memory_percent'] = None
            stats['network_rx'] = None
            stats['network_tx'] = None
            stats['Uptime'] = None

        return stats
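
Applied to the method flagged in this report, Extract Method suggests one possible decomposition. The sketch below is hypothetical (helper names such as build_command and inactive_placeholder_stats are invented, not the project's actual refactoring); each helper absorbs one commented block of generate_stats:

# Hypothetical helpers extracted from generate_stats.
def build_command(config):
    """Return the container command from its 'Config' attrs, or None if empty."""
    command = []
    command.extend(config.get('Entrypoint') or [])
    command.extend(config.get('Cmd') or [])
    return command or None


def inactive_placeholder_stats():
    """Return the default stats reported for a container that is not active."""
    stats = {key: {} for key in ('io', 'cpu', 'memory', 'network')}
    stats.update(
        dict.fromkeys(
            ('io_r', 'io_w', 'cpu_percent', 'memory_percent', 'network_rx', 'network_tx', 'Uptime')
        )
    )
    return stats


# The body of generate_stats would then shrink to roughly:
#
#     stats['Command'] = build_command(container.attrs['Config'])
#     if stats['Status'] in self.CONTAINER_ACTIVE_STATUS:
#         stats.update(self._activity_fields(container))  # hypothetical helper
#     else:
#         stats.update(inactive_placeholder_stats())

Each extracted helper removes branches from generate_stats itself, which directly addresses the Conditions count reported above.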