Completed
Push — master ( 4834e9...08acac )
by Kenny
03:17
created

Redis.record_dbs()   C

Complexity

Conditions 7

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 26
rs 5.5
cc 7
1
# -*- coding: utf-8 -*-
2
import sys
3
4
PY3 = sys.version_info > (3,)
5
6
import re
0 ignored issues
show
Unused Code introduced by
The import re seems to be unused.
Loading history...
7
import os
0 ignored issues
show
Unused Code introduced by
The import os seems to be unused.
Loading history...
8
import time
9
import socket
10
import os.path
0 ignored issues
show
Unused Code introduced by
The import os.path seems to be unused.
Loading history...
11
from collections import deque
12
from collections import defaultdict
13
14
import plumd
15
from plumd.util import Differential
16
from plumd.util import get_file_list
0 ignored issues
show
Unused Code introduced by
Unused get_file_list imported from plumd.util
Loading history...
17
18
__author__ = 'Kenny Freeman'
19
__email__ = '[email protected]'
20
__license__ = "ISCL"
21
__docformat__ = 'reStructuredText'
22
23
24
class RedisError(Exception):
25
    pass
26
27
28
class RedisClient(object):
29
    """A minimal Redis client that supports only the commands needed
30
    for Redis metrics collection."""
31
32
    def __init__(self, log, addr, sfamily, timeout):
33
        """A minimal Redis client that supports only the commands needed
34
        for Redis metrics collection.
35
36
        :param log: A logger created from loggging.getLogger
37
        :type log: logging.RootLogger
38
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
39
        :type addr: str or tuple(str, int)
40
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
41
        :type sfamily: int
42
        :param timeout: The timeout in seconds for all socket operations
43
        :type timeout: float or int
44
        """
45
        self.log = log
46
        self.net = RedisNet(log, addr, sfamily, timeout)
47
48
    def info(self, section=None):
49
        """Redis info command : http://redis.io/commands/info
50
51
        :param section: The info section to request from Redis
52
        :type section: str
53
        :raises RedisError: for any socket related exceptions
54
        :raises RedisError: for unexpcted server responses
55
        :rtype: dict
56
        """
57
        ret = {}
58
        section = "all" if section is None else section
59
        self.net.send("INFO {0}\r\n".format(section))
60
        for info_str in RedisResponse(self.net):
61
            for line in info_str.split("\n"):
62
                if not line or line[0] == "#" or line == '\r':
63
                    continue
64
                if line.find(":") >= 0:
65
                    key, val = line.split(":")
66
                ret[key] = val
67
        return ret
68
69
    def config_get_multi(self, globs):
70
        """Redis config get command : http://redis.io/commands/config-get
71
72
        :param globs: An containing glob search strings
73
        :type globs: iterable
74
        :raises RedisError: for any socket related exceptions
75
        :raises RedisError: for unexpcted server responses
76
        :rtype: dict
77
        """
78
        for glob_str  in globs:
79
            self.net.send("CONFIG GET {0}\r\n".format(glob_str))
80
            vals = [ val.strip() for val in RedisResponse(self.net) ]
0 ignored issues
show
Coding Style introduced by
No space allowed after bracket
vals = [ val.strip() for val in RedisResponse(self.net) ]
^
Loading history...
Coding Style introduced by
No space allowed before bracket
vals = [ val.strip() for val in RedisResponse(self.net) ]
^
Loading history...
81
            yield(dict(zip(vals[0::2], vals[1::2])))
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after yield.
Loading history...
82
83
    def scan(self, prefix, count=None):
84
        """Returns a deque of key names that match the requested prefix.
85
86
        :param prefix: The key prefix/glob to search for
87
        :type prefix: str
88
        :param count: The number of keys to request on each iteration
89
        :type count: int
90
        :raises RedisError: for any socket related exceptions
91
        :raises RedisError: for unexpcted server responses
92
        :rtype: deque
93
        """
94
        scan_cmd = "scan {0} match {1} count {2}\r\n"
95
        count = 10 if count is None else count
