Completed
Push — master ( 1806d1...053f07 )
by Nicolas
01:42
created

Plugin.get_docker_io()   F

Complexity

Conditions 16

Size

Total Lines 61

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 16
dl 0
loc 61
rs 3.0099

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex methods like Plugin.get_docker_io() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its enclosing class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2015 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""Docker plugin."""
21
22
import os
23
import re
24
import threading
25
import time
26
27
from glances.compat import iterkeys, itervalues
28
from glances.logger import logger
29
from glances.timer import getTimeSinceLastUpdate
30
from glances.plugins.glances_plugin import GlancesPlugin
31
32
# Docker-py library (optional and Linux-only)
33
# https://github.com/docker/docker-py
34
try:
35
    import docker
36
    import requests
37
except ImportError as e:
38
    logger.debug("Docker library not found (%s). Glances cannot grab Docker info." % e)
39
    docker_tag = False
40
else:
41
    docker_tag = True
42
43
44
class Plugin(GlancesPlugin):
45
46
    """Glances Docker plugin.
47
48
    stats is a list
49
    """
50
51
    def __init__(self, args=None):
        """Init the plugin."""
        super(Plugin, self).__init__(args=args)

        # Keep the args so update() can honor args.disable_docker
        self.args = args

        # Docker API client, created lazily on the first update() call
        self.docker_client = False

        # One stats-grabbing thread per container (stats are grabbed
        # asynchronously from the Docker stats stream)
        # key: container Id
        # value: ThreadDockerGrabber instance
        self.thread_list = {}

        # The stats are displayed in the curses interface
        self.display_curse = True
68
69
    def exit(self):
        """Overwrite the exit method to stop all grabber threads."""
        logger.debug("Stop the Docker plugin")
        for grabber in itervalues(self.thread_list):
            grabber.stop()
74
75
    def get_key(self):
        """Return the key of the stats list (containers are keyed by name)."""
        return 'name'
78
79
    def get_export(self):
        """Overwrite the default export method.

        - Only exports containers
        - The key is the first container name
        - Return an empty list when no container stats are available
        """
        try:
            return self.stats['containers']
        except KeyError as e:
            # stats has not been populated yet (or has no containers key)
            logger.debug("Docker export error {0}".format(e))
            return []
91
92
    def connect(self, version=None):
        """Connect to the Docker server.

        If `version` is None, try the default API version first; on an
        APIError (usually a version mismatch) parse the server version
        out of the error message and retry once with it.

        Return a connected docker.Client, or None if the server cannot
        be reached (the plugin is then disabled by the caller).
        """
        # Init connection to the Docker API
        try:
            if version is None:
                ret = docker.Client(base_url='unix://var/run/docker.sock')
            else:
                ret = docker.Client(base_url='unix://var/run/docker.sock',
                                    version=version)
        except NameError:
            # docker lib not found
            return None
        try:
            # Cheap round-trip to check the connection really works
            ret.version()
        except requests.exceptions.ConnectionError as e:
            # Connection error (Docker not detected)
            # Let this message in debug mode
            logger.debug("Can't connect to the Docker server (%s)" % e)
            return None
        except docker.errors.APIError as e:
            if version is None:
                # API error (Version mismatch ?)
                logger.debug("Docker API error (%s)" % e)
                # Try the connection with the server version
                # Raw string: the pattern contains backslash escapes that are
                # invalid escape sequences in a regular string literal
                version = re.search(r'(?:server API version|server)\:\ (.*)\)\".*\)', str(e))
                if version:
                    logger.debug("Try connection with Docker API version %s" % version.group(1))
                    ret = self.connect(version=version.group(1))
                else:
                    logger.debug("Can not retreive Docker server version")
                    ret = None
            else:
                # API error even with the server-reported version: give up
                logger.error("Docker API error (%s)" % e)
                ret = None
        except Exception as e:
            # Others exceptions...
            # Connection error (Docker not detected)
            logger.error("Can't connect to the Docker server (%s)" % e)
            ret = None

        # Log an info if Docker plugin is disabled
        if ret is None:
            logger.debug("Docker plugin is disable because an error has been detected")

        return ret
138
139
    def reset(self):
        """Reset/init the stats to an empty dict."""
        self.stats = {}
142
143
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update Docker stats using the input method.

        Return self.stats, a dict with (at most) two keys:
        - 'version': output of the Docker API version() call
        - 'containers': list of per-container dicts, each enriched with
          'key', 'name', 'cpu', 'memory', 'network' and 'io' entries

        One ThreadDockerGrabber per running container feeds the per-container
        stats asynchronously; threads are created/destroyed here as
        containers appear/disappear.
        """
        # Reset stats
        self.reset()

        # Get the current Docker API client
        if not self.docker_client:
            # First time, try to connect to the server
            self.docker_client = self.connect()
            if self.docker_client is None:
                # Connection failed: clear the module-level flag so the
                # plugin stays disabled on subsequent calls
                global docker_tag
                docker_tag = False

        # The Docker-py lib is mandatory; the plugin can also be disabled
        # from the command line (args.disable_docker)
        if not docker_tag or (self.args is not None and self.args.disable_docker):
            return self.stats

        if self.input_method == 'local':
            # Update stats

            # Docker version
            # Example: {
            #     "KernelVersion": "3.16.4-tinycore64",
            #     "Arch": "amd64",
            #     "ApiVersion": "1.15",
            #     "Version": "1.3.0",
            #     "GitCommit": "c78088f",
            #     "Os": "linux",
            #     "GoVersion": "go1.3.3"
            # }
            try:
                self.stats['version'] = self.docker_client.version()
            except Exception as e:
                # Correct issue#649: the client can fail after connecting
                logger.error("{0} plugin - Cannot get Docker version ({1})".format(self.plugin_name, e))
                return self.stats

            # Container globals information
            # Example: [{u'Status': u'Up 36 seconds',
            #            u'Created': 1420378904,
            #            u'Image': u'nginx:1',
            #            u'Ports': [{u'Type': u'tcp', u'PrivatePort': 443},
            #                       {u'IP': u'0.0.0.0', u'Type': u'tcp', u'PublicPort': 8080, u'PrivatePort': 80}],
            #            u'Command': u"nginx -g 'daemon off;'",
            #            u'Names': [u'/webstack_nginx_1'],
            #            u'Id': u'b0da859e84eb4019cf1d965b15e9323006e510352c402d2f442ea632d61faaa5'}]

            # Update current containers list
            try:
                self.stats['containers'] = self.docker_client.containers() or []
            except Exception as e:
                logger.error("{0} plugin - Cannot get containers list ({1})".format(self.plugin_name, e))
                return self.stats

            # Start a new grabber thread for each new container
            for container in self.stats['containers']:
                if container['Id'] not in self.thread_list:
                    # Thread did not exist in the internal dict
                    # Create it and add it to the internal dict
                    logger.debug("{0} plugin - Create thread for container {1}".format(self.plugin_name, container['Id'][:12]))
                    t = ThreadDockerGrabber(self.docker_client, container['Id'])
                    self.thread_list[container['Id']] = t
                    t.start()

            # Stop threads for non-existing containers
            nonexisting_containers = set(iterkeys(self.thread_list)) - set([c['Id'] for c in self.stats['containers']])
            for container_id in nonexisting_containers:
                # Stop the thread
                logger.debug("{0} plugin - Stop thread for old container {1}".format(self.plugin_name, container_id[:12]))
                self.thread_list[container_id].stop()
                # Delete the item from the dict
                del self.thread_list[container_id]

            # Get stats for all containers
            for container in self.stats['containers']:
                # The key is the container name and not the Id
                container['key'] = self.get_key()

                # Export name (first name in the list, without the leading /)
                container['name'] = container['Names'][0][1:]

                # Derive per-container metrics from the latest thread sample
                container['cpu'] = self.get_docker_cpu(container['Id'], self.thread_list[container['Id']].stats)
                container['memory'] = self.get_docker_memory(container['Id'], self.thread_list[container['Id']].stats)
                container['network'] = self.get_docker_network(container['Id'], self.thread_list[container['Id']].stats)
                container['io'] = self.get_docker_io(container['Id'], self.thread_list[container['Id']].stats)

        elif self.input_method == 'snmp':
            # Update stats using SNMP
            # Not available
            pass

        return self.stats
