Completed
Push — master ( 08acac...d85769 )
by Kenny
01:15
created

RedisNet.recv()   A

Complexity

Conditions 4

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 21
rs 9.0534
cc 4
1
# -*- coding: utf-8 -*-
2
"""A redis reader plugin with builtin redis client."""
3
import sys
4
import time
5
import socket
6
from collections import deque
7
from collections import defaultdict
8
9
import plumd
10
from plumd.util import Differential
11
12
__author__ = 'Kenny Freeman'
13
__email__ = '[email protected]'
14
__license__ = "ISCL"
15
__docformat__ = 'reStructuredText'
16
17
PY3 = sys.version_info > (3,)
18
19
20
class RedisError(Exception):
21
    """A generic Redis error."""
22
23
    pass
24
25
26
class RedisClient(object):
27
    """A minimal Redis client."""
28
29
    def __init__(self, log, addr, sfamily, timeout):
30
        """A minimal Redis client.
31
32
        :param log: A logger created from loggging.getLogger
33
        :type log: logging.RootLogger
34
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
35
        :type addr: str or tuple(str, int)
36
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
37
        :type sfamily: int
38
        :param timeout: The timeout in seconds for all socket operations
39
        :type timeout: float or int
40
        """
41
        self.log = log
42
        self.net = RedisNet(log, addr, sfamily, timeout)
43
44
    def info(self, section=None):
45
        """Return redis info.
46
47
        :param section: The info section to request from Redis
48
        :type section: str
49
        :raises RedisError: for any socket related exceptions
50
        :raises RedisError: for unexpcted server responses
51
        :rtype: dict
52
        """
53
        ret = {}
54
        section = "all" if section is None else section
55
        self.net.send("INFO {0}\r\n".format(section))
56
        for info_str in RedisResponse(self.net):
57
            for line in info_str.split("\n"):
58
                if not line or line[0] == "#" or line == '\r':
59
                    continue
60
                if line.find(":") >= 0:
61
                    key, val = line.split(":")
62
                ret[key] = val
63
        return ret
64
65
    def config_get_multi(self, globs):
66
        """Return redis config.
67
68
        :param globs: An containing glob search strings
69
        :type globs: iterable
70
        :raises RedisError: for any socket related exceptions
71
        :raises RedisError: for unexpcted server responses
72
        :rtype: dict
73
        """
74
        for glob_str in globs:
75
            self.net.send("CONFIG GET {0}\r\n".format(glob_str))
76
            vals = [val.strip() for val in RedisResponse(self.net)]
77
            yield dict(zip(vals[0::2], vals[1::2]))
78
79
    def scan(self, prefix, count=None):
80
        """Return a deque of key names that match the requested prefix.
81
82
        :param prefix: The key prefix/glob to search for
83
        :type prefix: str
84
        :param count: The number of keys to request on each iteration
85
        :type count: int
86
        :raises RedisError: for any socket related exceptions
87
        :raises RedisError: for unexpcted server responses
88
        :rtype: deque
89
        """
90
        scan_cmd = "scan {0} match {1} count {2}\r\n"
91
        count = 10 if count is None else count
92
        cursor = 0
93
        while True:
94
            # send scan request
95
            self.net.send(scan_cmd.format(cursor, prefix, count))
96
            # response is the next cursor followed by a list of matches
97
            resp = RedisResponse(self.net)
98
            cursor = int(next(resp))
99
            for key in resp:
100
                yield key.strip()
101
            if cursor == 0:
102
                break
103
104
    def llen_multi(self, keys):
105
        """Return the total length of each key provided.
106
107
        :param keys: The iterable of keys to return the total length of.
108
        :type keys: iterable
109
        :raises RedisError: for any socket related exceptions
110
        :raises RedisError: for unexpcted server responses
111
        :rtype: int
112
        """
113
        llen_cmd = "llen {0}\r\n"
114
        total = 0
115
        for key in keys:
116
            # send scan request
117
            self.net.send(llen_cmd.format(key))
118
            # response should just be an int
119
            resp = RedisResponse(self.net)
120
            total += int(next(resp))
121
        return total
122
123
    def zcard_multi(self, keys):
124
        """Return the total cardinality of each key provided.
125
126
        :param keys: The iterable of keys to return the total cardinality of.
127
        :type keys: iterable
128
        :raises RedisError: for any socket related exceptions
129
        :raises RedisError: for unexpcted server responses
130
        :rtype: int
131
        """
