Redis.record_zsets()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 21
rs 9.0534
cc 4
1
# -*- coding: utf-8 -*-
2
"""A redis reader plugin with builtin redis client."""
3
import sys
4
import time
5
import socket
6
from collections import deque
7
from collections import defaultdict
8
9
import plumd
10
from plumd.util import Differential
11
12
__author__ = 'Kenny Freeman'
13
__email__ = '[email protected]'
14
__license__ = "ISCL"
15
__docformat__ = 'reStructuredText'
16
17
PY3 = sys.version_info > (3,)
18
19
20
class RedisError(Exception):
21
    """A generic Redis error."""
22
23
    pass
24
25
26
class RedisClient(object):
27
    """A minimal Redis client."""
28
29
    def __init__(self, log, addr, sfamily, timeout):
30
        """A minimal Redis client.
31
32
        :param log: A logger created from loggging.getLogger
33
        :type log: logging.RootLogger
34
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
35
        :type addr: str or tuple(str, int)
36
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
37
        :type sfamily: int
38
        :param timeout: The timeout in seconds for all socket operations
39
        :type timeout: float or int
40
        """
41
        self.log = log
42
        self.net = RedisNet(log, addr, sfamily, timeout)
43
44
    def info(self, section=None):
45
        """Return redis info.
46
47
        :param section: The info section to request from Redis
48
        :type section: str
49
        :raises RedisError: for any socket related exceptions
50
        :raises RedisError: for unexpcted server responses
51
        :rtype: dict
52
        """
53
        ret = {}
54
        section = "all" if section is None else section
55
        self.net.send("INFO {0}\r\n".format(section))
56
        for info_str in RedisResponse(self.net):
57
            for line in info_str.split("\n"):
58
                if not line or line[0] == "#" or line == '\r':
59
                    continue
60
                if line.find(":") >= 0:
61
                    key, val = line.split(":")
62
                ret[key] = val
63
        return ret
64
65
    def config_get_multi(self, globs):
66
        """Return redis config.
67
68
        :param globs: An containing glob search strings
69
        :type globs: iterable
70
        :raises RedisError: for any socket related exceptions
71
        :raises RedisError: for unexpcted server responses
72
        :rtype: dict
73
        """
74
        for glob_str in globs:
75
            self.net.send("CONFIG GET {0}\r\n".format(glob_str))
76
            vals = [val.strip() for val in RedisResponse(self.net)]
77
            yield dict(zip(vals[0::2], vals[1::2]))
78
79
    def scan(self, prefix, count=None):
80
        """Return a deque of key names that match the requested prefix.
81
82
        :param prefix: The key prefix/glob to search for
83
        :type prefix: str
84
        :param count: The number of keys to request on each iteration
85
        :type count: int
86
        :raises RedisError: for any socket related exceptions
87
        :raises RedisError: for unexpcted server responses
88
        :rtype: deque
89
        """
90
        scan_cmd = "scan {0} match {1} count {2}\r\n"
91
        count = 10 if count is None else count
92
        cursor = 0
93
        while True:
94
            # send scan request
95
            self.net.send(scan_cmd.format(cursor, prefix, count))
96
            # response is the next cursor followed by a list of matches
97
            resp = RedisResponse(self.net)
98
            cursor = 0
99
            try:
100
                cursor = int(next(resp))
101
            except ValueError as exc:
102
                err = "Redis: scan: invalid cursor: {0}: {1}"
103
                self.log.warn(err.format(prefix, exc))
104
            for key in resp:
105
                yield key.strip()
106
            if cursor == 0:
107
                break
108
109
    def get_multi(self, keys):
110
        """Return the total value of all matching keys.
111
112
        :param keys: The iterable of keys to return the total for.
113
        :type keys: iterable
114
        :raises RedisError: for any socket related exceptions
115
        :raises RedisError: for unexpcted server responses
116
        :raise ValueError: when a key doesn't cast to int
117
        :rtype: int
118
        """
119
        get_cmd = "get {0}\r\n"
120
        total = 0
121
        for key in keys:
122
            # send scan request
123
            self.net.send(get_cmd.format(key))
124
            # response should just be an int
125
            resp = RedisResponse(self.net)