96
        matches = deque()
0 ignored issues
show
Unused Code introduced by
The variable matches seems to be unused.
Loading history...
97
        cursor = "0"
98
        while True:
99
            # send scan request
100
            self.net.send(scan_cmd.format(cursor, prefix, count))
101
            # response is the next cursor followed by a list of matches
102
            resp = RedisResponse(self.net)
103
            cursor = int(next(resp))
104
            for key in resp:
105
                yield(key.strip())
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after yield.
Loading history...
106
            if cursor == 0:
107
                break
108
109
    def llen_multi(self, keys):
110
        """Returns the total length of each key provided.
111
112
        :param keys: The iterable of keys to return the total length of.
113
        :type keys: iterable
114
        :raises RedisError: for any socket related exceptions
115
        :raises RedisError: for unexpcted server responses
116
        :rtype: int
117
        """
118
        llen_cmd = "llen {0}\r\n"
119
        total = 0
120
        for key in keys:
121
            # send scan request
122
            self.net.send(llen_cmd.format(key))
123
            # response should just be an int
124
            resp = RedisResponse(self.net)
125
            total += int(next(resp))
126
        return total
127
128
    def zcard_multi(self, keys):
129
        """Returns the total cardinality of each key provided.
130
131
        :param keys: The iterable of keys to return the total cardinality of.
132
        :type keys: iterable
133
        :raises RedisError: for any socket related exceptions
134
        :raises RedisError: for unexpcted server responses
135
        :rtype: int
136
        """
137
        zcard_cmd = "zcard {0}\r\n"
138
        total = 0
139
        for key in keys:
140
            # send scan request
141
            self.net.send(zcard_cmd.format(key))
142
            # response should just be an int
143
            resp = RedisResponse(self.net)
144
            total += int(next(resp))
145
        return total
146
147
    def scard_multi(self, keys):
148
        """Returns the total cardinality of each key provided.
149
150
        :param keys: The iterable of keys to return the total cardinality of.
151
        :type keys: iterable
152
        :raises RedisError: for any socket related exceptions
153
        :raises RedisError: for unexpcted server responses
154
        :rtype: int
155
        """
156
        scard_cmd = "scard {0}\r\n"
157
        total = 0
158
        for key in keys:
159
            # send scan request
160
            self.net.send(scard_cmd.format(key))
161
            # response should just be an int
162
            resp = RedisResponse(self.net)
163
            total += int(next(resp))
164
        return total
165
166
    def pfcount_multi(self, keys):
167
        """Returns the total cardinality of each key provided.
168
169
        :param keys: The iterable of keys to return the total cardinality of.
170
        :type keys: iterable
171
        :raises RedisError: for any socket related exceptions
172
        :raises RedisError: for unexpcted server responses
173
        :rtype: int
174
        """
175
        pfcount_cmd = "pfcount {0}\r\n"
176
        total = 0
177
        for key in keys:
178
            # send scan request
179
            self.net.send(pfcount_cmd.format(key))
180
            # response should just be an int
181
            resp = RedisResponse(self.net)
182
            total += int(next(resp))
183
        return total
184
185
186
class RedisResponse(object):
187
    """An iterable of redis command responses."""
188
189
    def __init__(self, reader):
190
        """An iterable of redis command responses.
191
192
        :param reader: A RedisNet reader instance
193
        :type reader: RedisNet
194
195
        :raises RedisError: for any socket related errors
196
        :raises RedisError: for any unknown responses
197
        :raises RedisError: for any Redis Errors returned
198
        :raises RedisError: for any ValueErrors encountered when casting
199
        """
200
        self.reader = reader
201
        # handlers consume responses and add them to self.vals
202
        self.func = defaultdict(lambda: self.h_unknown)
203
        #self.func["*"] = self.h_array
204
        self.func["*"] = lambda buff: self.parse(int(buff))
205
        #self.func["+"] = self.h_simple_str
206
        self.func["+"] = lambda buff: self.vals.append(str(buff))
207
        #self.func["$"] = self.h_bulk_str
