Completed
Push — master ( 56a309...01d752 )
by Kenny
01:17
created

RedisClient.get_multi()   A

Complexity

Conditions 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 19
rs 9.4285
cc 2
1
# -*- coding: utf-8 -*-
2
"""A redis reader plugin with builtin redis client."""
3
import sys
4
import time
5
import socket
6
from collections import deque
7
from collections import defaultdict
8
9
import plumd
10
from plumd.util import Differential
11
12
# Module metadata.
__author__ = "Kenny Freeman"
__email__ = "[email protected]"
__license__ = "ISCL"
__docformat__ = "reStructuredText"

# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3
18
19
20
class RedisError(Exception):
    """Generic error raised by the Redis client classes in this module."""
24
25
26
class RedisClient(object):
    """A minimal Redis client.

    Commands are sent as plain text lines terminated by ``\\r\\n`` through a
    :class:`RedisNet` instance and replies are parsed with
    :class:`RedisResponse`.
    """

    def __init__(self, log, addr, sfamily, timeout):
        """Create the client; no connection is made here.

        :param log: A logger created from logging.getLogger
        :type log: logging.RootLogger
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
        :type addr: str or tuple(str, int)
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
        :type sfamily: int
        :param timeout: The timeout in seconds for all socket operations
        :type timeout: float or int
        """
        self.log = log
        self.net = RedisNet(log, addr, sfamily, timeout)

    @staticmethod
    def _parse_info(text):
        """Parse the text of an INFO reply into a dict of ``key: value``.

        Blank lines, ``#`` section headers and lines without a ``:`` are
        skipped. Only the first ``:`` separates key from value, so values
        that themselves contain ``:`` are kept intact. The trailing ``\\r``
        of each line is removed.

        :param text: The raw INFO payload
        :type text: str
        :rtype: dict
        """
        parsed = {}
        for line in text.split("\n"):
            line = line.rstrip("\r")
            if not line or line.startswith("#"):
                continue
            key, sep, val = line.partition(":")
            if sep:
                parsed[key] = val
        return parsed

    def _sum_replies(self, cmd_fmt, keys):
        """Send ``cmd_fmt`` once per key and sum the integer replies.

        :param cmd_fmt: A command template with one ``{0}`` placeholder
        :type cmd_fmt: str
        :param keys: The iterable of key names to query
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :raises ValueError: when a reply doesn't cast to int
        :rtype: int
        """
        total = 0
        for key in keys:
            self.net.send(cmd_fmt.format(key))
            # only the first value of each reply is used
            total += int(next(RedisResponse(self.net)))
        return total

    def info(self, section=None):
        """Return redis info.

        :param section: The info section to request from Redis, defaults
            to "all"
        :type section: str
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: dict
        """
        section = "all" if section is None else section
        self.net.send("INFO {0}\r\n".format(section))
        ret = {}
        for info_str in RedisResponse(self.net):
            ret.update(RedisClient._parse_info(info_str))
        return ret

    def config_get_multi(self, globs):
        """Yield one dict of config name => value per requested glob.

        This is a generator: each CONFIG GET is only sent when the next
        item is requested.

        :param globs: An iterable containing glob search strings
        :type globs: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: generator of dict
        """
        for glob_str in globs:
            self.net.send("CONFIG GET {0}\r\n".format(glob_str))
            vals = [val.strip() for val in RedisResponse(self.net)]
            # the reply alternates name, value, name, value, ...
            yield dict(zip(vals[0::2], vals[1::2]))

    def scan(self, prefix, count=None):
        """Yield the key names that match the requested prefix.

        This is a generator that keeps issuing SCAN requests until the
        server returns cursor 0.

        :param prefix: The key prefix/glob to search for
        :type prefix: str
        :param count: The number of keys to request on each iteration,
            defaults to 10
        :type count: int
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: generator of str
        """
        scan_cmd = "scan {0} match {1} count {2}\r\n"
        count = 10 if count is None else count
        cursor = 0
        while True:
            self.net.send(scan_cmd.format(cursor, prefix, count))
            # response is the next cursor followed by a list of matches
            resp = RedisResponse(self.net)
            cursor = int(next(resp))
            for key in resp:
                yield key.strip()
            if cursor == 0:
                break

    def get_multi(self, keys):
        """Return the total of the integer values stored at keys (GET).

        :param keys: The iterable of keys to return the total for.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :raises ValueError: when a value doesn't cast to int
        :rtype: int
        """
        return self._sum_replies("get {0}\r\n", keys)

    def llen_multi(self, keys):
        """Return the total length of the lists stored at keys (LLEN).

        :param keys: The iterable of keys to return the total length of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_replies("llen {0}\r\n", keys)

    def zcard_multi(self, keys):
        """Return the total cardinality of the keys provided (ZCARD).

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_replies("zcard {0}\r\n", keys)

    def scard_multi(self, keys):
        """Return the total cardinality of the keys provided (SCARD).

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_replies("scard {0}\r\n", keys)

    def pfcount_multi(self, keys):
        """Return the total of the PFCOUNT replies for the keys provided.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_replies("pfcount {0}\r\n", keys)