126
            try:
127
                total += int(next(resp))
128
            except ValueError as exc:
129
                err = "Redis: get_multi: invalid get response: {0}: {1}"
130
                self.log.warn(err.format(key, exc))
131
        return total
132
133
    def llen_multi(self, keys):
134
        """Return the total length of each key provided.
135
136
        :param keys: The iterable of keys to return the total length of.
137
        :type keys: iterable
138
        :raises RedisError: for any socket related exceptions
139
        :raises RedisError: for unexpcted server responses
140
        :rtype: int
141
        """
142
        llen_cmd = "llen {0}\r\n"
143
        total = 0
144
        for key in keys:
145
            # send scan request
146
            self.net.send(llen_cmd.format(key))
147
            # response should just be an int
148
            resp = RedisResponse(self.net)
149
            try:
150
                total += int(next(resp))
151
            except ValueError as exc:
152
                err = "Redis: llen_multi: invalid llen response: {0}: {1}"
153
                self.log.warn(err.format(key, exc))
154
        return total
155
156
    def zcard_multi(self, keys):
157
        """Return the total cardinality of each key provided.
158
159
        :param keys: The iterable of keys to return the total cardinality of.
160
        :type keys: iterable
161
        :raises RedisError: for any socket related exceptions
162
        :raises RedisError: for unexpcted server responses
163
        :rtype: int
164
        """
165
        zcard_cmd = "zcard {0}\r\n"
166
        total = 0
167
        for key in keys:
168
            # send scan request
169
            self.net.send(zcard_cmd.format(key))
170
            # response should just be an int
171
            resp = RedisResponse(self.net)
172
            try:
173
                total += int(next(resp))
174
            except ValueError as exc:
175
                err = "Redis: zcard_multi: invalid llen response: {0}: {1}"
176
                self.log.warn(err.format(key, exc))
177
        return total
178
179
    def scard_multi(self, keys):
180
        """Return the total cardinality of each key provided.
181
182
        :param keys: The iterable of keys to return the total cardinality of.
183
        :type keys: iterable
184
        :raises RedisError: for any socket related exceptions
185
        :raises RedisError: for unexpcted server responses
186
        :rtype: int
187
        """
188
        scard_cmd = "scard {0}\r\n"
189
        total = 0
190
        for key in keys:
191
            # send scan request
192
            self.net.send(scard_cmd.format(key))
193
            # response should just be an int
194
            resp = RedisResponse(self.net)
195
            try:
196
                total += int(next(resp))
197
            except ValueError as exc:
198
                err = "Redis: scard_multi: invalid llen response: {0}: {1}"
199
                self.log.warn(err.format(key, exc))
200
        return total
201
202
    def pfcount_multi(self, keys):
203
        """Return the total cardinality of each key provided.
204
205
        :param keys: The iterable of keys to return the total cardinality of.
206
        :type keys: iterable
207
        :raises RedisError: for any socket related exceptions
208
        :raises RedisError: for unexpcted server responses
209
        :rtype: int
210
        """
211
        pfcount_cmd = "pfcount {0}\r\n"
212
        total = 0
213
        for key in keys:
214
            # send scan request
215
            self.net.send(pfcount_cmd.format(key))
216
            # response should just be an int
217
            resp = RedisResponse(self.net)
218
            try:
219
                total += int(next(resp))
220
            except ValueError as exc:
221
                err = "Redis: pfcount_multi: invalid llen response: {0}: {1}"
222
                self.log.warn(err.format(key, exc))
223
        return total
224
225
226
class RedisResponse(object):
227
    """An iterable of redis command responses."""
228
229
    def __init__(self, reader):
230
        """An iterable of redis command responses.
231
232
        :param reader: A RedisNet reader instance
233
        :type reader: RedisNet
234
235
        :raises RedisError: for any socket related errors
236
        :raises RedisError: for any unknown responses
237
        :raises RedisError: for any Redis Errors returned
238
        :raises RedisError: for any ValueErrors encountered when casting
239
        """
240
        self.reader = reader
241
        # handlers consume responses and add them to self.vals
242
        self.func = defaultdict(lambda: RedisResponse.h_unknown)