236
237
    def get_docker_cpu(self, container_id, all_stats):
        """Return the container CPU usage.

        Input: container_id is the full container id
               all_stats is the output of the stats method of the Docker API
        Output: a dict {'total': 1.49} — CPU usage in percent, 0.0 on the
                first call for a container or when stats are unavailable
        """
        cpu_new = {}
        ret = {'total': 0.0}

        # Read the stats
        # The Docker API exposes cumulative CPU usage counters
        # (total_usage / system_cpu_usage, in nanoseconds); the usage
        # percentage is derived from the deltas between two samples.
        try:
            cpu_new['total'] = all_stats['cpu_stats']['cpu_usage']['total_usage']
            cpu_new['system'] = all_stats['cpu_stats']['system_cpu_usage']
            cpu_new['nb_core'] = len(all_stats['cpu_stats']['cpu_usage']['percpu_usage'] or [])
        except KeyError as e:
            # all_stats do not have CPU information
            logger.debug("Can not grab CPU usage for container {0} ({1})".format(container_id, e))
        else:
            # Previous CPU stats are cached in the cpu_old attribute
            # (the dead try/except (IOError, UnboundLocalError) guards around
            # plain dict assignments have been removed: neither can be raised)
            if not hasattr(self, 'cpu_old'):
                # First call: init the cache
                self.cpu_old = {}

            if container_id not in self.cpu_old:
                # First sample for this container: no rate can be computed yet
                self.cpu_old[container_id] = cpu_new
            else:
                cpu_delta = float(cpu_new['total'] - self.cpu_old[container_id]['total'])
                system_delta = float(cpu_new['system'] - self.cpu_old[container_id]['system'])
                if cpu_delta > 0.0 and system_delta > 0.0:
                    ret['total'] = (cpu_delta / system_delta) * float(cpu_new['nb_core']) * 100

                # Save stats to compute next stats
                self.cpu_old[container_id] = cpu_new

        # Return the stats
        return ret