132
        zcard_cmd = "zcard {0}\r\n"
133
        total = 0
134
        for key in keys:
135
            # send scan request
136
            self.net.send(zcard_cmd.format(key))
137
            # response should just be an int
138
            resp = RedisResponse(self.net)
139
            total += int(next(resp))
140
        return total
141
142
    def scard_multi(self, keys):
143
        """Return the total cardinality of each key provided.
144
145
        :param keys: The iterable of keys to return the total cardinality of.
146
        :type keys: iterable
147
        :raises RedisError: for any socket related exceptions
148
        :raises RedisError: for unexpcted server responses
149
        :rtype: int
150
        """
151
        scard_cmd = "scard {0}\r\n"
152
        total = 0
153
        for key in keys:
154
            # send scan request
155
            self.net.send(scard_cmd.format(key))
156
            # response should just be an int
157
            resp = RedisResponse(self.net)
158
            total += int(next(resp))
159
        return total
160
161
    def pfcount_multi(self, keys):
162
        """Return the total cardinality of each key provided.
163
164
        :param keys: The iterable of keys to return the total cardinality of.
165
        :type keys: iterable
166
        :raises RedisError: for any socket related exceptions
167
        :raises RedisError: for unexpcted server responses
168
        :rtype: int
169
        """
170
        pfcount_cmd = "pfcount {0}\r\n"
171
        total = 0
172
        for key in keys:
173
            # send scan request
174
            self.net.send(pfcount_cmd.format(key))
175
            # response should just be an int
176
            resp = RedisResponse(self.net)
177
            total += int(next(resp))
178
        return total
179
180
181
class RedisResponse(object):
182
    """An iterable of redis command responses."""
183
184
    def __init__(self, reader):
185
        """An iterable of redis command responses.
186
187
        :param reader: A RedisNet reader instance
188
        :type reader: RedisNet
189
190
        :raises RedisError: for any socket related errors
191
        :raises RedisError: for any unknown responses
192
        :raises RedisError: for any Redis Errors returned
193
        :raises RedisError: for any ValueErrors encountered when casting
194
        """
195
        self.reader = reader
196
        # handlers consume responses and add them to self.vals
197
        self.func = defaultdict(lambda: RedisResponse.h_unknown)
198
        self.func["*"] = lambda buff: self.parse(int(buff))
199
        self.func["+"] = lambda buff: self.vals.append(str(buff))
200
        # remove the \r from the string
201
        self.func["$"] = lambda buff: \
202
            self.vals.append(self.reader.readnbytes(int(buff) + 2))
203
        self.func[":"] = lambda buff: self.vals.append(int(buff))
204
        self.func["-"] = RedisResponse.h_error
205
        self.vals = deque()
206
        self.parse()
207
208
    def parse(self, nitems=None):
209
        """Read the full response from self.sock.
210
211
        :raises RedisError: for any socket related Exceptions
212
        :raises RedisError: for any unknown types read
213
        :raises RedisError: for any redis protocol errors
214
        :rtype: varies
215
        """
216
        nitems = 1 if nitems is None else nitems
217
        for i in xrange(nitems):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
Unused Code introduced by
The variable i seems to be unused.
Loading history...
218
            try:
219
                buff = self.reader.readline()
220
                self.func[buff[0]](buff[1:])
221
            except (ValueError, IndexError) as exc:
222
                msg = "could not parse response: {0}: {1}"
223
                raise RedisError(msg.format(buff, exc))
224
225
    @staticmethod
226
    def h_unknown(buff):
227
        """Uknown response handler.
228
229
        :param buff: A response buffer read from Redis
230
        :type buff: str
231
        :raises RedisError: this function always raises a RedisError
232
        """
233
        raise RedisError("unknown command: {0}".format(buff))
234
235
    @staticmethod
236
    def h_error(buff):
237
        """Raise a RedisError with unknown command buff.
238
239
        :param buff: A response buffer read from Redis
240
        :type buff: str
241
        :raises RedisError: on any socket related exceptions
242
        :rtype: str
243
        """
244
        msg = "RedisResponse: h_error({0})"
245
        raise RedisError(msg.format(buff))
246
247
    def __iter__(self):
