Test Failed
Push — master ( ee826a...d9056e )
by Nicolas
03:09
created

DockerStatsFetcher._get_cpu_stats()   A

Complexity

Conditions 3

Size

Total Lines 33
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 19
nop 1
dl 0
loc 33
rs 9.45
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Docker Extension unit for Glances' Containers plugin."""
11
import time
12
13
from glances.globals import iterkeys, itervalues, nativestr, pretty_date, replace_special_chars
14
from glances.logger import logger
15
from glances.plugins.containers.stats_streamer import StatsStreamer
16
17
# Docker-py library (optional and Linux-only)
18
# https://github.com/docker/docker-py
19
try:
20
    import requests
21
    import docker
22
    from dateutil import parser, tz
23
except Exception as e:
24
    import_docker_error_tag = True
25
    # Display debug message if import KeyError
26
    logger.warning("Error loading Docker deps Lib. Docker plugin is disabled ({})".format(e))
27
else:
28
    import_docker_error_tag = False
29
30
31
class DockerStatsFetcher:
    """Grab and compute activity stats for a single Docker container.

    Raw samples are pulled by a background StatsStreamer thread; rate values
    (io rx/tx, network rx/tx) are derived by diffing against the previously
    computed stats kept in ``self._old_computed_stats``.
    """

    # Fields that must be present in 'memory_stats' for the sample to be usable
    MANDATORY_MEMORY_FIELDS = ['usage', 'limit']

    def __init__(self, container):
        """Start a streaming stats thread for the given docker-py container."""
        self._container = container

        # Previous computed stats are stored in the self._old_computed_stats variable
        # We store time data to enable IoR/s & IoW/s calculations to avoid complexity for consumers of the APIs exposed.
        self._old_computed_stats = {}

        # Last time when output stats (results) were computed
        self._last_stats_computed_time = 0

        # Threaded Streamer
        stats_iterable = container.stats(decode=True)
        self._streamer = StatsStreamer(stats_iterable, initial_stream_value={})

    def _log_debug(self, msg, exception=None):
        """Log a debug message prefixed with the container id, then dump the raw stats."""
        logger.debug("containers (Docker) ID: {} - {} ({}) ".format(self._container.id, msg, exception))
        logger.debug(self._streamer.stats)

    def stop(self):
        """Stop the underlying streaming thread."""
        self._streamer.stop()

    @property
    def activity_stats(self):
        """Activity Stats

        Each successive access of activity_stats will cause computation of activity_stats
        """
        computed_activity_stats = self._compute_activity_stats()
        # Remember this result so the next access can compute rates against it
        self._old_computed_stats = computed_activity_stats
        self._last_stats_computed_time = time.time()
        return computed_activity_stats

    def _compute_activity_stats(self):
        """Compute io/cpu/memory/network stats under the streamer lock.

        Sub-getters return None when mandatory fields are missing; those are
        replaced with safe defaults so consumers always get dicts.
        """
        with self._streamer.result_lock:
            io_stats = self._get_io_stats()
            cpu_stats = self._get_cpu_stats()
            memory_stats = self._get_memory_stats()
            network_stats = self._get_network_stats()

        computed_stats = {
            "io": io_stats or {},
            "memory": memory_stats or {},
            "network": network_stats or {},
            "cpu": cpu_stats or {"total": 0.0},
        }
        return computed_stats

    @property
    def time_since_update(self):
        """Return seconds elapsed between the latest grab and the last computation."""
        # In case no update, default to 1
        return max(1, self._streamer.last_update_time - self._last_stats_computed_time)

    def _get_cpu_stats(self):
        """Return the container CPU usage.

        Output: a dict {'total': 1.49} or None if the stats are missing/unusable.
        """
        stats = {'total': 0.0}

        try:
            cpu_stats = self._streamer.stats['cpu_stats']
            precpu_stats = self._streamer.stats['precpu_stats']
            cpu = {'system': cpu_stats['system_cpu_usage'], 'total': cpu_stats['cpu_usage']['total_usage']}
            precpu = {'system': precpu_stats['system_cpu_usage'], 'total': precpu_stats['cpu_usage']['total_usage']}

            # Issue #1857
            # If either precpu_stats.online_cpus or cpu_stats.online_cpus is nil
            # then for compatibility with older daemons the length of
            # the corresponding cpu_usage.percpu_usage array should be used.
            cpu['nb_core'] = cpu_stats.get('online_cpus') or len(cpu_stats['cpu_usage']['percpu_usage'] or [])
        except KeyError as e:
            self._log_debug("Can't grab CPU stats", e)
            return None

        try:
            cpu_delta = cpu['total'] - precpu['total']
            system_cpu_delta = cpu['system'] - precpu['system']
            # CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0
            stats['total'] = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0
        except (TypeError, ZeroDivisionError) as e:
            # ZeroDivisionError: two consecutive samples may carry the same
            # system_cpu_usage, which makes system_cpu_delta zero.
            self._log_debug("Can't compute CPU usage", e)
            return None

        # Return the stats
        return stats

    def _get_memory_stats(self):
        """Return the container MEMORY.

        Output: a dict {'usage': ..., 'limit': ..., 'inactive_file': ...} or None.

        Note:the displayed memory usage is 'usage - inactive_file'
        """
        memory_stats = self._streamer.stats.get('memory_stats')

        # Checks for memory_stats & mandatory fields
        if not memory_stats or any(field not in memory_stats for field in self.MANDATORY_MEMORY_FIELDS):
            self._log_debug("Missing MEM usage fields")
            return None

        stats = {field: memory_stats[field] for field in self.MANDATORY_MEMORY_FIELDS}

        # Optional field stats: inactive_file
        if memory_stats.get('stats', {}).get('inactive_file') is not None:
            stats['inactive_file'] = memory_stats['stats']['inactive_file']

        # Return the stats
        return stats

    def _get_network_stats(self):
        """Return the container network usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65} or None.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            rx: Number of bytes received
            tx: Number of bytes transmitted
        """
        eth0_stats = self._streamer.stats.get('networks', {}).get('eth0')

        # Checks for net_stats & mandatory fields
        if not eth0_stats or any(field not in eth0_stats for field in ['rx_bytes', 'tx_bytes']):
            self._log_debug("Missing Network usage fields")
            return None

        # Read the rx/tx stats (in bytes)
        stats = {'cumulative_rx': eth0_stats["rx_bytes"], 'cumulative_tx': eth0_stats["tx_bytes"]}

        # Using previous stats to calculate rates
        old_network_stats = self._old_computed_stats.get("network")
        if old_network_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['rx'] = stats['cumulative_rx'] - old_network_stats["cumulative_rx"]
            stats['tx'] = stats['cumulative_tx'] - old_network_stats['cumulative_tx']

        # Return the stats
        return stats

    def _get_io_stats(self):
        """Return the container IO usage using the Docker API (v1.0 or higher).

        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65} or None.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            ior: Number of bytes read
            iow: Number of bytes written
        """
        io_service_bytes_recursive = self._streamer.stats.get('blkio_stats', {}).get('io_service_bytes_recursive')

        # Checks for net_stats
        if not io_service_bytes_recursive:
            self._log_debug("Missing blockIO usage fields")
            return None

        # Read the ior/iow stats (in bytes)
        try:
            # Read IOR and IOW value in the structure list of dict
            cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() == 'read'][0]['value']
            cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value']
        except (TypeError, IndexError, KeyError, AttributeError) as e:
            self._log_debug("Can't grab blockIO usage", e)  # stats do not have io information
            return None

        stats = {'cumulative_ior': cumulative_ior, 'cumulative_iow': cumulative_iow}

        # Using previous stats to calculate difference
        old_io_stats = self._old_computed_stats.get("io")
        if old_io_stats:
            stats['time_since_update'] = round(self.time_since_update)
            stats['ior'] = stats['cumulative_ior'] - old_io_stats["cumulative_ior"]
            stats['iow'] = stats['cumulative_iow'] - old_io_stats["cumulative_iow"]

        # Return the stats
        return stats