199
200
201
class RedisResponse(object):
    """An iterable of redis command responses.

    The complete reply is read from the reader when the object is created;
    iterating then pops the parsed values in order. Nested arrays are
    flattened into a single sequence of values.
    """

    def __init__(self, reader):
        """Read and parse one complete reply from reader.

        :param reader: A RedisNet reader instance
        :type reader: RedisNet

        :raises RedisError: for any socket related errors
        :raises RedisError: for any unknown responses
        :raises RedisError: for any Redis Errors returned
        :raises RedisError: for any ValueErrors encountered when casting
        """
        self.reader = reader
        # parsed values, in the order they were read
        self.vals = deque()
        # handlers are keyed on the first character of a reply line; they
        # consume the rest of the line and add the result to self.vals
        self.func = defaultdict(lambda: RedisResponse.h_unknown)
        self.func["*"] = self._h_array
        self.func["+"] = self._h_simple
        self.func["$"] = self._h_bulk
        self.func[":"] = self._h_int
        self.func["-"] = RedisResponse.h_error
        self.parse()

    def _h_array(self, buff):
        """Parse an array reply: buff holds the number of items to read."""
        self.parse(int(buff))

    def _h_simple(self, buff):
        """Record a simple string reply (a trailing \\r is not removed)."""
        self.vals.append(str(buff))

    def _h_bulk(self, buff):
        """Read a bulk string reply: buff holds the payload length.

        The payload is stored together with its trailing \\r\\n; callers
        strip it.

        :raises RedisError: for a nil reply (negative length)
        """
        length = int(buff)
        if length < 0:
            # A nil reply has no payload and no trailing \r\n. Reading
            # length + 2 bytes here would consume part of the next reply.
            raise RedisError("RedisResponse: nil reply")
        self.vals.append(self.reader.readnbytes(length + 2))

    def _h_int(self, buff):
        """Record an integer reply."""
        self.vals.append(int(buff))

    def parse(self, nitems=None):
        """Read nitems replies (default 1) from the reader.

        :param nitems: The number of reply items to read
        :type nitems: int
        :raises RedisError: for any socket related Exceptions
        :raises RedisError: for any unknown types read
        :raises RedisError: for any redis protocol errors
        """
        nitems = 1 if nitems is None else nitems
        # range (not xrange) so this runs on both Python 2 and Python 3
        for _ in range(nitems):
            buff = None
            try:
                buff = self.reader.readline()
                self.func[buff[0]](buff[1:])
            except (ValueError, IndexError) as exc:
                msg = "could not parse response: {0}: {1}"
                raise RedisError(msg.format(buff, exc))

    @staticmethod
    def h_unknown(buff):
        """Unknown response handler.

        :param buff: A response buffer read from Redis
        :type buff: str
        :raises RedisError: this function always raises a RedisError
        """
        raise RedisError("unknown command: {0}".format(buff))

    @staticmethod
    def h_error(buff):
        """Error reply handler.

        :param buff: A response buffer read from Redis
        :type buff: str
        :raises RedisError: this function always raises a RedisError
        """
        msg = "RedisResponse: h_error({0})"
        raise RedisError(msg.format(buff))

    def __iter__(self):
        """A Redis command response iterator.

        :rtype: iterator
        """
        return self

    def __next__(self):
        """Return the next response, if any.

        :raises StopIteration: when all values have been consumed
        :rtype: object
        """
        if not self.vals:
            raise StopIteration()
        return self.vals.popleft()

    # Python 2 iterator protocol
    next = __next__
