1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
"""A redis reader plugin with builtin redis client.""" |
3
|
|
|
import sys |
4
|
|
|
import time |
5
|
|
|
import socket |
6
|
|
|
from collections import deque |
7
|
|
|
from collections import defaultdict |
8
|
|
|
|
9
|
|
|
import plumd |
10
|
|
|
from plumd.util import Differential |
11
|
|
|
|
12
|
|
|
# Module metadata.
__docformat__ = "reStructuredText"
__license__ = "ISCL"
__author__ = "Kenny Freeman"
__email__ = "[email protected]"

# True when running under Python 3 or newer.
PY3 = sys.version_info[0] >= 3
18
|
|
|
|
19
|
|
|
|
20
|
|
|
class RedisError(Exception):
    """Generic error raised by the Redis client code in this module."""
24
|
|
|
|
25
|
|
|
|
26
|
|
|
class RedisClient(object):
    """A minimal Redis client.

    Commands are written as inline commands through a RedisNet connection
    and every reply is parsed with RedisResponse.
    """

    def __init__(self, log, addr, sfamily, timeout):
        """A minimal Redis client.

        :param log: A logger created from logging.getLogger
        :type log: logging.RootLogger
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
        :type addr: str or tuple(str, int)
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
        :type sfamily: int
        :param timeout: The timeout in seconds for all socket operations
        :type timeout: float or int
        """
        self.log = log
        self.net = RedisNet(log, addr, sfamily, timeout)

    @staticmethod
    def _parse_info(info_str):
        """Parse the text of an INFO reply into a dict of key => value.

        Blank lines, section headers ("# Server") and lines without a colon
        are skipped. Only the first colon separates key from value, so values
        that themselves contain colons (eg. IPv6 addresses) are kept intact.
        Surrounding whitespace, including the trailing carriage return, is
        removed from each line.

        :param info_str: The bulk string returned by the INFO command
        :type info_str: str
        :rtype: dict
        """
        ret = {}
        for raw_line in info_str.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, val = line.partition(":")
            if sep:
                ret[key] = val
        return ret

    @staticmethod
    def _next_int(resp):
        """Return the next value of resp as an int.

        :param resp: An iterator of reply values
        :type resp: iterator
        :raises RedisError: if there is no next value or it is not an integer
        :rtype: int
        """
        try:
            return int(next(resp))
        except (StopIteration, TypeError, ValueError) as exc:
            # a bare StopIteration must not escape into a calling generator
            msg = "expected an integer reply: {0!r}"
            raise RedisError(msg.format(exc))

    def _sum_int_replies(self, cmd_fmt, keys):
        """Send cmd_fmt once per key and return the sum of the int replies.

        :param cmd_fmt: A command template with a single {0} key placeholder
        :type cmd_fmt: str
        :param keys: The iterable of keys to run the command against
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        total = 0
        for key in keys:
            self.net.send(cmd_fmt.format(key))
            # the reply to each of these commands is a single integer
            total += self._next_int(RedisResponse(self.net))
        return total

    def info(self, section=None):
        """Return redis info.

        :param section: The info section to request from Redis ("all" when
            not supplied)
        :type section: str
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: dict
        """
        section = "all" if section is None else section
        self.net.send("INFO {0}\r\n".format(section))
        ret = {}
        for info_str in RedisResponse(self.net):
            ret.update(self._parse_info(info_str))
        return ret

    def config_get_multi(self, globs):
        """Yield one dict of config name => value for each glob requested.

        This is a generator: each CONFIG GET is only sent when the
        corresponding item is requested.

        :param globs: An iterable containing glob search strings
        :type globs: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: generator of dict
        """
        for glob_str in globs:
            self.net.send("CONFIG GET {0}\r\n".format(glob_str))
            vals = [val.strip() for val in RedisResponse(self.net)]
            # the reply alternates name, value, name, value, ...
            yield dict(zip(vals[0::2], vals[1::2]))

    def scan(self, prefix, count=None):
        """Yield the key names that match the requested prefix.

        This is a generator that issues SCAN requests until the server
        returns a cursor of 0.

        :param prefix: The key prefix/glob to search for
        :type prefix: str
        :param count: The number of keys to request on each iteration
            (10 when not supplied)
        :type count: int
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: generator of str
        """
        scan_cmd = "scan {0} match {1} count {2}\r\n"
        count = 10 if count is None else count
        cursor = 0
        while True:
            self.net.send(scan_cmd.format(cursor, prefix, count))
            # response is the next cursor followed by a list of matches
            resp = RedisResponse(self.net)
            cursor = self._next_int(resp)
            for key in resp:
                yield key.strip()
            if cursor == 0:
                break

    def llen_multi(self, keys):
        """Return the total length of all of the list keys provided.

        :param keys: The iterable of keys to return the total length of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("llen {0}\r\n", keys)

    def zcard_multi(self, keys):
        """Return the total cardinality of all of the zset keys provided.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("zcard {0}\r\n", keys)

    def scard_multi(self, keys):
        """Return the total cardinality of all of the set keys provided.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("scard {0}\r\n", keys)

    def pfcount_multi(self, keys):
        """Return the sum of the PFCOUNT of each hyperloglog key provided.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("pfcount {0}\r\n", keys)
179
|
|
|
|
180
|
|
|
|
181
|
|
|
class RedisResponse(object):
    """An iterable of redis command responses.

    The complete reply is read from the reader when the object is created;
    iterating afterwards only drains the already parsed values.
    """

    def __init__(self, reader):
        """An iterable of redis command responses.

        :param reader: A RedisNet reader instance (anything providing
            readline() and readnbytes(n))
        :type reader: RedisNet

        :raises RedisError: for any socket related errors
        :raises RedisError: for any unknown responses
        :raises RedisError: for any Redis Errors returned
        :raises RedisError: for any ValueErrors encountered when casting
        """
        self.reader = reader
        self.vals = deque()
        # handlers consume responses and add them to self.vals, they are
        # looked up by the first character (type marker) of each reply line
        self.func = defaultdict(lambda: RedisResponse.h_unknown)
        self.func["*"] = self._h_array
        self.func["+"] = self._h_simple
        self.func["$"] = self._h_bulk
        self.func[":"] = self._h_int
        self.func["-"] = RedisResponse.h_error
        self.parse()

    def _h_array(self, buff):
        """Parse an array reply: buff holds the number of items to read.

        A negative count (null array) adds no values.
        """
        self.parse(int(buff))

    def _h_simple(self, buff):
        """Record a simple string reply (the trailing \\r is kept)."""
        self.vals.append(str(buff))

    def _h_bulk(self, buff):
        """Record a bulk string reply: buff holds the payload length.

        The payload is recorded together with its trailing \\r\\n. A negative
        length is a null bulk string: it has no payload to read and is
        recorded as None.
        """
        length = int(buff)
        if length < 0:
            self.vals.append(None)
            return
        # + 2 consumes the \r\n that terminates the payload
        self.vals.append(self.reader.readnbytes(length + 2))

    def _h_int(self, buff):
        """Record an integer reply."""
        self.vals.append(int(buff))

    def parse(self, nitems=None):
        """Read nitems replies (1 when not supplied) from self.reader.

        Parsed values are appended to self.vals, nothing is returned.

        :param nitems: The number of replies to read
        :type nitems: int
        :raises RedisError: for any socket related Exceptions
        :raises RedisError: for any unknown types read
        :raises RedisError: for any redis protocol errors
        """
        nitems = 1 if nitems is None else nitems
        for _ in range(nitems):
            buff = self.reader.readline()
            try:
                self.func[buff[0]](buff[1:])
            except (ValueError, IndexError) as exc:
                # IndexError: empty line, ValueError: bad length/integer
                msg = "could not parse response: {0}: {1}"
                raise RedisError(msg.format(buff, exc))

    @staticmethod
    def h_unknown(buff):
        """Unknown response handler.

        :param buff: A response buffer read from Redis
        :type buff: str
        :raises RedisError: this function always raises a RedisError
        """
        raise RedisError("unknown command: {0}".format(buff))

    @staticmethod
    def h_error(buff):
        """Error reply handler.

        :param buff: A response buffer read from Redis
        :type buff: str
        :raises RedisError: this function always raises a RedisError
        """
        msg = "RedisResponse: h_error({0})"
        raise RedisError(msg.format(buff))

    def __iter__(self):
        """A Redis command response iterator.

        :rtype: iterator
        """
        return self

    def __next__(self):
        """Return the next response, if any.

        :raises StopIteration: once all parsed values have been consumed
        :rtype: object
        """
        if not self.vals:
            raise StopIteration()
        return self.vals.popleft()

    # Python 2 iterator protocol
    next = __next__
271
|
|
|
|
272
|
|
|
|
273
|
|
|
class RedisNet(object):
    """A helper class that talks to Redis on a unix/tcp socket.

    Data received from the socket is buffered as bytes so that lengths are
    always counted in bytes; readline() and readnbytes() return text.
    """

    BUFF_LEN = 8192
    # encoding used to encode commands and decode replies on Python 3
    ENCODING = "utf-8"

    def __init__(self, log, addr, sfamily, timeout):
        """A helper class that talks to Redis on a unix/tcp socket.

        No connection is made here, the socket is connected on first use.

        :param log: A logger created from logging.getLogger
        :type log: logging.RootLogger
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
        :type addr: str or tuple(str, int)
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
        :type sfamily: int
        :param timeout: The timeout in seconds for all socket operations
        :type timeout: float or int
        """
        self.log = log
        # addr can be unix socket or (host, port) tuple
        self.addr = addr
        # socket.AF_INET or socket.AF_UNIX
        self.sfamily = sfamily
        # all socket operations timeout
        self.timeout = timeout
        self.sock = None
        # read from our socket into this buffer
        # keep an index in the buffer that we've read up to
        # and record the total number of bytes in the buffer
        # (-1 for both means there is no buffered data)
        self.buff = b""
        self.buff_end = -1
        self.buff_i = -1

    def _reset_buffer(self):
        """Discard any buffered data so the next access forces a read."""
        self.buff = b""
        self.buff_end = -1
        self.buff_i = -1

    def _available(self):
        """Return the number of unread bytes left in the buffer."""
        if self.buff_i < 0 or self.buff_end < 1:
            return 0
        return max(self.buff_end - self.buff_i, 0)

    def _to_text(self, data):
        """Return data as str (bytes are decoded on Python 3)."""
        if isinstance(data, str):
            # Python 2: str is already the byte string type
            return data
        return data.decode(self.ENCODING, "replace")

    def _ensure_connected(self):
        """Connect if there is no socket yet.

        :raises RedisError: for any socket related exceptions
        """
        if not self.sock and not self.connect():
            msg = "RedisNet: unable to connect to: {0}"
            raise RedisError(msg.format(self.addr))

    def connect(self):
        """Connect to Redis, dropping any existing connection first.

        :raises RedisError: for any socket related exceptions
        :returns: True once connected
        :rtype: bool
        """
        if self.sock:
            self.disconnect()
        sock = None
        try:
            # create the socket
            sock = socket.socket(self.sfamily, socket.SOCK_STREAM)
            # set timeout for socket operations
            sock.settimeout(self.timeout)
            sock.connect(self.addr)
        except Exception as exc:
            if sock is not None:
                try:
                    sock.close()
                except Exception:
                    # best effort cleanup, the connect error is what matters
                    pass
            msg = "RedisNet: Exception during connect: {0}"
            self.log.error(msg.format(exc))
            raise RedisError(msg.format(exc))
        # only keep the socket once it is actually connected
        self.sock = sock
        self._reset_buffer()
        msg = "RedisNet: connected: {0}"
        self.log.info(msg.format(self.addr))
        return True

    def disconnect(self):
        """Disconnect from Redis and discard any buffered data.

        The socket is forgotten even if closing it fails.

        :raises RedisError: for any socket related exceptions
        """
        self.log.debug("RedisNet: disconnect")
        sock, self.sock = self.sock, None
        # buffered bytes belong to the old connection
        self._reset_buffer()
        if sock:
            try:
                sock.close()
            except Exception as exc:
                msg = "RedisNet: exception during disconnect: {0}"
                self.log.error(msg.format(exc))
                raise RedisError(msg.format(exc))

    def read(self):
        """Read up to RedisNet.BUFF_LEN bytes from our socket into self.buff.

        Calls here overwrite self.buff and reset self.buff_i and
        self.buff_end.

        :raises RedisError: for any socket related exceptions
        :raises RedisError: if the server closed the connection
        """
        self._ensure_connected()
        try:
            data = self.sock.recv(RedisNet.BUFF_LEN)
        except Exception as exc:
            msg = "RedisNet: Exception during readline: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
        if not data:
            # an empty read means the peer closed the connection, without
            # this check callers waiting for data would loop forever
            msg = "RedisNet: connection closed by server: {0}"
            self.log.error(msg.format(self.addr))
            self.disconnect()
            raise RedisError(msg.format(self.addr))
        self.buff = data
        self.buff_end = len(data)
        self.buff_i = 0

    def recv(self, nbytes):
        """Read exactly nbytes from our socket and return them.

        The internal buffer is not used. The socket is read repeatedly until
        nbytes have been received.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :raises RedisError: if the server closed the connection
        :rtype: bytes
        """
        self._ensure_connected()
        chunks = []
        remaining = nbytes
        try:
            while remaining > 0:
                chunk = self.sock.recv(remaining)
                if not chunk:
                    raise socket.error("connection closed by server")
                chunks.append(chunk)
                remaining -= len(chunk)
        except Exception as exc:
            msg = "RedisNet: Exception during recv: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
        return b"".join(chunks)

    def readline(self):
        """Get the next available line.

        The terminating newline is consumed but not returned; a preceding
        carriage return, if any, is part of the returned line.

        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        self._ensure_connected()
        parts = []
        while True:
            # do we have any data available?
            if not self._available():
                self.read()
            start = self.buff_i
            # now we have data, do we have a newline?
            idx = self.buff.find(b"\n", start)
            if idx >= 0:
                parts.append(self.buff[start:idx])
                # advance beyond the newline
                self.buff_i = idx + 1
                # reset if we have no buffer left
                if self.buff_i >= self.buff_end:
                    self._reset_buffer()
                break
            # no newline yet, record and keep reading
            parts.append(self.buff[start:])
            self._reset_buffer()
        return self._to_text(b"".join(parts))

    def readnbytes(self, nbytes):
        """Read nbytes, first from our buffer and then from our socket.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        if nbytes <= 0:
            return self._to_text(b"")
        parts = []
        # any bytes in our buffer?
        available = self._available()
        if available:
            take = min(available, nbytes)
            start = self.buff_i
            parts.append(self.buff[start:start + take])
            self.buff_i = start + take
            nbytes -= take
            # reset so next access on an exhausted buffer forces a read
            if self.buff_i >= self.buff_end:
                self._reset_buffer()
        # do we need more bytes?
        if nbytes > 0:
            # just do a recv - don't use our buffer
            parts.append(self.recv(nbytes))
        return self._to_text(b"".join(parts))

    def send(self, cmd):
        """Send the supplied string to the redis server.

        :param cmd: The string to send to the redis server
        :type cmd: str
        :raises RedisError: for any socket related exceptions
        """
        self._ensure_connected()
        if not isinstance(cmd, bytes):
            # sockets only accept bytes on Python 3
            cmd = cmd.encode(self.ENCODING)
        try:
            self.sock.sendall(cmd)
        except Exception as exc:
            msg = "RedisNet: exception sending to server: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
480
|
|
|
|
481
|
|
|
|
482
|
|
|
class Redis(plumd.Reader):
    """Plugin to record redis metrics."""

    # default config values
    defaults = {
        'poll.interval': 10,
        # INFO fields that are recorded as is
        'gauges': [
            "aof_current_rewrite_time_sec",
            "aof_enabled",
            "aof_last_rewrite_time_sec",
            "aof_rewrite_in_progress",
            "aof_rewrite_scheduled",
            "blocked_clients",
            "client_biggest_input_buf",
            "client_longest_output_list",
            "connected_clients",
            "connected_slaves",
            "evicted_keys",
            "expired_keys",
            "instantaneous_input_kbps",
            "instantaneous_ops_per_sec",
            "instantaneous_output_kbps",
            "keyspace_hits",
            "keyspace_misses",
            "latest_fork_usec",
            "loading",
            "master_repl_offset",
            "mem_fragmentation_ratio",
            "pubsub_channels",
            "pubsub_patterns",
            "rdb_bgsave_in_progress",
            "rdb_changes_since_last_save",
            "rdb_current_bgsave_time_sec",
            "rdb_last_bgsave_time_sec",
            "rdb_last_save_time",
            "rejected_connections",
            "repl_backlog_active",
            "repl_backlog_first_byte_offset",
            "repl_backlog_histlen",
            "repl_backlog_size",
            "sync_full",
            "sync_partial_err",
            "sync_partial_ok",
            "total_commands_processed",
            "total_connections_received",
            "total_net_input_bytes",
            "total_net_output_bytes",
            "uptime_in_days",
            "uptime_in_seconds",
            "used_cpu_sys",
            "used_cpu_sys_children",
            "used_cpu_user",
            "used_cpu_user_children",
            "used_memory",
            "used_memory_lua",
            "used_memory_peak",
            "used_memory_rss",
            "master_last_io_seconds_ago",
            "master_sync_in_progress",
            "slave_repl_offset",
            "slave_priority",
            "slave_read_only",
        ],
        # INFO fields that are recorded as a per second rate
        'rates': [],
        # CONFIG GET globs whose values are recorded
        'configs': [
            'maxmemory',
        ],
        'keys': {
            # 'type': { metric_prefix: [key_prefix*, ...] }
            'lists': {},
            'zsets': {},
            'sets': {},
            'hlls': {},
        },
        'addr': '127.0.0.1:6379',
        'addr_type': 'inet',
        'timeout': 10,
    }

    def __init__(self, log, config):
        """Plugin to record redis metrics.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: for an invalid addr or addr_type
        """
        super(Redis, self).__init__(log, config)
        self.config.defaults(Redis.defaults)

        # metrics to record
        self.gauges = self.config.get('gauges')
        self.rates = self.config.get('rates')
        self.configs = self.config.get('configs')
        self.keys = self.config.get('keys')

        # Redis connection - either unix socket or tcp
        addr = self.config.get('addr')
        addr_type = self.config.get('addr_type').lower()
        if addr_type == "unix":
            sfamily = socket.AF_UNIX
        elif addr_type == "inet":
            try:
                # the port is whatever follows the last colon
                host, port = addr.rsplit(":", 1)
                addr = (host, int(port))
            except (AttributeError, ValueError):
                msg = "Redis: invalid address: {0}, (host:port)"
                raise plumd.ConfigError(msg.format(addr))
            sfamily = socket.AF_INET
        else:
            msg = "Redis: unsupported connection type: {0} (unix, inet)"
            raise plumd.ConfigError(msg.format(addr_type))
        # read from self.config like every other setting so that the
        # defaults registered above apply
        timeout = self.config.get('timeout')
        self.client = RedisClient(self.log, addr, sfamily, timeout)
        self.calc = Differential()

    def poll(self):
        """Query Redis for metrics.

        A RedisError aborts the rest of the poll; whatever was recorded up
        to that point is still returned.

        :rtype: ResultSet
        """
        result = plumd.Result("redis")
        # catch exceptions - simply skip the rest of the poll on error
        try:
            # config values
            self.record_configs(result)

            # get server metrics
            stats = self.client.info()

            # record gauges, rates
            self.record_metrics(stats, result)

            # replication, if any slaves are connected
            if "slave0" in stats:
                self.record_slaves(stats, result)

            # db metrics
            self.record_dbs(stats, result)

            # record lists, zsets, sets and hll sizes
            self.record_sizes(result)

            # and finally command stats - if available
            self.record_cmdstats(result)

        except RedisError as exc:
            msg = "Redis: exception during poll: {0}"
            self.log.error(msg.format(exc))
        return plumd.ResultSet([result])

    def record_cmdstats(self, result):
        """Record the stats from info commandstats.

        Each entry looks like cmdstat_get:calls=1,usec=2,usec_per_call=2.00
        and is recorded as <name>.cmdstat.get.<field>.

        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        name = self.name
        infos = self.client.info("commandstats")
        for key in sorted(infos):
            # only split on the first underscore: command names may
            # contain underscores themselves
            cstat, sep, cname = key.partition("_")
            if not sep:
                self.log.error("Redis: invalid cmdstat entry: {0}".format(key))
                continue
            for val in infos[key].split(","):
                mname, sep, mval = val.partition("=")
                if not sep:
                    self.log.error(
                        "Redis: invalid cmdstat entry: {0}".format(key))
                    continue
                metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname)
                result.add(plumd.Float(metric, mval))

    def record_metrics(self, stats, result):
        """Record the configured gauges and rates.

        :param stats: Dictionary returned from info command
        :type stats: dict
        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        timest = time.time()
        name = self.name

        # record gauges
        for stat in self.gauges:
            if stat not in stats:
                continue
            mname = "{0}.{1}".format(name, stat)
            result.add(plumd.Float(mname, stats[stat]))

        # record rates
        for stat in self.rates:
            if stat not in stats:
                continue
            mname = "{0}.{1}".format(name, stat)
            mval = self.calc.per_second(mname, float(stats[stat]), timest)
            result.add(plumd.Float(mname, mval))

    def record_dbs(self, stats, result):
        """Record per database metrics into result.

        Every dbN entry present in stats is recorded; database numbers do
        not need to be contiguous.

        :param stats: Dictionary returned from info command
        :type stats: dict
        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        # db0:keys=1,expires=0,avg_ttl=0
        name = self.name
        metric_fmt = "{0}.db.{1}.{2}"

        indexes = sorted(int(key[2:]) for key in stats
                         if key.startswith("db") and key[2:].isdigit())
        for i in indexes:
            dbname = "db{0}".format(i)
            try:
                dbmetrics = dict(entry.split('=')
                                 for entry in stats[dbname].split(","))
                for key in sorted(dbmetrics):
                    metric_str = metric_fmt.format(name, i, key)
                    result.add(plumd.Int(metric_str, dbmetrics[key]))
            except (KeyError, ValueError):
                self.log.error("Redis: invalid db entry: {0}".format(dbname))

    def record_slaves(self, stats, result):
        """Record slave metrics into result.

        For every slaveN entry the offset behind the master, the lag and
        whether the slave is online (1) or not (0) are recorded.

        :param stats: A dictionary returned from info command
        :type stats: dict
        :param result: A ResultSet object to add metrics to
        :type result: ResultSet
        """
        # slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1
        name = self.name
        moffstr = 'master_repl_offset'
        moffset = 0
        try:
            moffset = int(stats[moffstr])
        except (TypeError, KeyError, ValueError):
            self.log.error("Redis: no {0} value".format(moffstr))

        # for each slave entry - they are numbered from 0 without gaps
        for i in range(len(stats)):
            sname = "slave{0}".format(i)
            if sname not in stats:
                break
            try:
                smetrics = dict(entry.split('=')
                                for entry in stats[sname].split(","))
                sip = smetrics['ip'].replace(".", "_")
                smname = "{0}_{1}".format(sip, smetrics['port'])

                # record offset and lag
                # NOTE(review): offset is keyed by ip_port while lag and
                # online are keyed by slaveN - kept as is so existing
                # metric names do not change, confirm which is intended
                mname = "{0}.slave.{1}.offset".format(name, smname)
                soffset = moffset - int(smetrics['offset'])
                result.add(plumd.Int(mname, soffset))
                mname = "{0}.slave.{1}.lag".format(name, sname)
                result.add(plumd.Int(mname, smetrics['lag']))

                # if slave is online set online = 1, otherwise 0
                sonline = 1 if smetrics['state'] == "online" else 0
                mname = "{0}.slave.{1}.online".format(name, sname)
                result.add(plumd.Int(mname, sonline))
            except (TypeError, KeyError, ValueError):
                self.log.error("Redis: invalid slave entry: {0}".format(sname))

    def record_configs(self, result):
        """Record the configured configuration values.

        :param result: A ResultSet to record the config values to.
        :type result: plumd.ResultSet
        """
        configs = self.configs
        if not configs:
            return
        name = self.name
        for config in self.client.config_get_multi(configs):
            for key, val in config.items():
                mstr = "{0}.configs.{1}".format(name, key)
                result.add(plumd.Float(mstr, val))

    def record_sizes(self, result):
        """Record the total sizes of the configured keys.

        For each type of key (list, zset, set, hyperloglog)
        scan for the keys matching the configured globs and record the
        total number of items for each configured metric name.

        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        keys = self.keys
        if not keys:
            return
        recorders = (
            ("lists", self.record_lists),
            ("zsets", self.record_zsets),
            ("sets", self.record_sets),
            ("hlls", self.record_hlls),
        )
        for ktype, recorder in recorders:
            if ktype in keys:
                recorder(keys[ktype], result)

    def _record_totals(self, label, count_func, kconfig, result):
        """Record one total per metric name in kconfig.

        The counts of every glob configured for a metric name are summed
        into a single metric named <name>.sizes.<label>.<metric name>.
        Metric names without any globs are skipped.

        :param label: The key type label used in the metric name
        :type label: str
        :param count_func: A callable returning the total for an iterable
            of keys, eg. self.client.llen_multi
        :type count_func: callable
        :param kconfig: A dict of metric name => globs
        :type kconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, kprefixes in kconfig.items():
            if not kprefixes:
                continue
            total = 0
            for prefix in kprefixes:
                total += count_func(self.client.scan(prefix))
            mstr = "{0}.sizes.{1}.{2}".format(name, label, mprefix)
            result.add(plumd.Int(mstr, total))

    def record_lists(self, lconfig, result):
        """Record the total length of the configured lists.

        eg. lconfig: {"metric_name": [ "list*", "of*", "globs*"]}

        :param lconfig: A dict of metric name => globs
        :type lconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals("lists", self.client.llen_multi, lconfig, result)

    def record_zsets(self, zconfig, result):
        """Record the total cardinality of the configured zsets.

        eg. zconfig: {"metric_name": [ "list*", "of*", "globs*"]}

        :param zconfig: A dict of metric name => globs
        :type zconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals("zset", self.client.zcard_multi, zconfig, result)

    def record_sets(self, sconfig, result):
        """Record the total cardinality of the configured sets.

        eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]}

        :param sconfig: A dict of metric name => globs
        :type sconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals("set", self.client.scard_multi, sconfig, result)

    def record_hlls(self, hllconfig, result):
        """Record the total count of the configured hyperloglogs.

        eg. hllconfig: {"metric_name": [ "list*", "of*", "globs*"]}

        :param hllconfig: A dict of metric name => globs
        :type hllconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals("hll", self.client.pfcount_multi, hllconfig,
                            result)
863
|
|
|
|