208
        # remove the \r from the string
209
        self.func["$"] = lambda buff: \
210
         self.vals.append(self.reader.readnbytes(int(buff) + 2))
211
        #self.func[":"] = self.h_int
212
        self.func[":"] = lambda buff: self.vals.append(int(buff))
213
        self.func["-"] = self.h_error
214
        self.vals = deque()
215
        self.parse()
216
217
    def parse(self, nitems=None):
218
        """Reads the full response from self.sock.
219
220
        :raises RedisError: for any socket related Exceptions
221
        :raises RedisError: for any unknown types read
222
        :raises RedisError: for any redis protocol errors
223
        :rtype: varies
224
        """
225
        nitems = 1 if nitems is None else nitems
226
        for i in xrange(nitems):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
Unused Code introduced by
The variable i seems to be unused.
Loading history...
227
            try:
228
                buff = self.reader.readline()
229
                self.func[buff[0]](buff[1:])
230
            except (ValueError, IndexError) as e:
231
                msg = "could not parse response: {0}: {1}"
232
                raise RedisError(msg.format(buff, e))
233
234
    def h_unknown(self, buff):
235
        """Uknown response handler.
236
237
        :param buff: A response buffer read from Redis
238
        :type buff: str
239
        :raises RedisError: this function always raises a RedisError
240
        """
241
        raise RedisError("unknown command: {0}".format(buff))
242
243
    def h_error(self, buff):
244
        """Raises a RedisError and sets contents to buff.
245
246
        :param buff: A response buffer read from Redis
247
        :type buff: str
248
        :raises RedisError: on any socket related exceptions
249
        :rtype: str
250
        """
251
        msg = "RedisResponse: h_error({0})"
0 ignored issues
show
Unused Code introduced by
The variable msg seems to be unused.
Loading history...
252
        raise RedisError("redis error: {0}".format(buff))
253
254
    def __iter__(self):
255
        """A Redis command response iterator.
256
257
        :rtype: iterator
258
        """
259
        return self
260
261
    def __next__(self):
262
        """Return the next response, if any.
263
264
        :rtype: object
265
        """
266
        if not self.vals:
267
            raise StopIteration()
268
        return self.vals.popleft()
269
270
    def next(self):
271
        """Return the next response, if any.
272
273
        :rtype: object
274
        """
275
        if not self.vals:
276
            raise StopIteration()
277
        return self.vals.popleft()
278
279
280
class RedisNet(object):
281
    """A helper class that talks to Redis on a unix/tcp socket."""
282
283
    BUFF_LEN = 8192
284
285
    def __init__(self, log, addr, sfamily, timeout):
286
        """A helper class that talks to Redis on a unix/tcp socket.
287
288
        :param log: A logger created from loggging.getLogger
289
        :type log: logging.RootLogger
290
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
291
        :type addr: str or tuple(str, int)
292
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
293
        :type sfamily: int
294
        :param timeout: The timeout in seconds for all socket operations
295
        :type timeout: float or int
296
        """
297
        self.log = log
298
        # addr can be unix socket or (host, port) tuple
299
        self.addr = addr
300
        # socket.AF_INET or socket.AF_UNIX
301
        self.sfamily = sfamily
302
        # all socket operations timeout
303
        self.timeout = timeout
304
        self.sock = None
305
        # read from our socket into this buffer
306
        # keep an index in the buffer that we've read up to
307
        # and record the total number of bytes in the buffer
308
        self.buff = ""
309
        self.buff_end = -1
310
        self.buff_i = -1
311
312 View Code Duplication
    def connect(self):
313
        """Connect to Redis.
314
315
        :raises RedisError: for any socket related exceptions
316
        :rtype: Exception or None
317
        """
318
        #self.log.debug("RedisNet: connect")
319
        if self.sock:
320
            self.disconnect()
321
        try:
322
            # create the socket
323
            self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM)
324
            # set timeout for socket operations
325
            self.sock.settimeout(self.timeout)
326
            self.sock.connect(self.addr)
327
            msg = "RedisNet: connected: {0}"