291
292
293
class RedisNet(object):
    """A helper class that talks to Redis on a unix/tcp socket.

    Line reads go through an internal buffer filled BUFF_LEN bytes at a
    time; the connection is opened lazily on first use.

    NOTE(review): the buffer is handled as ``str`` throughout. On Python 3
    ``socket.recv`` returns ``bytes`` -- confirm which interpreters this
    class is expected to support.
    """

    BUFF_LEN = 8192

    def __init__(self, log, addr, sfamily, timeout):
        """Store the connection settings; does not connect.

        :param log: A logger created from logging.getLogger
        :type log: logging.RootLogger
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
        :type addr: str or tuple(str, int)
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
        :type sfamily: int
        :param timeout: The timeout in seconds for all socket operations
        :type timeout: float or int
        """
        self.log = log
        # addr can be unix socket or (host, port) tuple
        self.addr = addr
        # socket.AF_INET or socket.AF_UNIX
        self.sfamily = sfamily
        # all socket operations timeout
        self.timeout = timeout
        self.sock = None
        # read from our socket into this buffer
        # keep an index in the buffer that we've read up to
        # and record the total number of bytes in the buffer
        self.buff = ""
        self.buff_end = -1
        self.buff_i = -1

    def _reset_buffer(self):
        """Forget any buffered data so the next access forces a read."""
        self.buff = ""
        self.buff_end = -1
        self.buff_i = -1

    def _ensure_connected(self):
        """Connect if there is no socket yet.

        :raises RedisError: when the connection cannot be established
        """
        if not self.sock and not self.connect():
            msg = "RedisNet: unable to connect to: {0}"
            raise RedisError(msg.format(self.addr))

    def connect(self):
        """Connect to Redis, dropping any existing connection first.

        :raises RedisError: for any socket related exceptions
        :rtype: bool
        """
        if self.sock:
            self.disconnect()
        try:
            # create the socket
            self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM)
            # set timeout for socket operations
            self.sock.settimeout(self.timeout)
            self.sock.connect(self.addr)
            # addr is a (host, port) pair for tcp and a plain path for
            # unix sockets - don't index into the path
            if isinstance(self.addr, (tuple, list)):
                where = "{0}:{1}".format(self.addr[0], self.addr[1])
            else:
                where = self.addr
            self.log.info("RedisNet: connected: {0}".format(where))
        except Exception as exc:
            # Drop the half-open socket so the next call reconnects instead
            # of trying to use a socket that never connected.
            sock, self.sock = self.sock, None
            if sock is not None:
                try:
                    sock.close()
                except Exception:
                    # best effort cleanup: the connect error is reported
                    pass
            msg = "RedisNet: Exception during connect: {0}"
            self.log.error(msg.format(exc))
            raise RedisError(msg.format(exc))
        return True

    def disconnect(self):
        """Disconnect from Redis and discard any buffered data.

        :raises RedisError: for any socket related exceptions
        """
        self.log.debug("RedisNet: disconnect")
        # Forget the socket and the buffer first: buffered bytes belong to
        # the connection being closed, and the socket must not be reused
        # even if close() fails.
        sock, self.sock = self.sock, None
        self._reset_buffer()
        if sock:
            try:
                sock.close()
            except Exception as exc:
                msg = "RedisNet: exception during disconnect: {0}"
                self.log.error(msg.format(exc))
                raise RedisError(msg.format(exc))

    def read(self):
        """Read up to RedisNet.BUFF_LEN bytes from our socket into self.buff.

        Calls here overwrite self.buff and reset self.buff_i and
        self.buff_end.

        :raises RedisError: for any socket related exceptions
        :raises RedisError: when the peer has closed the connection
        """
        self._ensure_connected()

        try:
            data = self.sock.recv(RedisNet.BUFF_LEN)
            if not data:
                # An empty read means the peer closed the connection.
                # Treat it as an error, otherwise readline() would call
                # read() forever.
                raise socket.error("connection closed by peer")
        except Exception as exc:
            msg = "RedisNet: Exception during readline: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
        self.buff = data
        self.buff_end = len(data)
        self.buff_i = 0

    def recv(self, nbytes):
        """Read exactly nbytes from our socket and return them.

        The internal buffer is not used. A single socket recv may return
        fewer bytes than requested, so this keeps reading until nbytes
        have arrived.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :raises RedisError: when the peer closes before nbytes arrive
        :rtype: str
        """
        self._ensure_connected()

        chunks = deque()
        remaining = nbytes
        try:
            while remaining > 0:
                chunk = self.sock.recv(remaining)
                if not chunk:
                    raise socket.error("connection closed by peer")
                chunks.append(chunk)
                remaining -= len(chunk)
        except Exception as exc:
            msg = "RedisNet: Exception during recv: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
        return "".join(chunks)

    def readline(self):
        """Get the next available line.

        The returned line excludes the terminating \\n; a preceding \\r,
        if any, is kept.

        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        self._ensure_connected()

        pieces = deque()
        while True:
            # refill when the buffer is empty or fully consumed
            if self.buff_end < 1 or self.buff_i >= self.buff_end:
                self.read()
            start = self.buff_i
            newline = self.buff.find("\n", start)
            if newline < 0:
                # no newline yet, record what we have and keep reading
                pieces.append(self.buff[start:])
                self.buff_i = -1
                self.buff_end = -1
                continue
            pieces.append(self.buff[start:newline])
            # advance past the newline
            self.buff_i = newline + 1
            # reset if we have no buffer left
            if self.buff_i >= self.buff_end:
                self.buff_i = -1
                self.buff_end = -1
            break
        return "".join(pieces)

    def readnbytes(self, nbytes):
        """Read nbytes, taking buffered data first and then the socket.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        pieces = deque()
        # any bytes in our buffer?
        if 0 <= self.buff_i < self.buff_end:
            start = self.buff_i
            available = self.buff_end - start
            if available >= nbytes:
                # the buffer can fulfill the request, advance our pointer
                pieces.append(self.buff[start:start + nbytes])
                self.buff_i += nbytes
                nbytes = 0
            else:
                # consume all of the buffer and then get the remainder
                pieces.append(self.buff[start:])
                # reset so next access on buffer forces a read
                self.buff_i = -1
                self.buff_end = -1
                nbytes -= available
        # do we need more bytes?
        if nbytes:
            # just do a recv - don't use our buffer
            pieces.append(self.recv(nbytes))
        return "".join(pieces)

    def send(self, cmd):
        """Send the supplied string to the redis server.

        :param cmd: The string to send to the redis server
        :type cmd: str
        :raises RedisError: for any socket related exceptions
        """
        self._ensure_connected()
        try:
            self.sock.sendall(cmd)
        except Exception as exc:
            msg = "RedisNet: exception sending to server: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