243
        self.func["*"] = lambda buff: self.parse(int(buff))
244
        self.func["+"] = lambda buff: self.vals.append(str(buff))
245
        # remove the \r from the string
246
        self.func["$"] = lambda buff: \
247
            self.vals.append(self.reader.readnbytes(int(buff) + 2))
248
        self.func[":"] = lambda buff: self.vals.append(int(buff))
249
        self.func["-"] = RedisResponse.h_error
250
        self.vals = deque()
251
        self.parse()
252
253
    def parse(self, nitems=None):
254
        """Read the full response from self.sock.
255
256
        :raises RedisError: for any socket related Exceptions
257
        :raises RedisError: for any unknown types read
258
        :raises RedisError: for any redis protocol errors
259
        :rtype: varies
260
        """
261
        nitems = 1 if nitems is None else nitems
262
        for i in xrange(nitems):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
Unused Code introduced by
The variable i seems to be unused.
Loading history...
263
            try:
264
                buff = self.reader.readline()
265
                self.func[buff[0]](buff[1:])
266
            except (ValueError, IndexError) as exc:
267
                msg = "could not parse response: {0}: {1}"
268
                raise RedisError(msg.format(buff, exc))
269
270
    @staticmethod
271
    def h_unknown(buff):
272
        """Uknown response handler.
273
274
        :param buff: A response buffer read from Redis
275
        :type buff: str
276
        :raises RedisError: this function always raises a RedisError
277
        """
278
        raise RedisError("unknown command: {0}".format(buff))
279
280
    @staticmethod
281
    def h_error(buff):
282
        """Raise a RedisError with unknown command buff.
283
284
        :param buff: A response buffer read from Redis
285
        :type buff: str
286
        :raises RedisError: on any socket related exceptions
287
        :rtype: str
288
        """
289
        msg = "RedisResponse: h_error({0})"
290
        raise RedisError(msg.format(buff))
291
292
    def __iter__(self):
293
        """A Redis command response iterator.
294
295
        :rtype: iterator
296
        """
297
        return self
298
299
    def __next__(self):
300
        """Return the next response, if any.
301
302
        :rtype: object
303
        """
304
        if not self.vals:
305
            raise StopIteration()
306
        return self.vals.popleft()
307
308
    def next(self):
309
        """Return the next response, if any.
310
311
        :rtype: object
312
        """
313
        if not self.vals:
314
            raise StopIteration()
315
        return self.vals.popleft()
316
317
318
class RedisNet(object):
319
    """A helper class that talks to Redis on a unix/tcp socket."""
320
321
    BUFF_LEN = 8192
322
323
    def __init__(self, log, addr, sfamily, timeout):
324
        """A helper class that talks to Redis on a unix/tcp socket.
325
326
        :param log: A logger created from loggging.getLogger
327
        :type log: logging.RootLogger
328
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
329
        :type addr: str or tuple(str, int)
330
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
331
        :type sfamily: int
332
        :param timeout: The timeout in seconds for all socket operations
333
        :type timeout: float or int
334
        """
335
        self.log = log
336
        # addr can be unix socket or (host, port) tuple
337
        self.addr = addr
338
        # socket.AF_INET or socket.AF_UNIX
339
        self.sfamily = sfamily
340
        # all socket operations timeout
341
        self.timeout = timeout
342
        self.sock = None
343
        # read from our socket into this buffer
344
        # keep an index in the buffer that we've read up to
345
        # and record the total number of bytes in the buffer
346
        self.buff = ""
347
        self.buff_end = -1
348
        self.buff_i = -1
349
350
    def connect(self):
351
        """Connect to Redis.
352
353
        :raises RedisError: for any socket related exceptions
354
        :rtype: Exception or None
355
        """
356
        if self.sock:
357
            self.disconnect()
358
        try:
359
            # create the socket
360
            self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM)
361
            # set timeout for socket operations
362
            self.sock.settimeout(self.timeout)
363
            self.sock.connect(self.addr)
364
            msg = "RedisNet: connected: {0}:{1}"
365
            self.log.info(msg.format(self.addr[0], self.addr[1]))
366
        except Exception as exc:
367
            msg = "RedisNet: Exception during connect: {0}"