328
            self.log.info(msg.format(self.addr))
329
        except Exception as e:
330
            msg = "RedisNet: Exception during connect: {0}"
331
            self.log.error(msg.format(e))
332
            raise RedisError(msg.format(e))
333
        return True
334
335
    def disconnect(self):
336
        """Disconnect from Redis.
337
338
        :raises RedisError: for any socket related exceptions
339
        """
340
        self.log.debug("RedisNet: disconnect")
341
        if self.sock:
342
            try:
343
                self.sock.close()
344
                self.sock = None
345
            except Exception as e:
346
                msg = "RedisNet: exception during disconnect: {0}"
347
                self.log.error(msg.format(e))
348
                raise RedisError(msg.format(e))
349
350
    def read(self):
351
        """Read RedisNet.BUFF_LEN bytes from our socket into self.buff.
352
353
        Calls here overwrite self.buff and reset self.buff_i and
354
        self.buff_end.
355
356
        :raises RedisError: for any socket related exceptions
357
        """
358
        #self.log.debug("RedisNet: read")
359
        if not self.sock and not self.connect():
360
            msg = "RedisNet: unable to connect to: {0}"
361
            raise RedisError(msg.format(self.addr))
362
363
        try:
364
            self.buff = self.sock.recv(RedisNet.BUFF_LEN)
365
            self.buff_end = len(self.buff)
366
            self.buff_i = 0
367
        except Exception as e:
368
            msg = "RedisNet: Exception during readline: {0}"
369
            self.log.error(msg.format(e))
370
            self.disconnect()
371
            raise RedisError(msg.format(e))
372
373
    def recv(self, nbytes):
374
        """Read nbytes from our socket and return it.
375
376
        :param nbytes: The number of bytes to read
377
        :type nbytes: int
378
        :raises RedisError: for any socket related exceptions
379
        :rytpe: str
380
        """
381
        #self.log.debug("RedisNet: recv({0})".format(nbytes))
382
        if not self.sock and not self.connect():
383
            msg = "RedisNet: unable to connect to: {0}"
384
            raise RedisError(msg.format(self.addr))
385
386
        ret = ""
387
        try:
388
            ret = self.sock.recv(nbytes)
389
        except Exception as e:
390
            msg = "RedisNet: Exception during recv: {0}"
391
            self.log.error(msg.format(e))
392
            self.disconnect()
393
            raise RedisError(msg.format(e))
394
        return ret
395
396
    def readline(self):
397
        """Get the next available line.
398
399
        :raises RedisError: for any socket related exceptions
400
        :rytpe: str
401
        """
402
        msg = "RedisNet: readline"
403
        #self.log.debug(msg)
404
        if not self.sock and not self.connect():
405
            msg = "RedisNet: unable to connect to: {0}"
406
            raise RedisError(msg.format(self.addr))
407
408
        buffs = deque()
409
        while True:
410
            # do we have any data available?
411
            if self.buff_end < 0 or self.buff_i >= self.buff_end:
412
                # read data, reset buffer state
413
                while self.buff_end < 1:
414
                    self.read()
415
            # now we have data, do we have a newline?
416
            i = self.buff[self.buff_i:].find("\n")
417
            if i > -1:
418
                # return line, advance buffer past it
419
                # move i past the newline
420
                # also need to find
421
                buff_i = self.buff_i
422
                buffs.append(self.buff[buff_i:buff_i+i])
423
                # advance beyond i
424
                self.buff_i = buff_i + i + 1
425
                # reset if we have no buffer left
426
                if self.buff_i >= self.buff_end:
427
                    self.buff_i = -1
428
                    self.buff_end = -1
429
                break
430
            # no newline yet, record and keep reading
431
            buffs.append(self.buff[self.buff_i:])
432
            self.buff_end = -1
433
            self.buff_i = -1
434
        ret = "".join(buffs)
435
        return ret
436
437
    def readnbytes(self, nbytes):
