Completed
Push — master ( 8abb6d...59788e )
by Nicolas
01:26
created

Plugin.get_docker_cpu()   D

Complexity

Conditions 9

Size

Total Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 50
rs 4
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2015 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""Docker plugin."""
21
22
import os
23
import re
24
import threading
25
import time
26
27
from glances.compat import iterkeys, itervalues
28
from glances.logger import logger
29
from glances.timer import getTimeSinceLastUpdate
30
from glances.plugins.glances_plugin import GlancesPlugin
31
32
# Docker-py library (optional and Linux-only)
# https://github.com/docker/docker-py
# Assume the lib is available; fall back to False if the import fails.
docker_tag = True
try:
    import docker
    import requests
except ImportError as e:
    logger.debug("Docker library not found (%s). Glances cannot grab Docker info." % e)
    docker_tag = False
44
class Plugin(GlancesPlugin):

    """Glances Docker plugin.

    stats is a list
    """
    def __init__(self, args=None):
        """Init the Docker plugin."""
        super(Plugin, self).__init__(args=args)

        # Keep a reference on args: the plugin can be disabled through
        # args.disable_docker
        self.args = args

        # This plugin has a curses (text UI) representation
        self.display_curse = True

        # Docker API client (lazily created on the first update() call)
        self.docker_client = False

        # One stats-grabber thread per container (stats are grabbed
        # asynchronously, one thread is created by container)
        # key: container Id
        # value: instance of ThreadDockerGrabber
        self.thread_list = {}
    def exit(self):
        """Overwrite the exit method to stop every per-container thread."""
        logger.debug("Stop the Docker plugin")
        for grabber in itervalues(self.thread_list):
            grabber.stop()
    def get_key(self):
        """Return the key used to identify a container in the stats list."""
        return 'name'
    def get_export(self):
        """Overwrite the default export method.

        - Only exports containers
        - The key is the first container name

        Return the containers list, or an empty list if the stats have not
        been grabbed yet.
        """
        try:
            return self.stats['containers']
        except KeyError as e:
            logger.debug("Docker export error {0}".format(e))
            return []
    def connect(self, version=None):
        """Connect to the Docker server.

        First try with the client's default API version; on an API version
        mismatch, parse the server version out of the error message and retry
        (recursively, once) with that version.

        Return a docker.Client instance, or None if the connection failed
        (in that case the plugin is disabled by the caller).
        """
        # Init connection to the Docker API
        try:
            if version is None:
                ret = docker.Client(base_url='unix://var/run/docker.sock')
            else:
                ret = docker.Client(base_url='unix://var/run/docker.sock',
                                    version=version)
        except NameError:
            # docker lib not found
            return None
        try:
            ret.version()
        except requests.exceptions.ConnectionError as e:
            # Connection error (Docker not detected)
            # Let this message in debug mode
            logger.debug("Can't connect to the Docker server (%s)" % e)
            return None
        except docker.errors.APIError as e:
            if version is None:
                # API error (Version mismatch ?)
                logger.debug("Docker API error (%s)" % e)
                # Try the connection with the server version
                # NOTE: raw string — the pattern contains backslash escapes
                # that are invalid escape sequences in a normal string literal
                version = re.search(r'(?:server API version|server)\:\ (.*)\)\".*\)', str(e))
                if version:
                    logger.debug("Try connection with Docker API version %s" % version.group(1))
                    ret = self.connect(version=version.group(1))
                else:
                    logger.debug("Can not retrieve Docker server version")
                    ret = None
            else:
                # API error even with the server-provided version
                logger.error("Docker API error (%s)" % e)
                ret = None
        except Exception as e:
            # Others exceptions...
            # Connection error (Docker not detected)
            logger.error("Can't connect to the Docker server (%s)" % e)
            ret = None

        # Log an info if Docker plugin is disabled
        if ret is None:
            logger.debug("Docker plugin is disable because an error has been detected")

        return ret
    def reset(self):
        """Reset/init the stats to an empty dict."""
        self.stats = {}
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update Docker stats using the input method.

        Return the stats dict. It may be empty if the Docker lib is not
        available, the plugin is disabled or the server can not be reached.
        """
        # Reset stats
        self.reset()

        # Get the current Docker API client
        if not self.docker_client:
            # First time, try to connect to the server
            self.docker_client = self.connect()
            if self.docker_client is None:
                # Connection failed: disable the plugin for the whole session
                global docker_tag
                docker_tag = False

        # The Docker-py lib is mandatory
        if not docker_tag or (self.args is not None and self.args.disable_docker):
            return self.stats

        if self.input_method == 'local':
            # Update stats

            # Docker version
            # Exemple: {
            #     "KernelVersion": "3.16.4-tinycore64",
            #     "Arch": "amd64",
            #     "ApiVersion": "1.15",
            #     "Version": "1.3.0",
            #     "GitCommit": "c78088f",
            #     "Os": "linux",
            #     "GoVersion": "go1.3.3"
            # }
            try:
                self.stats['version'] = self.docker_client.version()
            except Exception as e:
                # Correct issue#649
                logger.error("{0} plugin - Cannot get Docker version ({1})".format(self.plugin_name, e))
                return self.stats

            # Container globals information
            # Example: [{u'Status': u'Up 36 seconds',
            #            u'Created': 1420378904,
            #            u'Image': u'nginx:1',
            #            u'Ports': [{u'Type': u'tcp', u'PrivatePort': 443}, ...],
            #            u'Command': u"nginx -g 'daemon off;'",
            #            u'Names': [u'/webstack_nginx_1'],
            #            u'Id': u'b0da859e84eb...'}]

            # Update current containers list
            try:
                self.stats['containers'] = self.docker_client.containers() or []
            except Exception as e:
                logger.error("{0} plugin - Cannot get containers list ({1})".format(self.plugin_name, e))
                return self.stats

            # Start new thread for new container
            for container in self.stats['containers']:
                if container['Id'] not in self.thread_list:
                    # Thread did not exist in the internal dict
                    # Create it and add it to the internal dict
                    logger.debug("{0} plugin - Create thread for container {1}".format(self.plugin_name, container['Id'][:12]))
                    t = ThreadDockerGrabber(self.docker_client, container['Id'])
                    self.thread_list[container['Id']] = t
                    t.start()

            # Stop threads for non-existing containers
            nonexisting_containers = set(iterkeys(self.thread_list)) - set([c['Id'] for c in self.stats['containers']])
            for container_id in nonexisting_containers:
                # Stop the thread
                logger.debug("{0} plugin - Stop thread for old container {1}".format(self.plugin_name, container_id[:12]))
                self.thread_list[container_id].stop()
                # Delete the item from the dict
                del self.thread_list[container_id]

            # Get stats for all containers
            for container in self.stats['containers']:
                # The key is the container name and not the Id
                container['key'] = self.get_key()

                # Export name (first name in the list, without the /)
                container['name'] = container['Names'][0][1:]

                # Hoist the per-container thread stats lookup: the original
                # repeated it in each of the four calls below
                all_stats = self.thread_list[container['Id']].stats
                container['cpu'] = self.get_docker_cpu(container['Id'], all_stats)
                container['memory'] = self.get_docker_memory(container['Id'], all_stats)
                container['network'] = self.get_docker_network(container['Id'], all_stats)
                container['io'] = self.get_docker_io(container['Id'], all_stats)

        elif self.input_method == 'snmp':
            # Update stats using SNMP
            # Not available
            pass

        return self.stats
    def get_docker_cpu(self, container_id, all_stats):
        """Return the container CPU usage.

        Input: container_id is the full container id
               all_stats is the output of the stats method of the Docker API
        Output: a dict {'total': 1.49} — percentage of host CPU time used by
                the container since the previous call (0.0 on the first call
                or when the counters are unavailable).
        """
        cpu_new = {}
        ret = {'total': 0.0}

        # Read the stats
        # For each container, you will find a pseudo-file cpuacct.stat,
        # containing the CPU usage accumulated by the processes of the container.
        # Those times are expressed in ticks of 1/USER_HZ of a second.
        # On x86 systems, USER_HZ is 100.
        try:
            cpu_new['total'] = all_stats['cpu_stats']['cpu_usage']['total_usage']
            cpu_new['system'] = all_stats['cpu_stats']['system_cpu_usage']
            cpu_new['nb_core'] = len(all_stats['cpu_stats']['cpu_usage']['percpu_usage'] or [])
        except KeyError as e:
            # all_stats do not have CPU information
            logger.debug("Can not grab CPU usage for container {0} ({1})".format(container_id, e))
            logger.debug(all_stats)
        else:
            # Lazily init the per-container history dict.
            # The original wrapped plain dict assignments in
            # except (IOError, UnboundLocalError) blocks that could never
            # fire — removed as dead code.
            if not hasattr(self, 'cpu_old'):
                self.cpu_old = {}

            if container_id in self.cpu_old:
                # Usage = container CPU delta / system CPU delta, scaled by
                # the number of cores (as the Docker CLI does)
                cpu_delta = float(cpu_new['total'] - self.cpu_old[container_id]['total'])
                system_delta = float(cpu_new['system'] - self.cpu_old[container_id]['system'])
                if cpu_delta > 0.0 and system_delta > 0.0:
                    ret['total'] = (cpu_delta / system_delta) * float(cpu_new['nb_core']) * 100

            # Save stats to compute next stats
            self.cpu_old[container_id] = cpu_new

        # Return the stats
        return ret
    def get_docker_memory(self, container_id, all_stats):
        """Return the container MEMORY usage.

        Input: container_id is the full container id
               all_stats is the output of the stats method of the Docker API
        Output: a dict {'usage': ..., 'limit': ..., 'max_usage': ...}
                (empty or partial if the counters are unavailable)
        """
        ret = {}
        # Read the stats
        try:
            # 'rss' and 'cache' do not exist anymore with Docker 1.11
            # (issue #848)
            memory_stats = all_stats['memory_stats']
            ret['usage'] = memory_stats['usage']
            ret['limit'] = memory_stats['limit']
            ret['max_usage'] = memory_stats['max_usage']
        except (KeyError, TypeError) as e:
            # all_stats do not have MEM information
            logger.debug("Can not grab MEM usage for container {0} ({1})".format(container_id, e))
            logger.debug(all_stats)
        # Return the stats
        return ret
    def get_docker_network(self, container_id, all_stats):
        """Return the container network usage using the Docker API (v1.0 or higher).

        Input: container_id is the full container id
        Output: a dict {'time_since_update': 3000, 'rx': 10, 'tx': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            rx: Number of bytes received
            tx: Number of bytes transmitted
        """
        # Init the returned dict
        network_new = {}

        # Read the rx/tx stats (in bytes)
        try:
            netcounters = all_stats["networks"]
        except KeyError as e:
            # all_stats do not have NETWORK information
            logger.debug("Can not grab NET usage for container {0} ({1})".format(container_id, e))
            logger.debug(all_stats)
            # No fallback available...
            return network_new

        # Previous network interface stats are stored in netcounters_old.
        # BUG FIX: the original tested hasattr(self, 'inetcounters_old')
        # (note the leading 'i') while the attribute is named
        # 'netcounters_old', so the history was re-initialized on every call
        # and rx/tx were always 0.
        if not hasattr(self, 'netcounters_old'):
            # First call: store the counters so the first delta computes to 0
            self.netcounters_old = {container_id: netcounters}

        if container_id not in self.netcounters_old:
            self.netcounters_old[container_id] = netcounters
        else:
            # By storing time data we enable Rx/s and Tx/s calculations in the
            # XML/RPC API, which would otherwise be overly difficult work
            # for users of the API
            try:
                network_new['time_since_update'] = getTimeSinceLastUpdate('docker_net_{0}'.format(container_id))
                network_new['rx'] = netcounters["eth0"]["rx_bytes"] - self.netcounters_old[container_id]["eth0"]["rx_bytes"]
                network_new['tx'] = netcounters["eth0"]["tx_bytes"] - self.netcounters_old[container_id]["eth0"]["tx_bytes"]
                network_new['cumulative_rx'] = netcounters["eth0"]["rx_bytes"]
                network_new['cumulative_tx'] = netcounters["eth0"]["tx_bytes"]
            except KeyError as e:
                # BUG FIX: 'as e' was missing, so logging 'e' below raised
                # NameError whenever the KeyError fired
                # all_stats do not have INTERFACE information
                logger.debug("Can not grab network interface usage for container {0} ({1})".format(container_id, e))
                logger.debug(all_stats)

            # Save stats to compute next bitrate
            self.netcounters_old[container_id] = netcounters

        # Return the stats
        return network_new
    def get_docker_io(self, container_id, all_stats):
        """Return the container IO usage using the Docker API (v1.0 or higher).

        Input: container_id is the full container id
        Output: a dict {'time_since_update': 3000, 'ior': 10, 'iow': 65}.
        with:
            time_since_update: number of seconds elapsed between the latest grab
            ior: Number of bytes read
            iow: Number of bytes written
        """
        # Init the returned dict
        io_new = {}

        # Read the ior/iow stats (in bytes)
        try:
            iocounters = all_stats["blkio_stats"]
        except KeyError as e:
            # all_stats do not have io information
            logger.debug("Can not grab block IO usage for container {0} ({1})".format(container_id, e))
            logger.debug(all_stats)
            # No fallback available...
            return io_new

        # Previous io counters are stored in iocounters_old.
        # The original wrapped plain dict assignments in
        # except (IOError, UnboundLocalError) blocks that could never
        # fire — removed as dead code.
        if not hasattr(self, 'iocounters_old'):
            # First call: store the counters so the first delta computes to 0
            self.iocounters_old = {container_id: iocounters}

        if container_id not in self.iocounters_old:
            self.iocounters_old[container_id] = iocounters
        else:
            # By storing time data we enable IoR/s and IoW/s calculations in the
            # XML/RPC API, which would otherwise be overly difficult work
            # for users of the API
            try:
                # Read IOR and IOW value in the structure list of dict
                ior = [i for i in iocounters['io_service_bytes_recursive'] if i['op'] == 'Read'][0]['value']
                iow = [i for i in iocounters['io_service_bytes_recursive'] if i['op'] == 'Write'][0]['value']
                ior_old = [i for i in self.iocounters_old[container_id]['io_service_bytes_recursive'] if i['op'] == 'Read'][0]['value']
                iow_old = [i for i in self.iocounters_old[container_id]['io_service_bytes_recursive'] if i['op'] == 'Write'][0]['value']
            except (IndexError, KeyError) as e:
                # all_stats do not have io information
                logger.debug("Can not grab block IO usage for container {0} ({1})".format(container_id, e))
            else:
                io_new['time_since_update'] = getTimeSinceLastUpdate('docker_io_{0}'.format(container_id))
                io_new['ior'] = ior - ior_old
                io_new['iow'] = iow - iow_old
                io_new['cumulative_ior'] = ior
                io_new['cumulative_iow'] = iow

                # Save stats to compute next bitrate
                self.iocounters_old[container_id] = iocounters

        # Return the stats
        return io_new
    def get_user_ticks(self):
        """Return the number of clock ticks per second (USER_HZ)."""
        return os.sysconf(os.sysconf_names['SC_CLK_TCK'])
    def msg_curse(self, args=None):
        """Return the list of lines to display in the curse interface.

        Return an empty list if there is nothing to display (no stats,
        plugin disabled or no container).
        """
        # Init the return message
        ret = []

        # Only process if stats exist (and non null) and display plugin enable...
        # BUG FIX: guard args with an explicit None test (same pattern as
        # update()); the original raised AttributeError when args was None
        if not self.stats or (args is not None and args.disable_docker) or len(self.stats['containers']) == 0:
            return ret

        # Build the string message
        # Title
        msg = '{0}'.format('CONTAINERS')
        ret.append(self.curse_add_line(msg, "TITLE"))
        msg = ' {0}'.format(len(self.stats['containers']))
        ret.append(self.curse_add_line(msg))
        msg = ' ({0} {1})'.format('served by Docker',
                                  self.stats['version']["Version"])
        ret.append(self.curse_add_line(msg))
        ret.append(self.curse_new_line())
        # Header
        ret.append(self.curse_new_line())
        # Get the maximum containers name (cutted to 20 char max)
        name_max_width = min(20, len(max(self.stats['containers'], key=lambda x: len(x['name']))['name']))
        msg = ' {0:{width}}'.format('Name', width=name_max_width)
        ret.append(self.curse_add_line(msg))
        msg = '{0:>26}'.format('Status')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>6}'.format('CPU%')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('MEM')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('/MAX')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('IOR/s')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('IOW/s')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('Rx/s')
        ret.append(self.curse_add_line(msg))
        msg = '{0:>7}'.format('Tx/s')
        ret.append(self.curse_add_line(msg))
        msg = ' {0:8}'.format('Command')
        ret.append(self.curse_add_line(msg))
        # Data
        for container in self.stats['containers']:
            ret.append(self.curse_new_line())
            # Name (left-truncated with a leading '_' if too long)
            name = container['name']
            if len(name) > name_max_width:
                name = '_' + name[-name_max_width + 1:]
            else:
                name = name[:name_max_width]
            msg = ' {0:{width}}'.format(name, width=name_max_width)
            ret.append(self.curse_add_line(msg))
            # Status (colored according to container_alert())
            status = self.container_alert(container['Status'])
            msg = container['Status'].replace("minute", "min")
            msg = '{0:>26}'.format(msg[0:25])
            ret.append(self.curse_add_line(msg, status))
            # CPU ('?' while the grabber thread has no data yet)
            try:
                msg = '{0:>6.1f}'.format(container['cpu']['total'])
            except KeyError:
                msg = '{0:>6}'.format('?')
            ret.append(self.curse_add_line(msg))
            # MEM usage / limit
            try:
                msg = '{0:>7}'.format(self.auto_unit(container['memory']['usage']))
            except KeyError:
                msg = '{0:>7}'.format('?')
            ret.append(self.curse_add_line(msg))
            try:
                msg = '{0:>7}'.format(self.auto_unit(container['memory']['limit']))
            except KeyError:
                msg = '{0:>7}'.format('?')
            ret.append(self.curse_add_line(msg))
            # IO R/W rates (displayed in bits per second, hence * 8)
            for r in ['ior', 'iow']:
                try:
                    value = self.auto_unit(int(container['io'][r] // container['io']['time_since_update'] * 8)) + "b"
                    msg = '{0:>7}'.format(value)
                except KeyError:
                    msg = '{0:>7}'.format('?')
                ret.append(self.curse_add_line(msg))
            # NET RX/TX rates (displayed in bits per second, hence * 8)
            for r in ['rx', 'tx']:
                try:
                    value = self.auto_unit(int(container['network'][r] // container['network']['time_since_update'] * 8)) + "b"
                    msg = '{0:>7}'.format(value)
                except KeyError:
                    msg = '{0:>7}'.format('?')
                ret.append(self.curse_add_line(msg))
            # Command
            msg = ' {0}'.format(container['Command'])
            ret.append(self.curse_add_line(msg, splittable=True))

        return ret
    def container_alert(self, status):
        """Analyse the container status string and return the alert level."""
        return 'CAREFUL' if "Paused" in status else 'OK'
class ThreadDockerGrabber(threading.Thread):
    """
    Specific thread to grab docker stats.

    stats is a dict
    """

    def __init__(self, docker_client, container_id):
        """Init the class:
        docker_client: instance of Docker-py client
        container_id: Id of the container to monitor"""
        logger.debug("docker plugin - Create thread for container {0}".format(container_id[:12]))
        super(ThreadDockerGrabber, self).__init__()
        # Event needed to stop properly the thread
        self._stopper = threading.Event()
        # The docker-py lib returns stats as a stream (generator)
        self._container_id = container_id
        self._stats_stream = docker_client.stats(container_id, decode=True)
        # The class return the stats as a dict
        self._stats = {}

    def run(self):
        """Function called to grab stats.
        Infinite loop, should be stopped by calling the stop() method"""

        for i in self._stats_stream:
            self._stats = i
            time.sleep(0.1)
            if self.stopped():
                break

    @property
    def stats(self):
        """Stats getter"""
        return self._stats

    @stats.setter
    def stats(self, value):
        """Stats setter"""
        self._stats = value

    def stop(self, timeout=None):
        """Stop the thread"""
        logger.debug("docker plugin - Close thread for container {0}".format(self._container_id[:12]))
        self._stopper.set()

    def stopped(self):
        """Return True if the thread is stopped"""
        # is_set() is the PEP 8 name; isSet() is a deprecated camelCase alias
        return self._stopper.is_set()