368
            self.log.error(msg.format(exc))
369
            raise RedisError(msg.format(exc))
370
        return True
371
372
    def disconnect(self):
373
        """Disconnect from Redis.
374
375
        :raises RedisError: for any socket related exceptions
376
        """
377
        self.log.debug("RedisNet: disconnect")
378
        if self.sock:
379
            try:
380
                self.sock.close()
381
                self.sock = None
382
            except Exception as exc:
383
                msg = "RedisNet: exception during disconnect: {0}"
384
                self.log.error(msg.format(exc))
385
                raise RedisError(msg.format(exc))
386
387
    def read(self):
388
        """Read RedisNet.BUFF_LEN bytes from our socket into self.buff.
389
390
        Calls here overwrite self.buff and reset self.buff_i and
391
        self.buff_end.
392
393
        :raises RedisError: for any socket related exceptions
394
        """
395
        if not self.sock and not self.connect():
396
            msg = "RedisNet: unable to connect to: {0}"
397
            raise RedisError(msg.format(self.addr))
398
399
        try:
400
            self.buff = self.sock.recv(RedisNet.BUFF_LEN)
401
            self.buff_end = len(self.buff)
402
            self.buff_i = 0
403
        except Exception as exc:
404
            msg = "RedisNet: Exception during readline: {0}"
405
            self.log.error(msg.format(exc))
406
            self.disconnect()
407
            raise RedisError(msg.format(exc))
408
409
    def recv(self, nbytes):
410
        """Read nbytes from our socket and return it.
411
412
        :param nbytes: The number of bytes to read
413
        :type nbytes: int
414
        :raises RedisError: for any socket related exceptions
415
        :rytpe: str
416
        """
417
        if not self.sock and not self.connect():
418
            msg = "RedisNet: unable to connect to: {0}"
419
            raise RedisError(msg.format(self.addr))
420
421
        ret = ""
422
        try:
423
            ret = self.sock.recv(nbytes)
424
        except Exception as exc:
425
            msg = "RedisNet: Exception during recv: {0}"
426
            self.log.error(msg.format(exc))
427
            self.disconnect()
428
            raise RedisError(msg.format(exc))
429
        return ret
430
431
    def readline(self):
432
        """Get the next available line.
433
434
        :raises RedisError: for any socket related exceptions
435
        :rytpe: str
436
        """
437
        if not self.sock and not self.connect():
438
            msg = "RedisNet: unable to connect to: {0}"
439
            raise RedisError(msg.format(self.addr))
440
441
        buffs = deque()
442
        while True:
443
            # do we have any data available?
444
            if self.buff_end < 0 or self.buff_i >= self.buff_end:
445
                # read data, reset buffer state
446
                while self.buff_end < 1:
447
                    self.read()
448
            # now we have data, do we have a newline?
449
            i = self.buff[self.buff_i:].find("\n")
450
            if i > -1:
451
                # return line, advance buffer past it
452
                # move i past the newline
453
                # also need to find
454
                buff_i = self.buff_i
455
                buffs.append(self.buff[buff_i:buff_i+i])
456
                # advance beyond i
457
                self.buff_i = buff_i + i + 1
458
                # reset if we have no buffer left
459
                if self.buff_i >= self.buff_end:
460
                    self.buff_i = -1
461
                    self.buff_end = -1
462
                break
463
            # no newline yet, record and keep reading
464
            buffs.append(self.buff[self.buff_i:])
465
            self.buff_end = -1
466
            self.buff_i = -1
467
        ret = "".join(buffs)
468
        return ret
469
470
    def readnbytes(self, nbytes):
471
        """Read nbytes from our socket.
472
473
        :param nbytes: The number of bytes to read
474
        :type nbytes: int
475
        :raises RedisError: for any socket related exceptions
476
        :rytpe: str
477
        """
478
479
        # any bytes in our buffer?
480
        ret = ""
481
        buffs = deque()
482
        if self.buff_end and self.buff_i < self.buff_end:
483
            # do we have enough buffer to fullfill the request?
484
            nbytes_left = self.buff_end - self.buff_i
485
            if nbytes_left >= nbytes:
486
                # yes, advance our pointer
487
                buffi = self.buff_i
488
                buffs.append(self.buff[buffi:buffi+nbytes])
489
                self.buff_i += nbytes
490
                nbytes = 0
491
            else:
492
                # no, consume all of the buffer and then get remaining
493
                buffs.append(self.buff[self.buff_i:])
494
                # reset so next access on buffer forces a read
495
                self.buff_i = -1
496
                self.buff_end = -1
497
                nbytes -= nbytes_left
498
        # do we need more bytes?
499
        if nbytes:
500
            # just do a recv - don't use our buffer
501
            buffs.append(self.recv(nbytes))
502
503
        # join the buffers
504
        ret = "".join(buffs)
505
        return ret
506
507
    def send(self, cmd):
508
        """Send the supplied string to the redis server.
509
510
        :param cmd: The string to send to the redis server
511
        :type cmd: str
512
        :raises RedisError: for any socket related exceptions
513
        """
514
        if not self.sock and not self.connect():
515
            msg = "RedisNet: unable to connect to: {0}"
516
            raise RedisError(msg.format(self.addr))
517
        # send info request
518
        try:
519
            self.sock.sendall(cmd)
520
        except Exception as exc:
521
            msg = "RedisNet: exception sending to server: {0}"
522
            self.log.error(msg.format(exc))
523
            self.disconnect()
524
            raise RedisError(msg.format(exc))
525
526
527
class Redis(plumd.Reader):
528
    """Plugin to record redis metrics."""
529
530
    # default config values
531
    defaults = {
532
        'poll.interval': 10,
533
        'gauges': [
534
            "aof_current_rewrite_time_sec",
535
            "aof_enabled",
536
            "aof_last_rewrite_time_sec",
537
            "aof_rewrite_in_progress",
538
            "aof_rewrite_scheduled",
539
            "blocked_clients",
540
            "client_biggest_input_buf",
541
            "client_longest_output_list",
542
            "connected_clients",
543
            "connected_slaves",
544
            "evicted_keys",
545
            "expired_keys",
546
            "instantaneous_input_kbps",
547
            "instantaneous_ops_per_sec",
548
            "instantaneous_output_kbps",
549
            "keyspace_hits",
550
            "keyspace_misses",
551
            "latest_fork_usec",
552
            "loading",
553
            "master_repl_offset",
554
            "mem_fragmentation_ratio",
555
            "pubsub_channels",
556
            "pubsub_patterns",
557
            "rdb_bgsave_in_progress",
558
            "rdb_changes_since_last_save",
559
            "rdb_current_bgsave_time_sec",
560
            "rdb_last_bgsave_time_sec",
561
            "rdb_last_save_time",
562
            "rejected_connections",
563
            "repl_backlog_active",
564
            "repl_backlog_first_byte_offset",
565
            "repl_backlog_histlen",
566
            "repl_backlog_size",
567
            "sync_full",
568
            "sync_partial_err",
569
            "sync_partial_ok",
570
            "total_commands_processed",
571
            "total_connections_received",
572
            "total_net_input_bytes",
573
            "total_net_output_bytes",
574
            "uptime_in_days",
575
            "uptime_in_seconds",
576
            "used_cpu_sys",
577
            "used_cpu_sys_children",
578
            "used_cpu_user",
579
            "used_cpu_user_children",
580
            "used_memory",
581
            "used_memory_lua",
582
            "used_memory_peak",
583
            "used_memory_rss",
584
            "master_last_io_seconds_ago",
585
            "master_sync_in_progress",
586
            "slave_repl_offset",
587
            "slave_priority",
588
            "slave_read_only",
589
            "connected_slaves",
590
            "master_repl_offset",
591
            "repl_backlog_active",
592
            "repl_backlog_size",
593
            "repl_backlog_first_byte_offset",
594
            "repl_backlog_histlen"
595
            "connected_slaves"
596
            ],
597
        'rates': [],
598
        'configs': [
599
            'maxmemory'
600
        ],
601
        'keys': {
602
            # 'type': { metric_prefix: [key_prefix*, ...] }
603
            'lists': {},
604
            'zsets': {},
605
            'sets': {},
606
            'hlls': {}
607
        },
608
        'addr': '127.0.0.1:6379',
609
        'addr_type': 'inet',
610
        'timeout': 10
611
    }