248
        """A Redis command response iterator.
249
250
        :rtype: iterator
251
        """
252
        return self
253
254
    def __next__(self):
255
        """Return the next response, if any.
256
257
        :rtype: object
258
        """
259
        if not self.vals:
260
            raise StopIteration()
261
        return self.vals.popleft()
262
263
    def next(self):
264
        """Return the next response, if any.
265
266
        :rtype: object
267
        """
268
        if not self.vals:
269
            raise StopIteration()
270
        return self.vals.popleft()
271
272
273
class RedisNet(object):
274
    """A helper class that talks to Redis on a unix/tcp socket."""
275
276
    BUFF_LEN = 8192
277
278
    def __init__(self, log, addr, sfamily, timeout):
279
        """A helper class that talks to Redis on a unix/tcp socket.
280
281
        :param log: A logger created from loggging.getLogger
282
        :type log: logging.RootLogger
283
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
284
        :type addr: str or tuple(str, int)
285
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
286
        :type sfamily: int
287
        :param timeout: The timeout in seconds for all socket operations
288
        :type timeout: float or int
289
        """
290
        self.log = log
291
        # addr can be unix socket or (host, port) tuple
292
        self.addr = addr
293
        # socket.AF_INET or socket.AF_UNIX
294
        self.sfamily = sfamily
295
        # all socket operations timeout
296
        self.timeout = timeout
297
        self.sock = None
298
        # read from our socket into this buffer
299
        # keep an index in the buffer that we've read up to
300
        # and record the total number of bytes in the buffer
301
        self.buff = ""
302
        self.buff_end = -1
303
        self.buff_i = -1
304
305 View Code Duplication
    def connect(self):
306
        """Connect to Redis.
307
308
        :raises RedisError: for any socket related exceptions
309
        :rtype: Exception or None
310
        """
311
        if self.sock:
312
            self.disconnect()
313
        try:
314
            # create the socket
315
            self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM)
316
            # set timeout for socket operations
317
            self.sock.settimeout(self.timeout)
318
            self.sock.connect(self.addr)
319
            msg = "RedisNet: connected: {0}"
320
            self.log.info(msg.format(self.addr))
321
        except Exception as exc:
322
            msg = "RedisNet: Exception during connect: {0}"
323
            self.log.error(msg.format(exc))
324
            raise RedisError(msg.format(exc))
325
        return True
326
327
    def disconnect(self):
328
        """Disconnect from Redis.
329
330
        :raises RedisError: for any socket related exceptions
331
        """
332
        self.log.debug("RedisNet: disconnect")
333
        if self.sock:
334
            try:
335
                self.sock.close()
336
                self.sock = None
337
            except Exception as exc:
338
                msg = "RedisNet: exception during disconnect: {0}"
339
                self.log.error(msg.format(exc))
340
                raise RedisError(msg.format(exc))
341
342
    def read(self):
343
        """Read RedisNet.BUFF_LEN bytes from our socket into self.buff.
344
345
        Calls here overwrite self.buff and reset self.buff_i and
346
        self.buff_end.
347
348
        :raises RedisError: for any socket related exceptions
349
        """
350
        if not self.sock and not self.connect():
351
            msg = "RedisNet: unable to connect to: {0}"
352
            raise RedisError(msg.format(self.addr))
353
354
        try:
355
            self.buff = self.sock.recv(RedisNet.BUFF_LEN)
356
            self.buff_end = len(self.buff)
357
            self.buff_i = 0
358
        except Exception as exc:
359
            msg = "RedisNet: Exception during readline: {0}"
360
            self.log.error(msg.format(exc))
361
            self.disconnect()
362
            raise RedisError(msg.format(exc))
363
364
    def recv(self, nbytes):
365
        """Read nbytes from our socket and return it.
366
367
        :param nbytes: The number of bytes to read
368
        :type nbytes: int
369
        :raises RedisError: for any socket related exceptions
370
        :rytpe: str
371
        """
372
        if not self.sock and not self.connect():
373
            msg = "RedisNet: unable to connect to: {0}"
374
            raise RedisError(msg.format(self.addr))
375
376
        ret = ""
377
        try:
378
            ret = self.sock.recv(nbytes)
379
        except Exception as exc:
380
            msg = "RedisNet: Exception during recv: {0}"