500
501
502
class Redis(plumd.Reader):
503
    """Plugin to record redis metrics."""
504
505
    # default config values
506
    defaults = {
        # seconds between polls
        'poll.interval': 10,
        # INFO fields recorded as-is; each name is listed exactly once so
        # no metric is emitted twice per poll
        'gauges': [
            "aof_current_rewrite_time_sec",
            "aof_enabled",
            "aof_last_rewrite_time_sec",
            "aof_rewrite_in_progress",
            "aof_rewrite_scheduled",
            "blocked_clients",
            "client_biggest_input_buf",
            "client_longest_output_list",
            "connected_clients",
            "connected_slaves",
            "evicted_keys",
            "expired_keys",
            "instantaneous_input_kbps",
            "instantaneous_ops_per_sec",
            "instantaneous_output_kbps",
            "keyspace_hits",
            "keyspace_misses",
            "latest_fork_usec",
            "loading",
            "master_repl_offset",
            "mem_fragmentation_ratio",
            "pubsub_channels",
            "pubsub_patterns",
            "rdb_bgsave_in_progress",
            "rdb_changes_since_last_save",
            "rdb_current_bgsave_time_sec",
            "rdb_last_bgsave_time_sec",
            "rdb_last_save_time",
            "rejected_connections",
            "repl_backlog_active",
            "repl_backlog_first_byte_offset",
            "repl_backlog_histlen",
            "repl_backlog_size",
            "sync_full",
            "sync_partial_err",
            "sync_partial_ok",
            "total_commands_processed",
            "total_connections_received",
            "total_net_input_bytes",
            "total_net_output_bytes",
            "uptime_in_days",
            "uptime_in_seconds",
            "used_cpu_sys",
            "used_cpu_sys_children",
            "used_cpu_user",
            "used_cpu_user_children",
            "used_memory",
            "used_memory_lua",
            "used_memory_peak",
            "used_memory_rss",
            "master_last_io_seconds_ago",
            "master_sync_in_progress",
            "slave_repl_offset",
            "slave_priority",
            "slave_read_only",
        ],
        # INFO fields recorded as per-second rates
        'rates': [],
        # CONFIG GET globs whose values are recorded
        'configs': [
            'maxmemory'
        ],
        'keys': {
            # 'type': { metric_prefix: [key_prefix*, ...] }
            'lists': {},
            'zsets': {},
            'sets': {},
            'hlls': {}
        },
        # 'host:port' for addr_type inet, a socket path for addr_type unix
        'addr': '127.0.0.1:6379',
        'addr_type': 'inet',
        # seconds, applied to all socket operations
        'timeout': 10
    }