438
        """Read nbytes from our socket.
439
440
        :param nbytes: The number of bytes to read
441
        :type nbytes: int
442
        :raises RedisError: for any socket related exceptions
443
        :rytpe: str
444
        """
445
        #self.log.debug("RedisNet: readnbytes({0})".format(nbytes))
446
447
        # any bytes in our buffer?
448
        ret = ""
449
        buffs = deque()
450
        if self.buff_end and self.buff_i < self.buff_end:
451
            # do we have enough buffer to fullfill the request?
452
            nbytes_left = self.buff_end - self.buff_i
453
            if nbytes_left >= nbytes:
454
                # yes, advance our pointer
455
                buffi = self.buff_i
456
                buffs.append(self.buff[buffi:buffi+nbytes])
457
                self.buff_i += nbytes
458
                nbytes = 0
459
            else:
460
                # no, consume all of the buffer and then get remaining
461
                buffs.append(self.buff[self.buff_i:])
462
                # reset so next access on buffer forces a read
463
                self.buff_i = -1
464
                self.buff_end = -1
465
                nbytes -= nbytes_left
466
        # do we need more bytes?
467
        if nbytes:
468
            # just do a recv - don't use our buffer
469
            buffs.append(self.recv(nbytes))
470
471
        # join the buffers
472
        ret = "".join(buffs)
473
        return ret
474
475
    def send(self, cmd):
476
        """Send the supplied string to the redis server.
477
478
        :param cmd: The string to send to the redis server
479
        :type cmd: str
480
        :raises RedisError: for any socket related exceptions
481
        """
482
        if not self.sock and not self.connect():
483
            msg = "RedisNet: unable to connect to: {0}"
484
            raise RedisError(msg.format(self.addr))
485
        # send info request
486
        try:
487
            self.sock.sendall(cmd)
488
            #self.log.debug("RedisNet: send: {0}".format(cmd.strip()))
489
        except Exception as e:
490
            msg = "RedisNet: exception sending to server: {0}"
491
            self.log.error(msg.format(e))
492
            self.disconnect()
493
            raise RedisError(msg.format(e))
494
495
496
class Redis(plumd.Reader):
497
    """Plugin to record redis metrics."""
498
499
    # default config values
500
    defaults = {
501
        'poll.interval': 10,
502
        'gauges': [
503
            "aof_current_rewrite_time_sec",
504
            "aof_enabled",
505
            "aof_last_rewrite_time_sec",
506
            "aof_rewrite_in_progress",
507
            "aof_rewrite_scheduled",
508
            "blocked_clients",
509
            "client_biggest_input_buf",
510
            "client_longest_output_list",
511
            "connected_clients",
512
            "connected_slaves",
513
            "evicted_keys",
514
            "expired_keys",
515
            "instantaneous_input_kbps",
516
            "instantaneous_ops_per_sec",
517
            "instantaneous_output_kbps",
518
            "keyspace_hits",
519
            "keyspace_misses",
520
            "latest_fork_usec",
521
            "loading",
522
            "master_repl_offset",
523
            "mem_fragmentation_ratio",
524
            "pubsub_channels",
525
            "pubsub_patterns",
526
            "rdb_bgsave_in_progress",
527
            "rdb_changes_since_last_save",
528
            "rdb_current_bgsave_time_sec",
529
            "rdb_last_bgsave_time_sec",
530
            "rdb_last_save_time",
531
            "rejected_connections",
532
            "repl_backlog_active",
533
            "repl_backlog_first_byte_offset",
534
            "repl_backlog_histlen",
535
            "repl_backlog_size",
536
            "sync_full",
537
            "sync_partial_err",
538
            "sync_partial_ok",
539
            "total_commands_processed",
540
            "total_connections_received",
541
            "total_net_input_bytes",
542
            "total_net_output_bytes",
543
            "uptime_in_days",
544
            "uptime_in_seconds",
545
            "used_cpu_sys",
546
            "used_cpu_sys_children",
547
            "used_cpu_user",
548
            "used_cpu_user_children",
549
            "used_memory",
550
            "used_memory_lua",
551
            "used_memory_peak",
552
            "used_memory_rss",
553
            "master_last_io_seconds_ago",
554
            "master_sync_in_progress",
555
            "slave_repl_offset",
556
            "slave_priority",
557
            "slave_read_only",
558
            "connected_slaves",
559
            "master_repl_offset",
560
            "repl_backlog_active",
561
            "repl_backlog_size",
562
            "repl_backlog_first_byte_offset",
563
            "repl_backlog_histlen"
564
            "connected_slaves"
565
            ],
566
        'rates': [],
567
        'configs': [
568
            'maxmemory'
569
        ],
570
        'keys': {
571
            # 'type': { metric_prefix: [key_prefix*, ...] }
572
            'lists': {},
573
            'zsets': {},
574
            'sets': {},
575
            'hlls': {}
576
        },
577
        'addr': '127.0.0.1:6379',
578
        'addr_type': 'inet',
579
        'timeout': 10
580
    }