209
210
class DockerContainersExtension:
    """Glances' Containers Plugin's Docker Extension unit"""

    # Container statuses for which activity stats can be fetched
    CONTAINER_ACTIVE_STATUS = ['running', 'paused']

    def __init__(self):
        """Init the extension and connect to the Docker daemon.

        Raises if the optional docker deps failed to import.
        """
        if import_docker_error_tag:
            raise Exception("Missing libs required to run Docker Extension (Containers) ")

        self.client = None
        self.ext_name = "containers (Docker)"
        # One DockerStatsFetcher (streaming thread) per container id
        self.stats_fetchers = {}

        self.connect()

    def connect(self):
        """Connect to the Docker server (sets self.client, or None on failure)."""
        # Init the Docker API Client
        try:
            # Do not use the timeout option (see issue #1878)
            self.client = docker.from_env()
        except Exception as e:
            logger.error("{} plugin - Can't connect to Docker ({})".format(self.ext_name, e))
            self.client = None

    def update_version(self):
        """Return the Docker version stats (always empty, see comment below)."""
        # Long and not useful anymore because the information is no more displayed in UIs
        # return self.client.version()
        return {}

    def stop(self):
        """Stop all streaming threads."""
        for t in itervalues(self.stats_fetchers):
            t.stop()

    def update(self, all_tag):
        """Update Docker stats using the input method.

        all_tag -- when True, also list stopped containers (see issue #1152)
        Returns a (version_stats, container_stats_list) tuple.
        """

        if not self.client:
            return {}, []

        version_stats = self.update_version()

        # Update current containers list
        try:
            # Issue #1152: Docker module doesn't export details about stopped containers
            # The Containers/all key of the configuration file should be set to True
            containers = self.client.containers.list(all=all_tag)
        except Exception as e:
            logger.error("{} plugin - Can't get containers list ({})".format(self.ext_name, e))
            return version_stats, []

        # Start new thread for new container
        for container in containers:
            if container.id not in self.stats_fetchers:
                # StatsFetcher did not exist in the internal dict
                # Create it, add it to the internal dict
                logger.debug("{} plugin - Create thread for container {}".format(self.ext_name, container.id[:12]))
                self.stats_fetchers[container.id] = DockerStatsFetcher(container)

        # Stop threads for non-existing containers
        absent_containers = set(iterkeys(self.stats_fetchers)) - {c.id for c in containers}
        for container_id in absent_containers:
            # Stop the StatsFetcher
            logger.debug("{} plugin - Stop thread for old container {}".format(self.ext_name, container_id[:12]))
            self.stats_fetchers[container_id].stop()
            # Delete the StatsFetcher from the dict
            del self.stats_fetchers[container_id]

        # Get stats for all containers
        container_stats = [self.generate_stats(container) for container in containers]
        return version_stats, container_stats

    @property
    def key(self):
        """Return the key of the list."""
        return 'name'

    def generate_stats(self, container):
        """Build the stats dict exported for one container.

        Active containers get activity stats (cpu/memory/io/network rates);
        inactive ones get empty/None placeholders for the same keys.
        """
        # Init the stats for the current container
        stats = {
            'key': self.key,
            # Export name
            'name': nativestr(container.name),
            # Container Id
            'id': container.id,
            # Container Status (from attrs)
            'status': container.attrs['State']['Status'],
            'created': container.attrs['Created'],
            'command': [],
        }

        # Container Image
        try:
            # API fails on Unraid - See issue 2233
            # NOTE: a stray trailing comma used to turn this into a 1-tuple;
            # the field is a plain string, consistent with the except branch.
            stats['image'] = ','.join(container.image.tags or [])
        except requests.exceptions.HTTPError:
            stats['image'] = ''

        if container.attrs['Config'].get('Entrypoint', None):
            stats['command'].extend(container.attrs['Config'].get('Entrypoint', []))
        if container.attrs['Config'].get('Cmd', None):
            stats['command'].extend(container.attrs['Config'].get('Cmd', []))
        if not stats['command']:
            stats['command'] = None

        if stats['status'] in self.CONTAINER_ACTIVE_STATUS:
            started_at = container.attrs['State']['StartedAt']
            stats_fetcher = self.stats_fetchers[container.id]
            activity_stats = stats_fetcher.activity_stats
            stats.update(activity_stats)

            # Additional fields
            stats['cpu_percent'] = stats["cpu"]['total']
            stats['memory_usage'] = stats["memory"].get('usage')
            if stats['memory'].get('cache') is not None:
                stats['memory_usage'] -= stats['memory']['cache']
            if 'time_since_update' in stats['io']:
                stats['io_rx'] = stats['io'].get('ior') // stats['io'].get('time_since_update')
                stats['io_wx'] = stats['io'].get('iow') // stats['io'].get('time_since_update')
            if 'time_since_update' in stats['network']:
                stats['network_rx'] = stats['network'].get('rx') // stats['network'].get('time_since_update')
                stats['network_tx'] = stats['network'].get('tx') // stats['network'].get('time_since_update')
            stats['uptime'] = pretty_date(parser.parse(started_at).astimezone(tz.tzlocal()).replace(tzinfo=None))
            # Manage special chars in command (see issue #2733)
            stats['command'] = replace_special_chars(' '.join(stats['command']))
        else:
            stats['io'] = {}
            stats['cpu'] = {}
            stats['memory'] = {}
            stats['network'] = {}
            stats['io_rx'] = None
            stats['io_wx'] = None
            stats['cpu_percent'] = None
            stats['memory_percent'] = None
            stats['network_rx'] = None
            stats['network_tx'] = None
            stats['uptime'] = None

        return stats
350