612
613
    def __init__(self, log, config):
614
        """Plugin to record redis metrics.
615
616
        :param log: A logger
617
        :type log: logging.RootLogger
618
        :param config: a plumd.config.Conf configuration helper instance.
619
        :type config: plumd.config.Conf
620
        """
621
        super(Redis, self).__init__(log, config)
622
        self.config.defaults(Redis.defaults)
623
624
        # metrics to record
625
        self.gauges = self.config.get('gauges')
626
        self.rates = self.config.get('rates')
627
        self.configs = self.config.get('configs')
628
        self.keys = self.config.get('keys')
629
630
        # Redis connection - either unix socket or tcp
631
        addr = self.config.get('addr')
632
        addr_type = self.config.get('addr_type').lower()
633
        if addr_type == "unix":
634
            sfamily = socket.AF_UNIX
635
        elif addr_type == "inet":
636
            try:
637
                host, port = addr.split(":")
638
            except AttributeError:
639
                msg = "Redis: invalid address: {0}, (host:port)"
640
                raise plumd.ConfigError(msg.format(addr))
641
            addr = (host, int(port))
642
            sfamily = socket.AF_INET
643
        else:
644
            msg = "Redis: unsupported connection type: {0} (unix, inet)"
645
            raise plumd.ConfigError(msg.format(addr_type))
646
        timeout = config.get('timeout')
647
        self.client = RedisClient(self.log, addr, sfamily, timeout)
648
        self.calc = Differential()
649
650
    def poll(self):
651
        """Query Redis for metrics.
652
653
        :rtype: ResultSet
654
        """
655
        # catch exceptions - simply skip the poll on error
656
        try:
657
            result = plumd.Result("redis")
658
659
            # config values
660
            self.record_configs(result)
661
662
            # key sizes
663
            self.record_sizes(result)
664
665
            # get server metrics
666
            stats = self.client.info()
667
668
            # record gauges, rates
669
            self.record_metrics(stats, result)
670
671
            # replication, if any slaves are connected
672
            if "slave0" in stats:
673
                self.record_slaves(stats, result)
674
675
            # db metrics, maxmem
676
            self.record_dbs(stats, result)
677
678
            # record lists, zsets, sets and hll sizes
679
            self.record_sizes(result)
680
681
            # and finally command stats - if available
682
            self.record_cmdstats(result)
683
684
        except RedisError as exc:
685
            msg = "Redis: exception during poll: {0}"
686
            self.log.error(msg.format(exc))
687
        return plumd.ResultSet([result])
688
689
    def record_cmdstats(self, result):
690
        """Record the stats from info commandstats.
691
692
        :param result: A result object to add metrics to
693
        :type result: ResultSet
694
        """
695
        name = self.name
696
        infos = self.client.info("commandstats")
697
        for key in sorted(infos.keys()):
698
            vals = infos[key].split(",")
699
            cstat, cname = key.split("_")
700
            for val in vals:
701
                mname, mval = val.split("=")
702
                metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname)
703
                result.add(plumd.Float(metric, mval))
704
705
    def record_metrics(self, stats, result):
706
        """Record the configured gauges and metrics.
707
708
        :param stats: Dictionary returned from info command
709
        :type stats: dict
710
        :param result: A result object to add metrics to
711
        :type result: ResultSet
712
        """
713
        timest = time.time()
714
        name = self.name
715
716
        # record gauges
717
        for stat in self.gauges:
718
            if stat in stats:
719
                mname = "{0}.{1}".format(name, stat)
720
                result.add(plumd.Float(mname, stats[stat]))
721
722
        # record rates
723
        for stat in self.rates:
724
            if stat in stats:
725
                mname = "{0}.{1}".format(name, stat)
726
                mval = self.calc.per_second(mname, float(stats[stat]), timest)
727
                result.add(plumd.Float(mname, mval))
728
729
    def record_dbs(self, stats, result):
730
        """Record per database metrics into result.
731
732
        :param stats: Dictionary returned from info command
733
        :type stats: dict
734
        :param result: A result object to add metrics to
735
        :type result: ResultSet
736
        """