581
582
    def __init__(self, log, config):
583
        """Plugin to record redis metrics.
584
585
        :param log: A logger
586
        :type log: logging.RootLogger
587
        :param config: a plumd.config.Conf configuration helper instance.
588
        :type config: plumd.config.Conf
589
        """
590
        super(Redis, self).__init__(log, config)
591
        self.config.defaults(Redis.defaults)
592
593
        # metrics to record
594
        self.gauges = self.config.get('gauges')
595
        self.rates = self.config.get('rates')
596
        self.configs = self.config.get('configs')
597
        self.keys = self.config.get('keys')
598
599
        # Redis connection - either unix socket or tcp
600
        addr = self.config.get('addr')
601
        addr_type = self.config.get('addr_type').lower()
602
        if addr_type == "unix":
603
            sfamily = socket.AF_UNIX
604
        elif addr_type == "inet":
605
            try:
606
                host, port = addr.split(":")
607
            except AttributeError as e:
0 ignored issues
show
Unused Code introduced by
The variable e seems to be unused.
Loading history...
608
                msg = "Redis: invalid address: {0}, (host:port)"
609
                raise plumd.ConfigError(msg.format(addr))
610
            addr = (host, int(port))
611
            sfamily = socket.AF_INET
612
        else:
613
            msg = "Redis: unsupported connection type: {0} (unix, inet)"
614
            raise plumd.ConfigError(msg.format(addr_type))
615
        timeout = config.get('timeout')
616
        self.client = RedisClient(self.log, addr, sfamily, timeout)
617
        self.calc = Differential()
618
619
    def poll(self):
620
        """Query Redis for metrics.
621
622
        :rtype: ResultSet
623
        """
624
        # catch exceptions - simply skip the poll on error
625
        try:
626
            result = plumd.Result("redis")
627
628
            # config values
629
            self.record_configs(result)
630
631
            # key sizes
632
            self.record_sizes(result)
633
634
            # get server metrics
635
            stats = self.client.info()
636
637
            # record gauges, rates
638
            self.record_metrics(stats, result)
639
640
            # replication, if any slaves are connected
641
            if "slave0" in stats:
642
                self.record_slaves(stats, result)
643
644
            # db metrics, maxmem
645
            self.record_dbs(stats, result)
646
647
            # record lists, zsets, sets and hll sizes
648
            self.record_sizes(result)
649
650
            # and finally command stats - if available
651
            self.record_cmdstats(result)
652
653
        except RedisError as e:
654
            msg = "Redis: exception during poll: {0}"
655
            self.log.error(msg.format(e))
656
        return plumd.ResultSet([result])
657
658
    def record_cmdstats(self, result):
659
        """Record the stats from info commandstats.
660
661
        :param result: A result object to add metrics to
662
        :type result: ResultSet
663
        """
664
        ts = time.time()
0 ignored issues
show
Unused Code introduced by
The variable ts seems to be unused.
Loading history...
665
        name = self.name
666
        infos = self.client.info("commandstats")
667
        for key in sorted(infos.keys()):
668
            vals = infos[key].split(",")
669
            cstat, cname = key.split("_")