587
588
    def __init__(self, log, config):
        """Plugin to record redis metrics.

        Applies the plugin defaults to the configuration, reads the metric
        selections and builds the Redis client.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: for an invalid 'addr' or 'addr_type'
        """
        super(Redis, self).__init__(log, config)
        self.config.defaults(Redis.defaults)

        # metrics to record
        self.gauges = self.config.get('gauges')
        self.rates = self.config.get('rates')
        self.configs = self.config.get('configs')
        self.keys = self.config.get('keys')

        # Redis connection - either unix socket or tcp
        addr = self.config.get('addr')
        addr_type = self.config.get('addr_type').lower()
        if addr_type == "unix":
            sfamily = socket.AF_UNIX
        elif addr_type == "inet":
            try:
                host, port = addr.split(":")
                addr = (host, int(port))
            except (AttributeError, ValueError):
                # AttributeError: addr is not a string
                # ValueError: not exactly one ':' or a non numeric port
                msg = "Redis: invalid address: {0}, (host:port)"
                raise plumd.ConfigError(msg.format(addr))
            sfamily = socket.AF_INET
        else:
            msg = "Redis: unsupported connection type: {0} (unix, inet)"
            raise plumd.ConfigError(msg.format(addr_type))
        # read through self.config like every other setting here, so the
        # value comes from the object the defaults were applied to
        timeout = self.config.get('timeout')
        self.client = RedisClient(self.log, addr, sfamily, timeout)
        self.calc = Differential()
624
625
    def poll(self):
        """Query Redis for metrics.

        A RedisError raised by any step is logged and ends the poll early;
        whatever was recorded up to that point is still returned.

        :rtype: ResultSet
        """
        # created outside the try block so it is always bound for the
        # return statement below
        result = plumd.Result("redis")

        # catch exceptions - simply skip the rest of the poll on error
        try:
            # config values
            self.record_configs(result)

            # lists, zsets, sets and hll sizes - recorded once per poll
            self.record_sizes(result)

            # get server metrics
            stats = self.client.info()

            # record gauges, rates
            self.record_metrics(stats, result)

            # replication, if any slaves are connected
            if "slave0" in stats:
                self.record_slaves(stats, result)

            # db metrics
            self.record_dbs(stats, result)

            # and finally command stats - if available
            self.record_cmdstats(result)

        except RedisError as exc:
            msg = "Redis: exception during poll: {0}"
            self.log.error(msg.format(exc))
        return plumd.ResultSet([result])
663
664
    def record_cmdstats(self, result):
        """Record the stats from info commandstats.

        Each entry looks like ``cmdstat_get:calls=1,usec=2,usec_per_call=2.00``
        and is recorded as ``<name>.cmdstat.get.<stat>``. Entries and stats
        that don't have that shape are skipped.

        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        name = self.name
        infos = self.client.info("commandstats")
        for key in sorted(infos):
            # split on the first "_" only: command names may themselves
            # contain underscores (eg. cmdstat_georadius_ro)
            cstat, sep, cname = key.partition("_")
            if not sep:
                continue
            for val in infos[key].split(","):
                mname, sep, mval = val.partition("=")
                if not sep:
                    continue
                metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname)
                result.add(plumd.Float(metric, mval))
679
680
    def record_metrics(self, stats, result):
        """Record the configured gauges and rates found in stats.

        Gauges are recorded with the value reported by Redis; rates are
        converted to a per second value first. Configured names that are
        missing from stats are skipped.

        :param stats: Dictionary returned from info command
        :type stats: dict
        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        now = time.time()
        prefix = self.name

        # gauges: value as reported
        for gauge in self.gauges:
            if gauge not in stats:
                continue
            metric = "{0}.{1}".format(prefix, gauge)
            result.add(plumd.Float(metric, stats[gauge]))

        # rates: change per second since the previous poll
        for rate in self.rates:
            if rate not in stats:
                continue
            metric = "{0}.{1}".format(prefix, rate)
            per_sec = self.calc.per_second(metric, float(stats[rate]), now)
            result.add(plumd.Float(metric, per_sec))