381
            self.log.error(msg.format(exc))
382
            self.disconnect()
383
            raise RedisError(msg.format(exc))
384
        return ret
385
386
    def readline(self):
387
        """Get the next available line.
388
389
        :raises RedisError: for any socket related exceptions
390
        :rytpe: str
391
        """
392
        if not self.sock and not self.connect():
393
            msg = "RedisNet: unable to connect to: {0}"
394
            raise RedisError(msg.format(self.addr))
395
396
        buffs = deque()
397
        while True:
398
            # do we have any data available?
399
            if self.buff_end < 0 or self.buff_i >= self.buff_end:
400
                # read data, reset buffer state
401
                while self.buff_end < 1:
402
                    self.read()
403
            # now we have data, do we have a newline?
404
            i = self.buff[self.buff_i:].find("\n")
405
            if i > -1:
406
                # return line, advance buffer past it
407
                # move i past the newline
408
                # also need to find
409
                buff_i = self.buff_i
410
                buffs.append(self.buff[buff_i:buff_i+i])
411
                # advance beyond i
412
                self.buff_i = buff_i + i + 1
413
                # reset if we have no buffer left
414
                if self.buff_i >= self.buff_end:
415
                    self.buff_i = -1
416
                    self.buff_end = -1
417
                break
418
            # no newline yet, record and keep reading
419
            buffs.append(self.buff[self.buff_i:])
420
            self.buff_end = -1
421
            self.buff_i = -1
422
        ret = "".join(buffs)
423
        return ret
424
425
    def readnbytes(self, nbytes):
426
        """Read nbytes from our socket.
427
428
        :param nbytes: The number of bytes to read
429
        :type nbytes: int
430
        :raises RedisError: for any socket related exceptions
431
        :rytpe: str
432
        """
433
434
        # any bytes in our buffer?
435
        ret = ""
436
        buffs = deque()
437
        if self.buff_end and self.buff_i < self.buff_end:
438
            # do we have enough buffer to fullfill the request?
439
            nbytes_left = self.buff_end - self.buff_i
440
            if nbytes_left >= nbytes:
441
                # yes, advance our pointer
442
                buffi = self.buff_i
443
                buffs.append(self.buff[buffi:buffi+nbytes])
444
                self.buff_i += nbytes
445
                nbytes = 0
446
            else:
447
                # no, consume all of the buffer and then get remaining
448
                buffs.append(self.buff[self.buff_i:])
449
                # reset so next access on buffer forces a read
450
                self.buff_i = -1
451
                self.buff_end = -1
452
                nbytes -= nbytes_left
453
        # do we need more bytes?
454
        if nbytes:
455
            # just do a recv - don't use our buffer
456
            buffs.append(self.recv(nbytes))
457
458
        # join the buffers
459
        ret = "".join(buffs)
460
        return ret
461
462
    def send(self, cmd):
463
        """Send the supplied string to the redis server.
464
465
        :param cmd: The string to send to the redis server
466
        :type cmd: str
467
        :raises RedisError: for any socket related exceptions
468
        """
469
        if not self.sock and not self.connect():
470
            msg = "RedisNet: unable to connect to: {0}"
471
            raise RedisError(msg.format(self.addr))
472
        # send info request
473
        try:
474
            self.sock.sendall(cmd)
475
        except Exception as exc:
476
            msg = "RedisNet: exception sending to server: {0}"
477
            self.log.error(msg.format(exc))
478
            self.disconnect()
479
            raise RedisError(msg.format(exc))
480
481
482
class Redis(plumd.Reader):
483
    """Plugin to record redis metrics."""
484
485
    # default config values