737
        # db0:keys=1,expires=0,avg_ttl=0
738
        name = self.name
739
        db_fmt = "db{0}"
740
        metric_fmt = "{0}.db.{1}.{2}"
741
742
        for i in xrange(0, len(stats.keys())):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
743
            dbname = db_fmt.format(i)
744
            if dbname not in stats:
745
                break
746
            try:
747
                vals = stats[dbname].split(",")
748
                dbmetrics = dict((k, v)
749
                                 for k, v in (v.split('=') for v in vals))
750
                for key, val in dbmetrics.items():
751
                    metric_str = metric_fmt.format(name, i, key)
752
                    result.add(plumd.Int(metric_str, val))
753
            except KeyError:
754
                self.log.error("Redis: invalid db entry: {0}".format(dbname))
755
756
    def record_slaves(self, stats, result):
757
        """Record slave metrics into result.
758
759
        :param stats: A dictionary returned from info command
760
        :type stats: dict
761
        :param result: A ResultSet object to add metrics to
762
        :type result: ResultSet
763
        """
764
        # slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1
765
        name = self.name
766
        slave_str = "slave{0}"
767
        moffstr = 'master_repl_offset'
768
        moffset = 0
769
        try:
770
            moffset = int(stats[moffstr])
771
        except(TypeError, KeyError):
772
            self.log.error("Redis: no {0} value".format(moffstr))
773
774
        # for each slave entry
775
        for i in xrange(0, len(stats.keys())):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
776
            sname = slave_str.format(i)
777
            if sname not in stats:
778
                break
779
            try:
780
                vals = stats[sname].split(",")
781
                smetrics = dict((k, v)
782
                                for k, v in (v.split('=') for v in vals))
783
                sip = smetrics['ip'].replace(".", "_")
784
                smname = "{0}_{1}".format(sip, smetrics['port'])
785
786
                # record offset and lag
787
                mname = "{0}.slave.{1}.offset".format(name, smname)
788
                soffset = moffset - int(smetrics['offset'])
789
                result.add(plumd.Int(mname, soffset))
790
                mname = "{0}.slave.{1}.lag".format(name, sname)
791
                result.add(plumd.Int(mname, smetrics['lag']))
792
793
                # if slave is online set online = 1, otherwise 0
794
                sonline = 1 if smetrics['state'] == "online" else 0
795
                mname = "{0}.slave.{1}.online".format(name, sname)
796
                result.add(plumd.Int(mname, sonline))
797
            except(TypeError, KeyError, ValueError):
798
                self.log.error("Redis: invalid slave entry: {0}".format(sname))
799
800
    def record_configs(self, result):
801
        """Record the configured configuration values.
802
803
        :param result: A ResultSet to record max mem to.
804
        :type result: plumd.ResultSet
805
        """
806
        configs = self.configs
807
        if not configs:
808
            return
809
        name = self.name
810
        for config in self.client.config_get_multi(configs):
811
            for key, val in config.items():
812
                mstr = "{0}.configs.{1}".format(name, key)
813
                result.add(plumd.Float(mstr, val))
814
815
    def record_sizes(self, result):
816
        """Record the total sizes of the configured keys.
817
818
        For each type of key (list, zset, set, hyperloglog) record
819
        total number of items for all keys.
820
821
        :param result: A ResultSet to record into.
822
        :type result: plumd.ResultSet
823
        """
824
        if not self.keys:
825
            return
826
        keys = self.config.get("keys")
827
        if "strings" in keys:
828
            self.record_strings(keys['strings'], result)
829
        if "lists" in keys:
830
            self.record_lists(keys['lists'], result)
831
        if "zsets" in keys:
832
            self.record_zsets(keys['zsets'], result)
833
        if "sets" in keys:
834
            self.record_sets(keys['sets'], result)
835
        if "hlls" in keys:
836
            self.record_hlls(keys['hlls'], result)
837
838
    def record_strings(self, lconfig, result):