286
287
    def get_docker_memory(self, container_id, all_stats):
        """Return the container MEMORY usage.

        Input: container_id is the full container id
               all_stats is the output of the stats method of the Docker API
        Output: a dict {'rss': 1015808, 'cache': 356352, 'usage': ..., 'max_usage': ...}
                (empty when the memory stats are missing)
        """
        ret = {}
        # Read the stats from the API sample
        try:
            memory_stats = all_stats['memory_stats']
            ret['rss'] = memory_stats['stats']['rss']
            ret['cache'] = memory_stats['stats']['cache']
            ret['usage'] = memory_stats['usage']
            ret['max_usage'] = memory_stats['max_usage']
        except KeyError as e:
            # all_stats do not have MEM information
            logger.debug("Can not grab MEM usage for container {0} ({1})".format(container_id, e))
        # Return the stats
        return ret
306
307
    def get_docker_network(self, container_id, all_stats):
        """Return the container network usage using the Docker API (v1.0 or higher).

        Input: container_id is the full container id
        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            rx: Number of byte received
            tx: Number of byte transmited
        (empty on the first call for a container or when stats are missing)
        """
        # Init the returned dict
        network_new = {}

        # Read the rx/tx stats (in bytes)
        try:
            netcounters = all_stats["network"]
        except KeyError as e:
            # all_stats do not have NETWORK information
            logger.debug("Can not grab NET usage for container {0} ({1})".format(container_id, e))
            # No fallback available...
            return network_new

        # Previous network interface stats are cached in netcounters_old
        # Bug fix: the hasattr() test used to check 'inetcounters_old' (typo),
        # so the cache was rebuilt on every call and the rx/tx deltas were
        # always computed against the current sample.
        if not hasattr(self, 'netcounters_old'):
            # First call, init the cache
            self.netcounters_old = {}

        if container_id not in self.netcounters_old:
            # First sample for this container: no bitrate can be computed yet
            self.netcounters_old[container_id] = netcounters
        else:
            # By storing time data we enable Rx/s and Tx/s calculations in the
            # XML/RPC API, which would otherwise be overly difficult work
            # for users of the API
            network_new['time_since_update'] = getTimeSinceLastUpdate('docker_net_{0}'.format(container_id))
            network_new['rx'] = netcounters["rx_bytes"] - self.netcounters_old[container_id]["rx_bytes"]
            network_new['tx'] = netcounters["tx_bytes"] - self.netcounters_old[container_id]["tx_bytes"]
            network_new['cumulative_rx'] = netcounters["rx_bytes"]
            network_new['cumulative_tx'] = netcounters["tx_bytes"]

            # Save stats to compute next bitrate
            self.netcounters_old[container_id] = netcounters

        # Return the stats
        return network_new
