Test Failed
Push — master (183265...afa1da)
by Nicolas
03:15 queued 16s

glances.plugins.containers.engines.podman — Rating: C

Complexity

Total Complexity 57

Size/Duplication

Total Lines 389
Duplicated Lines 0%

Importance

Changes 0
Metric Value
eloc 237
dl 0
loc 389
rs 5.04
c 0
b 0
f 0
wmc 57
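Here wmc (weighted method count) matches the Total Complexity reported above: it is the sum of the 21 per-method cyclomatic complexities listed in the table below (9 + 8 + 6 + 3×4 + 3 + 5×2 + 9×1 = 57).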

21 Methods

Rating   Name   Duplication   Size   Complexity  
B PodmanContainersExtension.generate_stats() 0 51 6
A PodmanContainerStatsFetcher.stop() 0 2 1
A PodmanPodStatsFetcher._get_network_stats() 0 25 4
A PodmanPodStatsFetcher.activity_stats() 0 22 2
A PodmanPodStatsFetcher._log_debug() 0 3 1
A PodmanPodStatsFetcher.__init__() 0 7 1
A PodmanPodStatsFetcher._get_io_stats() 0 25 4
A PodmanContainersExtension.update_version() 0 4 1
A PodmanContainersExtension.stop() 0 7 3
A PodmanContainersExtension.__init__() 0 11 2
A PodmanContainerStatsFetcher.time_since_update() 0 4 1
A PodmanContainersExtension.key() 0 4 1
A PodmanPodStatsFetcher._get_cpu_stats() 0 11 2
A PodmanContainerStatsFetcher.__init__() 0 13 1
C PodmanContainersExtension.update() 0 46 9
A PodmanPodStatsFetcher.stop() 0 2 1
A PodmanContainerStatsFetcher.activity_stats() 0 10 1
A PodmanPodStatsFetcher._get_memory_stats() 0 20 4
A PodmanContainersExtension.connect() 0 9 2
A PodmanContainerStatsFetcher.get_streamed_stats() 0 7 2
B PodmanContainerStatsFetcher._compute_activity_stats() 0 45 8

How to fix: Complexity

Complex modules like glances.plugins.containers.engines.podman often do a lot of different things. To break such a module down, we need to identify a cohesive component within it. A common approach is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields and methods that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
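In this module, a natural candidate is the rootless-network branch of PodmanContainerStatsFetcher._compute_activity_stats() (complexity 8 in the table above), which keeps cumulative rx/tx counters and derives per-interval rates from them. A minimal sketch of that Extract Class step is shown below; the helper name NetworkRateTracker is illustrative and not part of the Glances codebase.

class NetworkRateTracker:
    """Illustrative helper: owns the cumulative rx/tx counters and the
    per-interval rates currently computed inline in _compute_activity_stats()."""

    def __init__(self):
        self._old = {}  # previous cumulative counters

    def compute(self, interfaces, time_since_update):
        # interfaces: the api_stats["Network"] mapping (one entry per interface)
        network = {
            "cumulative_rx": sum(i["RxBytes"] for i in interfaces.values()),
            "cumulative_tx": sum(i["TxBytes"] for i in interfaces.values()),
        }
        if self._old:
            # Rates are the delta of the cumulative counters over the elapsed time
            network["time_since_update"] = round(time_since_update)
            network["rx"] = network["cumulative_rx"] - self._old["cumulative_rx"]
            network["tx"] = network["cumulative_tx"] - self._old["cumulative_tx"]
        self._old = network
        return network

With such a helper, the elif branch in _compute_activity_stats() would collapse to a single call (stats['network'] = self._net_tracker.compute(api_stats['Network'], self.time_since_update)), removing several decision points without changing the produced stats. The full module, as analysed, follows.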

#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-only

"""Podman Extension unit for Glances' Containers plugin."""

import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from glances.globals import iterkeys, itervalues, nativestr, pretty_date, replace_special_chars, string_value_to_float
from glances.logger import logger
from glances.plugins.containers.stats_streamer import ThreadedIterableStreamer

# Podman library (optional and Linux-only)
# https://pypi.org/project/podman/
try:
    from podman import PodmanClient
except Exception as e:
    import_podman_error_tag = True
    # Display a warning message if the import fails
    logger.warning(f"Error loading Podman deps Lib. Podman feature in the Containers plugin is disabled ({e})")
else:
    import_podman_error_tag = False


class PodmanContainerStatsFetcher:
    MANDATORY_FIELDS = ["CPU", "MemUsage", "MemLimit", "BlockInput", "BlockOutput"]

    def __init__(self, container):
        self._container = container

        # Previous stats are stored in the self._old_computed_stats variable
        # We store time data to enable rate calculations to avoid complexity for consumers of the APIs exposed.
        self._old_computed_stats = {}

        # Last time when output stats (results) were computed
        self._last_stats_computed_time = 0

        # Threaded Streamer
        stats_iterable = container.stats(decode=True)
        self._streamer = ThreadedIterableStreamer(stats_iterable, initial_stream_value={})

    def stop(self):
        self._streamer.stop()

    def get_streamed_stats(self) -> Dict[str, Any]:
        stats = self._streamer.stats
        if stats["Error"]:
            logger.error(f"containers (Podman) Container({self._container.id}): Stats fetching failed")
            logger.debug(f"containers (Podman) Container({self._container.id}): {stats}")

        return stats["Stats"][0]

    @property
    def activity_stats(self) -> Dict[str, Any]:
        """Activity Stats

        Each successive access of activity_stats will cause computation of activity_stats
        """
        computed_activity_stats = self._compute_activity_stats()
        self._old_computed_stats = computed_activity_stats
        self._last_stats_computed_time = time.time()
        return computed_activity_stats

    def _compute_activity_stats(self) -> Dict[str, Dict[str, Any]]:
        stats = {"cpu": {}, "memory": {}, "io": {}, "network": {}}
        api_stats = self.get_streamed_stats()

        if any(field not in api_stats for field in self.MANDATORY_FIELDS) or (
            "Network" not in api_stats and any(k not in api_stats for k in ['NetInput', 'NetOutput'])
        ):
            logger.error(f"containers (Podman) Container({self._container.id}): Missing mandatory fields")
            return stats

        try:
            stats["cpu"]["total"] = api_stats['CPU']

            stats["memory"]["usage"] = api_stats["MemUsage"]
            stats["memory"]["limit"] = api_stats["MemLimit"]

            stats["io"]["ior"] = api_stats["BlockInput"]
            stats["io"]["iow"] = api_stats["BlockOutput"]
            stats["io"]["time_since_update"] = 1
            # Hardcode `time_since_update` to 1 as podman already sends at the same fixed rate per second

            if "Network" not in api_stats:
                # For podman rooted mode
                stats["network"]['rx'] = api_stats["NetInput"]
                stats["network"]['tx'] = api_stats["NetOutput"]
                stats["network"]['time_since_update'] = 1
                # Hardcode to 1 as podman already sends at the same fixed rate per second
            elif api_stats["Network"] is not None:
                # api_stats["Network"] can be None if the infra container of the pod is killed
                # For podman in rootless mode
                stats['network'] = {
                    "cumulative_rx": sum(interface["RxBytes"] for interface in api_stats["Network"].values()),
                    "cumulative_tx": sum(interface["TxBytes"] for interface in api_stats["Network"].values()),
                }
                # Using previous stats to calculate rates
                old_network_stats = self._old_computed_stats.get("network")
                if old_network_stats:
                    stats['network']['time_since_update'] = round(self.time_since_update)
                    stats['network']['rx'] = stats['network']['cumulative_rx'] - old_network_stats["cumulative_rx"]
                    stats['network']['tx'] = stats['network']['cumulative_tx'] - old_network_stats['cumulative_tx']

        except ValueError as e:
            logger.error(f"containers (Podman) Container({self._container.id}): Non-float stats values found ({e})")

        return stats

    @property
    def time_since_update(self) -> float:
        # In case no update (at startup), default to 1
        return max(1, self._streamer.last_update_time - self._last_stats_computed_time)


class PodmanPodStatsFetcher:
    def __init__(self, pod_manager):
        self._pod_manager = pod_manager

        # Threaded Streamer
        # Temporary patch to get podman extension working
        stats_iterable = (pod_manager.stats(decode=True) for _ in iter(int, 1))
        self._streamer = ThreadedIterableStreamer(stats_iterable, initial_stream_value={}, sleep_duration=2)

    def _log_debug(self, msg, exception=None):
        logger.debug(f"containers (Podman): Pod Manager - {msg} ({exception})")
        logger.debug(self._streamer.stats)

    def stop(self):
        self._streamer.stop()

    @property
    def activity_stats(self):
        result_stats = {}
        container_stats = self._streamer.stats
        for stat in container_stats:
            io_stats = self._get_io_stats(stat)
            cpu_stats = self._get_cpu_stats(stat)
            memory_stats = self._get_memory_stats(stat)
            network_stats = self._get_network_stats(stat)

            computed_stats = {
                "name": stat["Name"],
                "cid": stat["CID"],
                "pod_id": stat["Pod"],
                "io": io_stats or {},
                "memory": memory_stats or {},
                "network": network_stats or {},
                "cpu": cpu_stats or {},
            }
            result_stats[stat["CID"]] = computed_stats

        return result_stats

    def _get_cpu_stats(self, stats: Dict) -> Optional[Dict]:
        """Return the container CPU usage.

        Output: a dict {'total': 1.49}
        """
        if "CPU" not in stats:
            self._log_debug("Missing CPU usage fields")
            return None

        cpu_usage = string_value_to_float(stats["CPU"].rstrip("%"))
        return {"total": cpu_usage}

    def _get_memory_stats(self, stats) -> Optional[Dict]:
        """Return the container MEMORY.

        Output: a dict {'usage': ..., 'limit': ...}
        """
        if "MemUsage" not in stats or "/" not in stats["MemUsage"]:
            self._log_debug("Missing MEM usage fields")
            return None

        memory_usage_str = stats["MemUsage"]
        usage_str, limit_str = memory_usage_str.split("/")

        try:
            usage = string_value_to_float(usage_str)
            limit = string_value_to_float(limit_str)
        except ValueError as e:
            self._log_debug("Compute MEM usage failed", e)
            return None

        return {'usage': usage, 'limit': limit, 'inactive_file': 0}

    def _get_network_stats(self, stats) -> Optional[Dict]:
        """Return the container network usage.

        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            rx: Number of bytes received
            tx: Number of bytes transmitted
        """
        if "NetIO" not in stats or "/" not in stats["NetIO"]:
            self._log_debug("Missing NetIO usage fields")
            return None

        net_io_str = stats["NetIO"]
        rx_str, tx_str = net_io_str.split("/")

        try:
            rx = string_value_to_float(rx_str)
            tx = string_value_to_float(tx_str)
        except ValueError as e:
            self._log_debug("Compute NetIO usage failed", e)
            return None

        # Hardcode `time_since_update` to 1 as podman docs don't specify the rate calculation procedure
        return {"rx": rx, "tx": tx, "time_since_update": 1}

    def _get_io_stats(self, stats) -> Optional[Dict]:
        """Return the container IO usage.

        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            ior: Number of bytes read
            iow: Number of bytes written
        """
        if "BlockIO" not in stats or "/" not in stats["BlockIO"]:
            self._log_debug("Missing BlockIO usage fields")
            return None

        block_io_str = stats["BlockIO"]
        ior_str, iow_str = block_io_str.split("/")

        try:
            ior = string_value_to_float(ior_str)
            iow = string_value_to_float(iow_str)
        except ValueError as e:
            self._log_debug("Compute BlockIO usage failed", e)
            return None

        # Hardcode `time_since_update` to 1 as podman docs don't specify the rate calculation procedure
        return {"ior": ior, "iow": iow, "time_since_update": 1}


class PodmanContainersExtension:
    """Glances' Containers Plugin's Podman Extension unit"""

    CONTAINER_ACTIVE_STATUS = ['running', 'paused']

    def __init__(self, podman_sock):
        if import_podman_error_tag:
            raise Exception("Missing libs required to run Podman Extension (Containers)")

        self.client = None
        self.ext_name = "containers (Podman)"
        self.podman_sock = podman_sock
        self.pods_stats_fetcher = None
        self.container_stats_fetchers = {}

        self.connect()

    def connect(self):
        """Connect to Podman."""
        try:
            self.client = PodmanClient(base_url=self.podman_sock)
            # PodmanClient works lazily, so make a ping to determine if socket is open
            self.client.ping()
        except Exception as e:
            logger.debug(f"{self.ext_name} plugin - Can't connect to Podman ({e})")
            self.client = None

    def update_version(self):
        # Long and not useful anymore because the information is no longer displayed in UIs
        # return self.client.version()
        return {}

    def stop(self) -> None:
        # Stop all streaming threads
        for t in itervalues(self.container_stats_fetchers):
            t.stop()

        if self.pods_stats_fetcher:
            self.pods_stats_fetcher.stop()

    def update(self, all_tag) -> Tuple[Dict, List[Dict[str, Any]]]:
        """Update Podman stats using the input method."""

        if not self.client:
            return {}, []

        version_stats = self.update_version()

        # Update current containers list
        try:
            # Issue #1152: Podman module doesn't export details about stopped containers
            # The Containers/all key of the configuration file should be set to True
            containers = self.client.containers.list(all=all_tag)
            if not self.pods_stats_fetcher:
                self.pods_stats_fetcher = PodmanPodStatsFetcher(self.client.pods)
        except Exception as e:
            logger.error(f"{self.ext_name} plugin - Can't get containers list ({e})")
            return version_stats, []

        # Start new thread for new container
        for container in containers:
            if container.id not in self.container_stats_fetchers:
                # StatsFetcher did not exist in the internal dict
                # Create it, add it to the internal dict
                logger.debug(f"{self.ext_name} plugin - Create thread for container {container.id[:12]}")
                self.container_stats_fetchers[container.id] = PodmanContainerStatsFetcher(container)

        # Stop threads for non-existing containers
        absent_containers = set(iterkeys(self.container_stats_fetchers)) - {c.id for c in containers}
        for container_id in absent_containers:
            # Stop the StatsFetcher
            logger.debug(f"{self.ext_name} plugin - Stop thread for old container {container_id[:12]}")
            self.container_stats_fetchers[container_id].stop()
            # Delete the StatsFetcher from the dict
            del self.container_stats_fetchers[container_id]

        # Get stats for all containers
        container_stats = [self.generate_stats(container) for container in containers]

        pod_stats = self.pods_stats_fetcher.activity_stats
        for stats in container_stats:
            if stats["id"][:12] in pod_stats:
                stats["pod_name"] = pod_stats[stats["id"][:12]]["name"]
                stats["pod_id"] = pod_stats[stats["id"][:12]]["pod_id"]

        return version_stats, container_stats

    @property
    def key(self) -> str:
        """Return the key of the list."""
        return 'name'

    def generate_stats(self, container) -> Dict[str, Any]:
        # Init the stats for the current container
        stats = {
            'key': self.key,
            'name': nativestr(container.name),
            'id': container.id,
            'image': ','.join(container.image.tags if container.image.tags else []),
            'status': container.attrs['State'],
            'created': container.attrs['Created'],
            'command': container.attrs.get('Command') or [],
            'io': {},
            'cpu': {},
            'memory': {},
            'network': {},
            'io_rx': None,
            'io_wx': None,
            'cpu_percent': None,
            'memory_percent': None,
            'network_rx': None,
            'network_tx': None,
            'uptime': None,
        }

        if stats['status'] not in self.CONTAINER_ACTIVE_STATUS:
            return stats

        stats_fetcher = self.container_stats_fetchers[container.id]
        activity_stats = stats_fetcher.activity_stats
        stats.update(activity_stats)

        # Additional fields
        stats['cpu_percent'] = stats['cpu'].get('total')
        stats['memory_usage'] = stats['memory'].get('usage')
        if stats['memory'].get('cache') is not None:
            stats['memory_usage'] -= stats['memory']['cache']

        if all(k in stats['io'] for k in ('ior', 'iow', 'time_since_update')):
            stats['io_rx'] = stats['io']['ior'] // stats['io']['time_since_update']
            stats['io_wx'] = stats['io']['iow'] // stats['io']['time_since_update']

        if all(k in stats['network'] for k in ('rx', 'tx', 'time_since_update')):
            stats['network_rx'] = stats['network']['rx'] // stats['network']['time_since_update']
            stats['network_tx'] = stats['network']['tx'] // stats['network']['time_since_update']

        started_at = datetime.fromtimestamp(container.attrs['StartedAt'])
        stats['uptime'] = pretty_date(started_at)

        # Manage special chars in command (see issue#2733)
        stats['command'] = replace_special_chars(' '.join(stats['command']))

        return stats
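Within Glances this extension is driven by the Containers plugin, but a minimal manual exercise of the public surface shown above could look like the sketch below. The socket URL is only an example and must match your local Podman service; the snippet is not part of the file under review.

import time

from glances.plugins.containers.engines.podman import PodmanContainersExtension

# Example socket URL (rootless user service); adjust to your environment.
podman = PodmanContainersExtension(podman_sock="unix:///run/user/1000/podman/podman.sock")

try:
    for _ in range(3):
        version, containers = podman.update(all_tag=True)
        for c in containers:
            print(c["name"], c["status"], c.get("cpu_percent"), c.get("memory_usage"))
        time.sleep(3)
finally:
    podman.stop()  # stop the streaming threads started for each container and pod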