486
    defaults = {
487
        'poll.interval': 10,
488
        'gauges': [
489
            "aof_current_rewrite_time_sec",
490
            "aof_enabled",
491
            "aof_last_rewrite_time_sec",
492
            "aof_rewrite_in_progress",
493
            "aof_rewrite_scheduled",
494
            "blocked_clients",
495
            "client_biggest_input_buf",
496
            "client_longest_output_list",
497
            "connected_clients",
498
            "connected_slaves",
499
            "evicted_keys",
500
            "expired_keys",
501
            "instantaneous_input_kbps",
502
            "instantaneous_ops_per_sec",
503
            "instantaneous_output_kbps",
504
            "keyspace_hits",
505
            "keyspace_misses",
506
            "latest_fork_usec",
507
            "loading",
508
            "master_repl_offset",
509
            "mem_fragmentation_ratio",
510
            "pubsub_channels",
511
            "pubsub_patterns",
512
            "rdb_bgsave_in_progress",
513
            "rdb_changes_since_last_save",
514
            "rdb_current_bgsave_time_sec",
515
            "rdb_last_bgsave_time_sec",
516
            "rdb_last_save_time",
517
            "rejected_connections",
518
            "repl_backlog_active",
519
            "repl_backlog_first_byte_offset",
520
            "repl_backlog_histlen",
521
            "repl_backlog_size",
522
            "sync_full",
523
            "sync_partial_err",
524
            "sync_partial_ok",
525
            "total_commands_processed",
526
            "total_connections_received",
527
            "total_net_input_bytes",
528
            "total_net_output_bytes",
529
            "uptime_in_days",
530
            "uptime_in_seconds",
531
            "used_cpu_sys",
532
            "used_cpu_sys_children",
533
            "used_cpu_user",
534
            "used_cpu_user_children",
535
            "used_memory",
536
            "used_memory_lua",
537
            "used_memory_peak",
538
            "used_memory_rss",
539
            "master_last_io_seconds_ago",
540
            "master_sync_in_progress",
541
            "slave_repl_offset",
542
            "slave_priority",
543
            "slave_read_only",
544
            "connected_slaves",
545
            "master_repl_offset",
546
            "repl_backlog_active",
547
            "repl_backlog_size",
548
            "repl_backlog_first_byte_offset",
549
            "repl_backlog_histlen"
550
            "connected_slaves"
551
            ],
552
        'rates': [],
553
        'configs': [
554
            'maxmemory'
555
        ],
556
        'keys': {
557
            # 'type': { metric_prefix: [key_prefix*, ...] }
558
            'lists': {},
559
            'zsets': {},
560
            'sets': {},
561
            'hlls': {}
562
        },
563
        'addr': '127.0.0.1:6379',
564
        'addr_type': 'inet',
565
        'timeout': 10
566
    }
567
568
    def __init__(self, log, config):
569
        """Plugin to record redis metrics.
570
571
        :param log: A logger
572
        :type log: logging.RootLogger
573
        :param config: a plumd.config.Conf configuration helper instance.
574
        :type config: plumd.config.Conf
575
        """
576
        super(Redis, self).__init__(log, config)
577
        self.config.defaults(Redis.defaults)
578
579
        # metrics to record
580
        self.gauges = self.config.get('gauges')
581
        self.rates = self.config.get('rates')
582
        self.configs = self.config.get('configs')
583
        self.keys = self.config.get('keys')
584
585
        # Redis connection - either unix socket or tcp
586
        addr = self.config.get('addr')
587
        addr_type = self.config.get('addr_type').lower()
588
        if addr_type == "unix":
589
            sfamily = socket.AF_UNIX
590
        elif addr_type == "inet":
591
            try:
592
                host, port = addr.split(":")
593
            except AttributeError:
594
                msg = "Redis: invalid address: {0}, (host:port)"
595
                raise plumd.ConfigError(msg.format(addr))
596
            addr = (host, int(port))
597
            sfamily = socket.AF_INET
598
        else:
599
            msg = "Redis: unsupported connection type: {0} (unix, inet)"
600
            raise plumd.ConfigError(msg.format(addr_type))
601
        timeout = config.get('timeout')
602
        self.client = RedisClient(self.log, addr, sfamily, timeout)
603
        self.calc = Differential()
604
605
    def poll(self):
606
        """Query Redis for metrics.
607
608
        :rtype: ResultSet
609
        """
610
        # catch exceptions - simply skip the poll on error
611
        try:
612
            result = plumd.Result("redis")
613
614
            # config values
615
            self.record_configs(result)
616
617
            # key sizes
618
            self.record_sizes(result)
619
620
            # get server metrics
621
            stats = self.client.info()
622
623
            # record gauges, rates
624
            self.record_metrics(stats, result)
625
626
            # replication, if any slaves are connected
627
            if "slave0" in stats:
628
                self.record_slaves(stats, result)
629
630
            # db metrics, maxmem
631
            self.record_dbs(stats, result)
632
633
            # record lists, zsets, sets and hll sizes