839
        """Record the total size of the configured string keys.
840
841
        eg. lconfig: {"metric_name": [ "list", "of", "keys"]}
842
843
        :param lconfig: A dict of metric name => list of key names
844
        :type lconfig: dict
845
        :param result: A ResultSet to record into.
846
        :type result: plumd.ResultSet
847
        """
848
        name = self.name
849
        for mprefix, keys in lconfig.items():
850
            total = 0
851
            # get the total for this prefix
852
            try:
853
                total = self.client.get_multi(keys)
854
            except RedisError as exc:
855
                msg = "ERROR: redis: record_strings: are {0} strings? {1}"
856
                print(msg.format(mprefix, exc))
857
            else:
858
                mstr = "{0}.sizes.strings.{1}".format(name, mprefix)
859
                result.add(plumd.Int(mstr, total))
860
861
    def record_lists(self, lconfig, result):
862
        """Record the total length of the configured lists.
863
864
        eg. lconfig: {"metric_name": [ "list", "of", "keys"]}
865
866
        :param lconfig: A dict of metric name => list of key names
867
        :type lconfig: dict
868
        :param result: A ResultSet to record into.
869
        :type result: plumd.ResultSet
870
        """
871
        name = self.name
872
        for mprefix, keys in lconfig.items():
873
            total = 0
874
            # get the total for this prefix
875
            try:
876
                total = self.client.llen_multi(keys)
877
            except RedisError as exc:
878
                msg = "ERROR: redis: record_lists: are {0} lists? {1}"
879
                print(msg.format(mprefix, exc))
880
            mstr = "{0}.sizes.lists.{1}".format(name, mprefix)
881
            result.add(plumd.Int(mstr, total))
882
883
    def record_zsets(self, zconfig, result):
884
        """Record the total length of the configured zsets.
885
886
        eg. zconfig: {"metric_name": [ "list", "of", "keys"]}
887
888
        :param zconfig: A dict of metric name => list of key names
889
        :type zconfig: dict
890
        :param result: A ResultSet to record into.
891
        :type result: plumd.ResultSet
892
        """
893
        name = self.name
894
        for mprefix, keys in zconfig.items():
895
            # get the total for this prefix
896
            try:
897
                total = self.client.zcard_multi(keys)
898
            except RedisError as exc:
899
                msg = "ERROR: redis: record_zsets: are {0} zsets? {1}"
900
                print(msg.format(mprefix, exc))
901
            else:
902
                mstr = "{0}.sizes.zset.{1}".format(name, mprefix)
903
                result.add(plumd.Int(mstr, total))
904
905
    def record_sets(self, sconfig, result):
906
        """Record the total length of the configured sets.
907
908
        eg. sconfig: {"metric_name": [ "list", "of", "keys"]}
909
910
        :param sconfig: A dict of metric name => list of key names
911
        :type sconfig: dict
912
        :param result: A ResultSet to record into.
913
        :type result: plumd.ResultSet
914
        """
915
        name = self.name
916
        for mprefix, keys in sconfig.items():
917
            # get the total for this prefix
918
            try:
919
                total = self.client.scard_multi(keys)
920
            except RedisError as exc:
921
                msg = "ERROR: redis: record_sets: are {0} sets? {1}"
922
                print(msg.format(mprefix, exc))
923
            else:
924
                mstr = "{0}.sizes.set.{1}".format(name, mprefix)
925
                result.add(plumd.Int(mstr, total))
926
927
    def record_hlls(self, hllconfig, result):
928
        """Record the total length of the configured hlls.
929
930
        eg. sconfig: {"metric_name": [ "list", "of", "keys"]}
931
932
        :param hllconfig: A dict of metric name => list of key names
933
        :type hllconfig: dict
934
        :param result: A ResultSet to record into.
935
        :type result: plumd.ResultSet
936
        """
937
        name = self.name
938
        for mprefix, keys in hllconfig.items():
939
            # get the total for this prefix
940
            try:
941
                total = self.client.pfcount_multi(keys)
942
            except RedisError as exc:
943
                msg = "ERROR: redis: record_hlls: are {0} hyperloglogs? {1}"
944
                print(msg.format(mprefix, exc))
945
            else:
946
                mstr = "{0}.sizes.hll.{1}".format(name, mprefix)
947
                result.add(plumd.Int(mstr, total))
948