670
            for val in vals:
671
                mname, mval = val.split("=")
672
                metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname)
673
                result.add(plumd.Float(metric, mval))
674
675
    def record_metrics(self, stats, result):
676
        """Record the configured gauges and metrics
677
678
        :param stats: Dictionary returned from info command
679
        :type stats: dict
680
        :param result: A result object to add metrics to
681
        :type result: ResultSet
682
        """
683
        ts = time.time()
684
        name = self.name
685
686
        # record gauges
687
        for stat in self.gauges:
688
            if stat in stats:
689
                mname = "{0}.{1}".format(name, stat)
690
                result.add(plumd.Float(mname, stats[stat]))
691
692
        # record rates
693
        for stat in self.rates:
694
            if stat in stats:
695
                mname = "{0}.{1}".format(name, stat)
696
                mval = self.calc.per_second(mname, float(stats[stat]), ts)
697
                result.add(plumd.Float(mname, mval))
698
699
    def record_dbs(self, stats, result):
700
        """Record per database metrics into result.
701
702
        :param stats: Dictionary returned from info command
703
        :type stats: dict
704
        :param result: A result object to add metrics to
705
        :type result: ResultSet
706
        """
707
        # db0:keys=1,expires=0,avg_ttl=0
708
        name = self.name
709
        db_fmt = "db{0}"
710
        metric_fmt = "{0}.db.{1}.{2}"
711
712
        for i in xrange(0, len(stats.keys())):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
713
            dbname = db_fmt.format(i)
714
            if dbname not in stats:
715
                break
716
            try:
717
                vals = stats[dbname].split(",")
718
                dbmetrics = dict((k, v)
719
                                 for k, v in (v.split('=') for v in vals))
720
                for k, v in dbmetrics.items():
721
                    metric_str = metric_fmt.format(name, i, k)
722
                    result.add(plumd.Int(metric_str, v))
723
            except KeyError as E:
0 ignored issues
show
Unused Code introduced by
The variable E seems to be unused.
Loading history...
724
                self.log.error("Redis: invalid db entry: {0}".format(dbname))
725
726
    def record_slaves(self, stats, result):
727
        """Record slave metrics into result.
728
729
        :param stats: A dictionary returned from info command
730
        :type stats: dict
731
        :param result: A ResultSet object to add metrics to
732
        :type result: ResultSet
733
        """
734
        # slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1
735
        name = self.name
736
        slave_str = "slave{0}"
737
        moffstr = 'master_repl_offset'
738
        moffset = 0
739
        try:
740
            moffset = int(stats[moffstr])
741
        except(TypeError, KeyError) as e:
0 ignored issues
show
Unused Code introduced by
The variable e seems to be unused.
Loading history...
742
            self.log.error("Redis: no {0} value".format(moffstr))
743
744
        # for each slave entry
745
        for i in xrange(0, len(stats.keys())):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
746
            sname = slave_str.format(i)
747
            if sname not in stats:
748
                break
749
            try:
750
                vals = stats[sname].split(",")
751
                smetrics = dict((k, v)
752
                                for k, v in (v.split('=') for v in vals))
753
                sip = smetrics['ip'].replace(".", "_")
754
                smname = "{0}_{1}".format(sip, smetrics['port'])
755
756
                # record offset and lag
757
                mname = "{0}.slave.{1}.offset".format(name, smname)
758
                soffset = moffset - int(smetrics['offset'])
759
                result.add(plumd.Int(mname, soffset))
760
                mname = "{0}.slave.{1}.lag".format(name, sname)
761
                result.add(plumd.Int(mname, smetrics['lag']))
762
763
                # if slave is online set online = 1, otherwise 0
764
                sonline = 1 if smetrics['state'] == "online" else 0
765
                mname = "{0}.slave.{1}.online".format(name, sname)
766
                result.add(plumd.Int(mname, sonline))
767
            except(TypeError, KeyError, ValueError) as e:
768
                self.log.error("Redis: invalid slave entry: {0}".format(sname))
769
770
    def record_configs(self, result):