703
704
    def record_dbs(self, stats, result):
        """Record per database metrics into result.

        Every ``dbN`` entry in stats is recorded, in numeric order. The
        entries are looked up by name rather than counted up from db0, so a
        gap in the numbering does not hide the databases after it.

        :param stats: Dictionary returned from info command
        :type stats: dict
        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        # db0:keys=1,expires=0,avg_ttl=0
        name = self.name
        metric_fmt = "{0}.db.{1}.{2}"

        dbs = sorted((int(key[2:]), key) for key in stats
                     if key.startswith("db") and key[2:].isdigit())
        for i, dbname in dbs:
            try:
                vals = stats[dbname].split(",")
                # build the whole dict first so a malformed entry records
                # nothing rather than a partial set of metrics
                dbmetrics = dict(val.split('=') for val in vals)
                for key, val in dbmetrics.items():
                    metric_str = metric_fmt.format(name, i, key)
                    result.add(plumd.Int(metric_str, val))
            except (TypeError, KeyError, ValueError):
                self.log.error("Redis: invalid db entry: {0}".format(dbname))
730
731
    def record_slaves(self, stats, result):
        """Record slave metrics into result.

        For each consecutive ``slaveN`` entry starting at slave0 this
        records the replication offset behind the master, the lag and
        whether the slave is online.

        :param stats: A dictionary returned from info command
        :type stats: dict
        :param result: A ResultSet object to add metrics to
        :type result: ResultSet
        """
        # slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1
        name = self.name
        slave_str = "slave{0}"
        moffstr = 'master_repl_offset'
        moffset = 0
        try:
            moffset = int(stats[moffstr])
        except (TypeError, KeyError, ValueError):
            self.log.error("Redis: no {0} value".format(moffstr))

        # for each slave entry; range (not xrange) so this runs on both
        # Python 2 and Python 3
        for i in range(len(stats)):
            sname = slave_str.format(i)
            if sname not in stats:
                break
            try:
                vals = stats[sname].split(",")
                smetrics = dict(val.split('=') for val in vals)
                sip = smetrics['ip'].replace(".", "_")
                smname = "{0}_{1}".format(sip, smetrics['port'])

                # record offset and lag
                # NOTE(review): offset is named after ip_port (smname) while
                # lag and online are named after slaveN (sname) - confirm
                # whether that difference is intended
                mname = "{0}.slave.{1}.offset".format(name, smname)
                soffset = moffset - int(smetrics['offset'])
                result.add(plumd.Int(mname, soffset))
                mname = "{0}.slave.{1}.lag".format(name, sname)
                result.add(plumd.Int(mname, smetrics['lag']))

                # if slave is online set online = 1, otherwise 0
                sonline = 1 if smetrics['state'] == "online" else 0
                mname = "{0}.slave.{1}.online".format(name, sname)
                result.add(plumd.Int(mname, sonline))
            except (TypeError, KeyError, ValueError):
                self.log.error("Redis: invalid slave entry: {0}".format(sname))
774
775
    def record_configs(self, result):
        """Record the values of the configured Redis config settings.

        Nothing is requested from Redis when no configs are configured.

        :param result: A ResultSet to record the config values to.
        :type result: plumd.ResultSet
        """
        wanted = self.configs
        if not wanted:
            return
        prefix = self.name
        for settings in self.client.config_get_multi(wanted):
            for setting, value in settings.items():
                metric = "{0}.configs.{1}".format(prefix, setting)
                result.add(plumd.Float(metric, value))
789
790
    def record_sizes(self, result):
        """Record the total sizes of the configured keys.

        For each key type present in the 'keys' configuration (strings,
        lists, zsets, sets, hlls - in that order) the matching record_<type>
        method is called with that type's configuration.

        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        if not self.keys:
            return
        keys = self.config.get("keys")
        for kind in ("strings", "lists", "zsets", "sets", "hlls"):
            if kind in keys:
                # looked up only when needed: record_strings, record_lists, ...
                recorder = getattr(self, "record_" + kind)
                recorder(keys[kind], result)