634
            self.record_sizes(result)
635
636
            # and finally command stats - if available
637
            self.record_cmdstats(result)
638
639
        except RedisError as exc:
640
            msg = "Redis: exception during poll: {0}"
641
            self.log.error(msg.format(exc))
642
        return plumd.ResultSet([result])
643
644
    def record_cmdstats(self, result):
645
        """Record the stats from info commandstats.
646
647
        :param result: A result object to add metrics to
648
        :type result: ResultSet
649
        """
650
        name = self.name
651
        infos = self.client.info("commandstats")
652
        for key in sorted(infos.keys()):
653
            vals = infos[key].split(",")
654
            cstat, cname = key.split("_")
655
            for val in vals:
656
                mname, mval = val.split("=")
657
                metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname)
658
                result.add(plumd.Float(metric, mval))
659
660
    def record_metrics(self, stats, result):
661
        """Record the configured gauges and metrics.
662
663
        :param stats: Dictionary returned from info command
664
        :type stats: dict
665
        :param result: A result object to add metrics to
666
        :type result: ResultSet
667
        """
668
        timest = time.time()
669
        name = self.name
670
671
        # record gauges
672
        for stat in self.gauges:
673
            if stat in stats:
674
                mname = "{0}.{1}".format(name, stat)
675
                result.add(plumd.Float(mname, stats[stat]))
676
677
        # record rates
678
        for stat in self.rates:
679
            if stat in stats:
680
                mname = "{0}.{1}".format(name, stat)
681
                mval = self.calc.per_second(mname, float(stats[stat]), timest)
682
                result.add(plumd.Float(mname, mval))
683
684
    def record_dbs(self, stats, result):
685
        """Record per database metrics into result.
686
687
        :param stats: Dictionary returned from info command
688
        :type stats: dict
689
        :param result: A result object to add metrics to
690
        :type result: ResultSet
691
        """
692
        # db0:keys=1,expires=0,avg_ttl=0
693
        name = self.name
694
        db_fmt = "db{0}"
695
        metric_fmt = "{0}.db.{1}.{2}"
696
697
        for i in xrange(0, len(stats.keys())):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
698
            dbname = db_fmt.format(i)
699
            if dbname not in stats:
700
                break
701
            try:
702
                vals = stats[dbname].split(",")
703
                dbmetrics = dict((k, v)
704
                                 for k, v in (v.split('=') for v in vals))
705
                for key, val in dbmetrics.items():
706
                    metric_str = metric_fmt.format(name, i, key)
707
                    result.add(plumd.Int(metric_str, val))
708
            except KeyError:
709
                self.log.error("Redis: invalid db entry: {0}".format(dbname))
710
711
    def record_slaves(self, stats, result):
712
        """Record slave metrics into result.
713
714
        :param stats: A dictionary returned from info command
715
        :type stats: dict
716
        :param result: A ResultSet object to add metrics to
717
        :type result: ResultSet
718
        """
719
        # slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1
720
        name = self.name
721
        slave_str = "slave{0}"
722
        moffstr = 'master_repl_offset'
723
        moffset = 0
724
        try:
725
            moffset = int(stats[moffstr])
726
        except(TypeError, KeyError):
727
            self.log.error("Redis: no {0} value".format(moffstr))
728
729
        # for each slave entry
730
        for i in xrange(0, len(stats.keys())):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'xrange'
Loading history...
731
            sname = slave_str.format(i)
732
            if sname not in stats:
733
                break
734
            try:
735
                vals = stats[sname].split(",")
736
                smetrics = dict((k, v)
737
                                for k, v in (v.split('=') for v in vals))
738
                sip = smetrics['ip'].replace(".", "_")
739
                smname = "{0}_{1}".format(sip, smetrics['port'])
740
741
                # record offset and lag
742
                mname = "{0}.slave.{1}.offset".format(name, smname)
743
                soffset = moffset - int(smetrics['offset'])
744
                result.add(plumd.Int(mname, soffset))
745
                mname = "{0}.slave.{1}.lag".format(name, sname)
746
                result.add(plumd.Int(mname, smetrics['lag']))
747
748
                # if slave is online set online = 1, otherwise 0
749
                sonline = 1 if smetrics['state'] == "online" else 0
750
                mname = "{0}.slave.{1}.online".format(name, sname)