771
        """Record the configured configuration values.
772
773
        :param result: A ResultSet to record max mem to.
774
        :type result: plumd.ResultSet
775
        """
776
        configs = self.configs
777
        if not configs:
778
            return
779
        name = self.name
780
        for config in self.client.config_get_multi(configs):
781
            for key, val in config.items():
782
                mstr = "{0}.configs.{1}".format(name, key)
783
                result.add(plumd.Float(mstr, val))
784
785
    def record_sizes(self, result):
786
        """For each type of key (list, zset, set, hyperloglog)
787
        scan for a list of keys matching the prefix and record the
788
        total number of items for the matching prefix.
789
790
        :param result: A ResultSet to record into.
791
        :type result: plumd.ResultSet
792
        """
793
        if not self.keys:
794
            return
795
        keys = self.config.get("keys")
796
        if "lists" in keys:
797
            self.record_lists(keys['lists'], result)
798
        if "zsets" in keys:
799
            self.record_zsets(keys['zsets'], result)
800
        if "sets" in keys:
801
            self.record_sets(keys['sets'], result)
802
        if "hlls" in keys:
803
            self.record_hlls(keys['hlls'], result)
804
805
    def record_lists(self, lconfig, result):
806
        """Record the total length of the configured lists:
807
808
        eg. lconfig: {"metric_name": [ "list*", "of*", "globs*"]}
809
810
        :param lconfig: A dict of metric name => globs
811
        :type lconfig: dict
812
        :param result: A ResultSet to record into.
813
        :type result: plumd.ResultSet
814
        """
815
        name = self.name
816
        for mprefix, kprefixes in lconfig.items():
817
            for prefix in kprefixes:
818
                # get the total for this prefix
819
                total = self.client.llen_multi(self.client.scan(prefix))
820
                mstr = "{0}.sizes.lists.{1}".format(name, mprefix)
821
                result.add(plumd.Int(mstr, total))
822
823
    def record_zsets(self, zconfig, result):
824
        """Record the total length of the configured zsets:
825
826
        eg. zconfig: {"metric_name": [ "list*", "of*", "globs*"]}
827
828
        :param zconfig: A dict of metric name => globs
829
        :type zconfig: dict
830
        :param result: A ResultSet to record into.
831
        :type result: plumd.ResultSet
832
        """
833
        name = self.name
834
        for mprefix, kprefixes in zconfig.items():
835
            for prefix in kprefixes:
836
                # get the total for this prefix
837
                total = self.client.zcard_multi(self.client.scan(prefix))
838
                mstr = "{0}.sizes.zset.{1}".format(name, mprefix)
839
                result.add(plumd.Int(mstr, total))
840
841
    def record_sets(self, sconfig, result):
842
        """Record the total length of the configured zsets:
843
844
        eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]}
845
846
        :param sconfig: A dict of metric name => globs
847
        :type sconfig: dict
848
        :param result: A ResultSet to record into.
849
        :type result: plumd.ResultSet
850
        """
851
        name = self.name
852
        for mprefix, kprefixes in sconfig.items():
853
            for prefix in kprefixes:
854
                # get the total for this prefix
855
                total = self.client.scard_multi(self.client.scan(prefix))
856
                mstr = "{0}.sizes.set.{1}".format(name, mprefix)
857
                result.add(plumd.Int(mstr, total))
858
859
    def record_hlls(self, hllconfig, result):
860
        """Record the total length of the configured hlls:
861
862
        eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]}
863
864
        :param hllconfig: A dict of metric name => globs
865
        :type hllconfig: dict
866
        :param result: A ResultSet to record into.
867
        :type result: plumd.ResultSet
868
        """
869
        name = self.name
870
        for mprefix, kprefixes in hllconfig.items():
871
            for prefix in kprefixes:
872
                # get the total for this prefix
873
                total = self.client.pfcount_multi(self.client.scan(prefix))
874
                mstr = "{0}.sizes.hll.{1}".format(name, mprefix)
875
                result.add(plumd.Int(mstr, total))
876