812
813
    def record_strings(self, lconfig, result):
        """Record the total value of the configured string keys.

        eg. lconfig: {"metric_name": [ "list", "of", "keys"]}

        A metric whose keys cannot be read or summed is logged and skipped;
        the remaining metrics are still recorded.

        :param lconfig: A dict of metric name => list of key names
        :type lconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, keys in lconfig.items():
            # get the total for this prefix
            try:
                total = self.client.get_multi(keys)
            except (RedisError, ValueError) as exc:
                # ValueError: a value that doesn't cast to int
                msg = "ERROR: redis: record_strings: are {0} strings? {1}"
                self.log.error(msg.format(mprefix, exc))
                continue
            mstr = "{0}.sizes.strings.{1}".format(name, mprefix)
            result.add(plumd.Int(mstr, total))
835
836
    def record_lists(self, lconfig, result):
        """Record the total length of the configured lists.

        eg. lconfig: {"metric_name": [ "list", "of", "keys"]}

        For every entry the value returned by ``self.client.llen_multi``
        is added to result as an Int metric named
        ``<name>.sizes.lists.<metric_name>``. If the client raises a
        RedisError the error is logged and no metric is recorded for
        that entry.

        :param lconfig: A dict of metric name => list of key names
        :type lconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, keys in lconfig.items():
            # get the total for this prefix
            try:
                total = self.client.llen_multi(keys)
            except RedisError as exc:
                # report through the plugin logger like the rest of the class
                msg = "ERROR: redis: record_lists: are {0} lists? {1}"
                self.log.error(msg.format(mprefix, exc))
            else:
                # only record on success, so a failed lookup is not
                # reported as a zero-length list (matches the other
                # record_* methods)
                mstr = "{0}.sizes.lists.{1}".format(name, mprefix)
                result.add(plumd.Int(mstr, total))
857
858
    def record_zsets(self, zconfig, result):
        """Record the total length of the configured zsets.

        eg. zconfig: {"metric_name": [ "list", "of", "keys"]}

        For every entry the value returned by ``self.client.zcard_multi``
        is added to result as an Int metric named
        ``<name>.sizes.zset.<metric_name>``. If the client raises a
        RedisError the error is logged and no metric is recorded for
        that entry.

        :param zconfig: A dict of metric name => list of key names
        :type zconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, keys in zconfig.items():
            # get the total for this prefix
            try:
                total = self.client.zcard_multi(keys)
            except RedisError as exc:
                # report through the plugin logger like the rest of the class
                msg = "ERROR: redis: record_zsets: are {0} zsets? {1}"
                self.log.error(msg.format(mprefix, exc))
            else:
                mstr = "{0}.sizes.zset.{1}".format(name, mprefix)
                result.add(plumd.Int(mstr, total))
879
880
    def record_sets(self, sconfig, result):
        """Record the total length of the configured sets.

        eg. sconfig: {"metric_name": [ "list", "of", "keys"]}

        For every entry the value returned by ``self.client.scard_multi``
        is added to result as an Int metric named
        ``<name>.sizes.set.<metric_name>``. If the client raises a
        RedisError the error is logged and no metric is recorded for
        that entry.

        :param sconfig: A dict of metric name => list of key names
        :type sconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, keys in sconfig.items():
            # get the total for this prefix
            try:
                total = self.client.scard_multi(keys)
            except RedisError as exc:
                # report through the plugin logger like the rest of the class
                msg = "ERROR: redis: record_sets: are {0} sets? {1}"
                self.log.error(msg.format(mprefix, exc))
            else:
                mstr = "{0}.sizes.set.{1}".format(name, mprefix)
                result.add(plumd.Int(mstr, total))
901
902
    def record_hlls(self, hllconfig, result):
        """Record the total length of the configured hlls.

        eg. hllconfig: {"metric_name": [ "list", "of", "keys"]}

        For every entry the value returned by
        ``self.client.pfcount_multi`` is added to result as an Int metric
        named ``<name>.sizes.hll.<metric_name>``. If the client raises a
        RedisError the error is logged and no metric is recorded for
        that entry.

        :param hllconfig: A dict of metric name => list of key names
        :type hllconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, keys in hllconfig.items():
            # get the total for this prefix
            try:
                total = self.client.pfcount_multi(keys)
            except RedisError as exc:
                # report through the plugin logger like the rest of the class
                msg = "ERROR: redis: record_hlls: are {0} hyperloglogs? {1}"
                self.log.error(msg.format(mprefix, exc))
            else:
                mstr = "{0}.sizes.hll.{1}".format(name, mprefix)
                result.add(plumd.Int(mstr, total))
923