751
                result.add(plumd.Int(mname, sonline))
752
            except(TypeError, KeyError, ValueError):
753
                self.log.error("Redis: invalid slave entry: {0}".format(sname))
754
755
    def record_configs(self, result):
756
        """Record the configured configuration values.
757
758
        :param result: A ResultSet to record max mem to.
759
        :type result: plumd.ResultSet
760
        """
761
        configs = self.configs
762
        if not configs:
763
            return
764
        name = self.name
765
        for config in self.client.config_get_multi(configs):
766
            for key, val in config.items():
767
                mstr = "{0}.configs.{1}".format(name, key)
768
                result.add(plumd.Float(mstr, val))
769
770
    def record_sizes(self, result):
771
        """Record the total sizes of the configured keys.
772
773
        For each type of key (list, zset, set, hyperloglog)
774
        scan for a list of keys matching the prefix and record the
775
        total number of items for all keys matching that prefix.
776
777
        :param result: A ResultSet to record into.
778
        :type result: plumd.ResultSet
779
        """
780
        if not self.keys:
781
            return
782
        keys = self.config.get("keys")
783
        if "lists" in keys:
784
            self.record_lists(keys['lists'], result)
785
        if "zsets" in keys:
786
            self.record_zsets(keys['zsets'], result)
787
        if "sets" in keys:
788
            self.record_sets(keys['sets'], result)
789
        if "hlls" in keys:
790
            self.record_hlls(keys['hlls'], result)
791
792
    def record_lists(self, lconfig, result):
793
        """Record the total length of the configured lists.
794
795
        eg. lconfig: {"metric_name": [ "list*", "of*", "globs*"]}
796
797
        :param lconfig: A dict of metric name => globs
798
        :type lconfig: dict
799
        :param result: A ResultSet to record into.
800
        :type result: plumd.ResultSet
801
        """
802
        name = self.name
803
        for mprefix, kprefixes in lconfig.items():
804
            for prefix in kprefixes:
805
                # get the total for this prefix
806
                total = self.client.llen_multi(self.client.scan(prefix))
807
                mstr = "{0}.sizes.lists.{1}".format(name, mprefix)
808
                result.add(plumd.Int(mstr, total))
809
810
    def record_zsets(self, zconfig, result):
811
        """Record the total length of the configured zsets.
812
813
        eg. zconfig: {"metric_name": [ "list*", "of*", "globs*"]}
814
815
        :param zconfig: A dict of metric name => globs
816
        :type zconfig: dict
817
        :param result: A ResultSet to record into.
818
        :type result: plumd.ResultSet
819
        """
820
        name = self.name
821
        for mprefix, kprefixes in zconfig.items():
822
            for prefix in kprefixes:
823
                # get the total for this prefix
824
                total = self.client.zcard_multi(self.client.scan(prefix))
825
                mstr = "{0}.sizes.zset.{1}".format(name, mprefix)
826
                result.add(plumd.Int(mstr, total))
827
828
    def record_sets(self, sconfig, result):
829
        """Record the total length of the configured sets.
830
831
        eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]}
832
833
        :param sconfig: A dict of metric name => globs
834
        :type sconfig: dict
835
        :param result: A ResultSet to record into.
836
        :type result: plumd.ResultSet
837
        """
838
        name = self.name
839
        for mprefix, kprefixes in sconfig.items():
840
            for prefix in kprefixes:
841
                # get the total for this prefix
842
                total = self.client.scard_multi(self.client.scan(prefix))
843
                mstr = "{0}.sizes.set.{1}".format(name, mprefix)
844
                result.add(plumd.Int(mstr, total))
845
846
    def record_hlls(self, hllconfig, result):
847
        """Record the total length of the configured hlls.
848
849
        eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]}
850
851
        :param hllconfig: A dict of metric name => globs
852
        :type hllconfig: dict
853
        :param result: A ResultSet to record into.
854
        :type result: plumd.ResultSet
855
        """
856
        name = self.name
857
        for mprefix, kprefixes in hllconfig.items():
858
            for prefix in kprefixes:
859
                # get the total for this prefix
860
                total = self.client.pfcount_multi(self.client.scan(prefix))
861
                mstr = "{0}.sizes.hll.{1}".format(name, mprefix)
862
                result.add(plumd.Int(mstr, total))
863