358
359
    def get_docker_io(self, container_id, all_stats):
        """Return the container IO usage using the Docker API (v1.0 or higher).

        Input: container_id is the full container id
        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            ior: Number of byte readed
            iow: Number of byte written
        (empty on the first call for a container or when stats are missing)
        """
        # Init the returned dict
        io_new = {}

        # Read the ior/iow stats (in bytes)
        try:
            iocounters = all_stats["blkio_stats"]
        except KeyError as e:
            # all_stats do not have io information
            logger.debug("Can not grab block IO usage for container {0} ({1})".format(container_id, e))
            # No fallback available...
            return io_new

        # Previous io stats are cached in the iocounters_old attribute
        # (the dead try/except (IOError, UnboundLocalError) guards around
        # plain dict assignments have been removed: neither can be raised)
        if not hasattr(self, 'iocounters_old'):
            # First call, init the cache
            self.iocounters_old = {}

        if container_id not in self.iocounters_old:
            # First sample for this container: no rate can be computed yet
            self.iocounters_old[container_id] = iocounters
        else:
            # By storing time data we enable IoR/s and IoW/s calculations in the
            # XML/RPC API, which would otherwise be overly difficult work
            # for users of the API
            try:
                # Read IOR and IOW value in the structure list of dict
                ior = [i for i in iocounters['io_service_bytes_recursive'] if i['op'] == 'Read'][0]['value']
                iow = [i for i in iocounters['io_service_bytes_recursive'] if i['op'] == 'Write'][0]['value']
                ior_old = [i for i in self.iocounters_old[container_id]['io_service_bytes_recursive'] if i['op'] == 'Read'][0]['value']
                iow_old = [i for i in self.iocounters_old[container_id]['io_service_bytes_recursive'] if i['op'] == 'Write'][0]['value']
            except (IndexError, KeyError) as e:
                # all_stats do not have io information
                logger.debug("Can not grab block IO usage for container {0} ({1})".format(container_id, e))
            else:
                io_new['time_since_update'] = getTimeSinceLastUpdate('docker_io_{0}'.format(container_id))
                io_new['ior'] = ior - ior_old
                io_new['iow'] = iow - iow_old
                io_new['cumulative_ior'] = ior
                io_new['cumulative_iow'] = iow

                # Save stats to compute next bitrate
                self.iocounters_old[container_id] = iocounters

        # Return the stats
        return io_new
420
421
    def get_user_ticks(self):
        """Return the number of clock ticks per second (SC_CLK_TCK via os.sysconf)."""
        clk_tck = os.sysconf_names['SC_CLK_TCK']
        return os.sysconf(clk_tck)
424
425
    def msg_curse(self, args=None):
        """Return the dict to display in the curse interface.

        Builds a header line (container count, Docker version), a column
        header row, then one row per container with status, CPU%, memory,
        IO and network rates and the container command.
        """
        # Init the return message
        ret = []

        # Only process if stats exist (and non null) and display plugin enable...
        # NOTE(review): args.disable_docker assumes args is not None here —
        # update() guards against a None args, this method does not
        if not self.stats or args.disable_docker or len(self.stats['containers']) == 0:
            return ret

        # Build the string message
        # Title
        msg = '{0}'.format('CONTAINERS')
        ret.append(self.curse_add_line(msg, "TITLE"))
        msg = ' {0}'.format(len(self.stats['containers']))
        ret.append(self.curse_add_line(msg))
        msg = ' ({0} {1})'.format('served by Docker',
                                  self.stats['version']["Version"])
        ret.append(self.curse_add_line(msg))
        ret.append(self.curse_new_line())
        # Header
        ret.append(self.curse_new_line())
        # msg = '{0:>14}'.format('Id')
        # ret.append(self.curse_add_line(msg))
        # Get the maximum containers name length (capped to 20 chars)
        name_max_width = min(20, len(max(self.stats['containers'], key=lambda x: len(x['name']))['name']))
        msg = ' {0:{width}}'.format('Name', width=name_max_width)
        ret.append(self.curse_add_line(msg))
        msg = '{0:>26}'.format('Status')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>6}'.format('CPU%')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('MEM')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>6}'.format('IOR/s')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>6}'.format('IOW/s')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>6}'.format('Rx/s')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>6}'.format('Tx/s')
        ret.append(self.curse_add_line(msg))
        msg = ' {0:8}'.format('Command')
        ret.append(self.curse_add_line(msg))
        # Data: one row per container
        for container in self.stats['containers']:
            ret.append(self.curse_new_line())
            # Id
            # msg = '{0:>14}'.format(container['Id'][0:12])
            # ret.append(self.curse_add_line(msg))
            # Name: long names keep their tail, prefixed with '_'
            name = container['name']
            if len(name) > name_max_width:
                name = '_' + name[-name_max_width + 1:]
            else:
                name = name[:name_max_width]
            msg = ' {0:{width}}'.format(name, width=name_max_width)
            ret.append(self.curse_add_line(msg))
            # Status (color depends on the container state)
            status = self.container_alert(container['Status'])
            msg = container['Status'].replace("minute", "min")
            msg = '{0:>26}'.format(msg[0:25])
            ret.append(self.curse_add_line(msg, status))
            # CPU ('?' when the grabber has no CPU sample yet)
            try:
                msg = '{0:>6.1f}'.format(container['cpu']['total'])
            except KeyError:
                msg = '{0:>6}'.format('?')
            ret.append(self.curse_add_line(msg))
            # MEM ('?' when the grabber has no memory sample yet)
            try:
                msg = '{0:>7}'.format(self.auto_unit(container['memory']['usage']))
            except KeyError:
                msg = '{0:>7}'.format('?')
            ret.append(self.curse_add_line(msg))
            # IO R/W rates, in bits per second
            for r in ['ior', 'iow']:
                try:
                    value = self.auto_unit(int(container['io'][r] // container['io']['time_since_update'] * 8)) + "b"
                    msg = '{0:>6}'.format(value)
                except KeyError:
                    msg = '{0:>6}'.format('?')
                ret.append(self.curse_add_line(msg))
            # NET RX/TX rates, in bits per second
            for r in ['rx', 'tx']:
                try:
                    value = self.auto_unit(int(container['network'][r] // container['network']['time_since_update'] * 8)) + "b"
                    msg = '{0:>6}'.format(value)
                except KeyError:
                    msg = '{0:>6}'.format('?')
                ret.append(self.curse_add_line(msg))
            # Command (splittable: may be truncated to fit the screen)
            msg = ' {0}'.format(container['Command'])
            ret.append(self.curse_add_line(msg, splittable=True))

        return ret
520
521
    def container_alert(self, status):
        """Analyse the container status string and return the alert level."""
        return 'CAREFUL' if "Paused" in status else 'OK'
527
528
529
class ThreadDockerGrabber(threading.Thread):
    """
    Specific thread to grab docker stats.

    stats is a dict: the latest sample decoded from the Docker
    stats stream for one container.
    """

    def __init__(self, docker_client, container_id):
        """Init the class:
        docker_client: instance of Docker-py client
        container_id: Id of the container"""
        logger.debug("docker plugin - Create thread for container {0}".format(container_id[:12]))
        super(ThreadDockerGrabber, self).__init__()
        # Event needed to stop properly the thread
        self._stopper = threading.Event()
        # The docker-py lib returns stats as a stream (generator)
        self._container_id = container_id
        self._stats_stream = docker_client.stats(container_id, decode=True)
        # The class returns the stats as a dict
        self._stats = {}

    def run(self):
        """Function called to grab stats.
        Infinite loop, should be stopped by calling the stop() method"""

        for i in self._stats_stream:
            self._stats = i
            # Throttle the grab loop; the stop flag is re-checked each pass
            time.sleep(0.1)
            if self.stopped():
                break

    @property
    def stats(self):
        """Stats getter"""
        return self._stats

    @stats.setter
    def stats(self, value):
        """Stats setter"""
        self._stats = value

    def stop(self, timeout=None):
        """Stop the thread (`timeout` is accepted for API symmetry but unused)"""
        logger.debug("docker plugin - Close thread for container {0}".format(self._container_id[:12]))
        self._stopper.set()

    def stopped(self):
        """Return True if the thread is stopped"""
        # is_set() replaces the deprecated camelCase alias isSet()
        return self._stopper.is_set()
578