1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
"""A redis reader plugin with builtin redis client.""" |
3
|
|
|
import sys |
4
|
|
|
import time |
5
|
|
|
import socket |
6
|
|
|
from collections import deque |
7
|
|
|
from collections import defaultdict |
8
|
|
|
|
9
|
|
|
import plumd |
10
|
|
|
from plumd.util import Differential |
11
|
|
|
|
12
|
|
|
__author__ = 'Kenny Freeman' |
13
|
|
|
__email__ = '[email protected]' |
14
|
|
|
__license__ = "ISCL" |
15
|
|
|
__docformat__ = 'reStructuredText' |
16
|
|
|
|
17
|
|
|
PY3 = sys.version_info > (3,) |
18
|
|
|
|
19
|
|
|
|
20
|
|
|
class RedisError(Exception):
    """Generic error raised for any Redis client, socket or protocol failure."""
24
|
|
|
|
25
|
|
|
|
26
|
|
|
class RedisClient(object):
    """A minimal Redis client.

    Commands are sent as inline commands over a RedisNet connection and
    the replies are parsed with RedisResponse.
    """

    def __init__(self, log, addr, sfamily, timeout):
        """A minimal Redis client.

        :param log: A logger created from logging.getLogger
        :type log: logging.RootLogger
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
        :type addr: str or tuple(str, int)
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
        :type sfamily: int
        :param timeout: The timeout in seconds for all socket operations
        :type timeout: float or int
        """
        self.log = log
        self.net = RedisNet(log, addr, sfamily, timeout)

    def info(self, section=None):
        """Return the output of the INFO command as a dict of strings.

        Comment lines (starting with "#") and blank lines are skipped. Each
        remaining "key:value" line is split on the first ":" only, so values
        that themselves contain ":" are preserved intact.

        :param section: The info section to request from Redis ("all" when
            omitted)
        :type section: str
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: dict
        """
        ret = {}
        section = "all" if section is None else section
        self.net.send("INFO {0}\r\n".format(section))
        for info_str in RedisResponse(self.net):
            for line in info_str.split("\n"):
                # lines are "\r\n" terminated; drop the "\r" so that the
                # values do not carry a trailing carriage return
                line = line.strip()
                if not line or line[0] == "#":
                    continue
                key, sep, val = line.partition(":")
                if sep:
                    ret[key] = val
        return ret

    def config_get_multi(self, globs):
        """Yield one dict of config name => value for each glob requested.

        :param globs: An iterable containing glob search strings
        :type globs: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: generator of dict
        """
        for glob_str in globs:
            self.net.send("CONFIG GET {0}\r\n".format(glob_str))
            vals = [val.strip() for val in RedisResponse(self.net)]
            # the reply is a flat list: name, value, name, value, ...
            yield dict(zip(vals[0::2], vals[1::2]))

    def scan(self, prefix, count=None):
        """Yield the key names that match the requested prefix.

        This is a generator: SCAN requests are issued lazily, one batch at
        a time, until the server returns a cursor of 0.

        :param prefix: The key prefix/glob to search for
        :type prefix: str
        :param count: The number of keys to request on each iteration
            (10 when omitted)
        :type count: int
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: generator of str
        """
        scan_cmd = "scan {0} match {1} count {2}\r\n"
        count = 10 if count is None else count
        cursor = 0
        while True:
            self.net.send(scan_cmd.format(cursor, prefix, count))
            # response is the next cursor followed by a list of matches
            resp = RedisResponse(self.net)
            try:
                cursor = int(next(resp))
            except (StopIteration, TypeError, ValueError) as exc:
                # a StopIteration escaping a generator body would surface
                # as a RuntimeError on Python 3.7+, so report it explicitly
                msg = "RedisClient: invalid scan cursor: {0!r}"
                raise RedisError(msg.format(exc))
            for key in resp:
                yield key.strip()
            if cursor == 0:
                break

    def _sum_int_replies(self, cmd_fmt, keys):
        """Send cmd_fmt for every key and return the sum of the replies.

        A null reply (None, eg. GET on a missing key) contributes 0.

        :param cmd_fmt: An inline command with one {0} placeholder for the key
        :type cmd_fmt: str
        :param keys: The iterable of keys to run the command against
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: when the server sends an empty reply
        :raises ValueError: when a reply does not cast to int
        :rtype: int
        """
        total = 0
        for key in keys:
            self.net.send(cmd_fmt.format(key))
            # response should just be a single value that casts to int
            resp = RedisResponse(self.net)
            try:
                value = next(resp)
            except StopIteration:
                msg = "RedisClient: empty reply for: {0}"
                raise RedisError(msg.format(cmd_fmt.format(key).strip()))
            if value is None:
                continue
            total += int(value)
        return total

    def get_multi(self, keys):
        """Return the total value of all matching keys.

        :param keys: The iterable of keys to return the total for.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :raise ValueError: when a key doesn't cast to int
        :rtype: int
        """
        return self._sum_int_replies("get {0}\r\n", keys)

    def llen_multi(self, keys):
        """Return the total length of the lists named by keys.

        :param keys: The iterable of keys to return the total length of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("llen {0}\r\n", keys)

    def zcard_multi(self, keys):
        """Return the total cardinality of the sorted sets named by keys.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("zcard {0}\r\n", keys)

    def scard_multi(self, keys):
        """Return the total cardinality of the sets named by keys.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("scard {0}\r\n", keys)

    def pfcount_multi(self, keys):
        """Return the total PFCOUNT of the hyperloglogs named by keys.

        :param keys: The iterable of keys to return the total cardinality of.
        :type keys: iterable
        :raises RedisError: for any socket related exceptions
        :raises RedisError: for unexpected server responses
        :rtype: int
        """
        return self._sum_int_replies("pfcount {0}\r\n", keys)
199
|
|
|
|
200
|
|
|
|
201
|
|
|
class RedisResponse(object):
    """An iterable of redis command responses.

    The complete reply to one command is read from the reader when the
    object is constructed. Iterating afterwards drains the parsed values
    in order; nested arrays are flattened into the same sequence.
    """

    def __init__(self, reader):
        """An iterable of redis command responses.

        :param reader: A RedisNet reader instance (anything that provides
            readline() and readnbytes(n))
        :type reader: RedisNet

        :raises RedisError: for any socket related errors
        :raises RedisError: for any unknown responses
        :raises RedisError: for any Redis Errors returned
        :raises RedisError: for any ValueErrors encountered when casting
        """
        self.reader = reader
        # parsed values, in the order they were received
        self.vals = deque()
        # handlers are keyed on the reply type byte; they consume the rest
        # of the reply and add values to self.vals
        self.func = defaultdict(lambda: RedisResponse.h_unknown)
        self.func["*"] = lambda buff: self.parse(int(buff))
        self.func["+"] = lambda buff: self.vals.append(str(buff))
        self.func["$"] = self._h_bulk
        self.func[":"] = lambda buff: self.vals.append(int(buff))
        self.func["-"] = RedisResponse.h_error
        self.parse()

    def _h_bulk(self, buff):
        """Read a bulk string reply and append it to self.vals.

        The appended string still carries its trailing "\\r\\n"; callers are
        expected to strip it. A negative length is the null bulk string
        and is recorded as None.

        :param buff: The length header that followed the "$" type byte
        :type buff: str
        :raises ValueError: when the length is not an integer
        """
        length = int(buff)
        if length < 0:
            # "$-1" has no payload and no trailing CRLF: reading any bytes
            # here would block until the socket times out
            self.vals.append(None)
            return
        # +2 consumes the "\r\n" that terminates the payload
        self.vals.append(self.reader.readnbytes(length + 2))

    def parse(self, nitems=None):
        """Read nitems replies from the reader into self.vals.

        :param nitems: The number of replies to read (1 when omitted)
        :type nitems: int
        :raises RedisError: for any socket related Exceptions
        :raises RedisError: for any unknown types read
        :raises RedisError: for any redis protocol errors
        """
        nitems = 1 if nitems is None else nitems
        # range (not xrange) so that this also runs on Python 3
        for _ in range(nitems):
            # pre-bind so the error message below can always be formatted
            buff = ""
            try:
                buff = self.reader.readline()
                self.func[buff[0]](buff[1:])
            except (ValueError, IndexError) as exc:
                msg = "could not parse response: {0}: {1}"
                raise RedisError(msg.format(buff, exc))

    @staticmethod
    def h_unknown(buff):
        """Unknown response handler.

        :param buff: A response buffer read from Redis
        :type buff: str
        :raises RedisError: this function always raises a RedisError
        """
        raise RedisError("unknown command: {0}".format(buff))

    @staticmethod
    def h_error(buff):
        """Error reply handler: raise a RedisError carrying buff.

        :param buff: A response buffer read from Redis
        :type buff: str
        :raises RedisError: this function always raises a RedisError
        """
        msg = "RedisResponse: h_error({0})"
        raise RedisError(msg.format(buff))

    def __iter__(self):
        """A Redis command response iterator.

        :rtype: iterator
        """
        return self

    def __next__(self):
        """Return the next response, if any.

        :raises StopIteration: when all values have been consumed
        :rtype: object
        """
        if not self.vals:
            raise StopIteration()
        return self.vals.popleft()

    # Python 2 iterator protocol
    next = __next__
291
|
|
|
|
292
|
|
|
|
293
|
|
|
class RedisNet(object):
    """A helper class that talks to Redis on a unix/tcp socket.

    Data received from the socket is buffered as raw bytes and only
    converted to text when it is handed to the caller, so byte counts from
    the Redis protocol stay valid on both Python 2 and Python 3.
    """

    BUFF_LEN = 8192

    def __init__(self, log, addr, sfamily, timeout):
        """A helper class that talks to Redis on a unix/tcp socket.

        No connection is made here; the socket is opened on first use.

        :param log: A logger created from logging.getLogger
        :type log: logging.RootLogger
        :param addr: Either a tuple of ('host', port) or a path to a unix socket
        :type addr: str or tuple(str, int)
        :param sfamily: The socket family eg. socket.AF_INET or AF_UNIX
        :type sfamily: int
        :param timeout: The timeout in seconds for all socket operations
        :type timeout: float or int
        """
        self.log = log
        # addr can be unix socket or (host, port) tuple
        self.addr = addr
        # socket.AF_INET or socket.AF_UNIX
        self.sfamily = sfamily
        # all socket operations timeout
        self.timeout = timeout
        self.sock = None
        # read from our socket into this buffer
        # keep an index in the buffer that we've read up to
        # and record the total number of bytes in the buffer
        self.buff = b""
        self.buff_end = -1
        self.buff_i = -1

    @staticmethod
    def _to_text(data):
        """Return data as a native str.

        On Python 2 the bytes read from the socket already are str; on
        Python 3 they are decoded as UTF-8, replacing undecodable bytes.

        :param data: The raw bytes read from the socket
        :type data: bytes
        :rtype: str
        """
        if isinstance(data, str):
            return data
        return data.decode("utf-8", "replace")

    def _reset_buffer(self):
        """Mark the read buffer as empty so the next access forces a read."""
        self.buff = b""
        self.buff_end = -1
        self.buff_i = -1

    def _addr_str(self):
        """Return self.addr formatted for log messages.

        :rtype: str
        """
        if isinstance(self.addr, (tuple, list)):
            return ":".join(str(part) for part in self.addr)
        # a unix socket path
        return str(self.addr)

    def _ensure_connected(self):
        """Connect if there is no open socket.

        :raises RedisError: when the connection cannot be established
        """
        if not self.sock and not self.connect():
            msg = "RedisNet: unable to connect to: {0}"
            raise RedisError(msg.format(self.addr))

    def connect(self):
        """Connect to Redis.

        Any existing connection is closed first. On failure no socket is
        kept, so the next operation attempts a fresh connection.

        :raises RedisError: for any socket related exceptions
        :rtype: bool
        """
        if self.sock:
            self.disconnect()
        try:
            # create the socket
            self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM)
            # set timeout for socket operations
            self.sock.settimeout(self.timeout)
            self.sock.connect(self.addr)
        except Exception as exc:
            msg = "RedisNet: Exception during connect: {0}"
            self.log.error(msg.format(exc))
            # do not leave a dead socket behind: self.sock being set is
            # what the other methods use to decide whether to reconnect
            sock, self.sock = self.sock, None
            if sock is not None:
                try:
                    sock.close()
                except Exception:
                    # best effort cleanup; the connect error raised below
                    # is the one that matters to the caller
                    pass
            raise RedisError(msg.format(exc))
        self.log.info("RedisNet: connected: {0}".format(self._addr_str()))
        return True

    def disconnect(self):
        """Disconnect from Redis and discard any buffered data.

        self.sock is always cleared, even when closing raises.

        :raises RedisError: for any socket related exceptions
        """
        self.log.debug("RedisNet: disconnect")
        # buffered bytes belong to the connection being dropped; keeping
        # them would corrupt the first reply read after a reconnect
        self._reset_buffer()
        sock, self.sock = self.sock, None
        if sock:
            try:
                sock.close()
            except Exception as exc:
                msg = "RedisNet: exception during disconnect: {0}"
                self.log.error(msg.format(exc))
                raise RedisError(msg.format(exc))

    def read(self):
        """Read up to RedisNet.BUFF_LEN bytes from our socket into self.buff.

        Calls here overwrite self.buff and reset self.buff_i and
        self.buff_end.

        :raises RedisError: for any socket related exceptions
        :raises RedisError: when the server has closed the connection
        """
        self._ensure_connected()

        try:
            data = self.sock.recv(RedisNet.BUFF_LEN)
        except Exception as exc:
            msg = "RedisNet: Exception during readline: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))

        if not data:
            # an empty read means the peer closed the connection; without
            # this check readline() would call read() forever
            msg = "RedisNet: connection closed by server"
            self.log.error(msg)
            self.disconnect()
            raise RedisError(msg)

        self.buff = data
        self.buff_end = len(data)
        self.buff_i = 0

    def _recv_exact(self, nbytes):
        """Read exactly nbytes raw bytes from the socket, bypassing self.buff.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :raises RedisError: when the connection closes before nbytes arrive
        :rtype: bytes
        """
        self._ensure_connected()

        chunks = []
        remaining = nbytes
        try:
            # a single recv may legitimately return fewer bytes than asked
            while remaining > 0:
                chunk = self.sock.recv(remaining)
                if not chunk:
                    raise socket.error("connection closed by server")
                chunks.append(chunk)
                remaining -= len(chunk)
        except Exception as exc:
            msg = "RedisNet: Exception during recv: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
        return b"".join(chunks)

    def recv(self, nbytes):
        """Read nbytes from our socket and return it.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        return self._to_text(self._recv_exact(nbytes))

    def readline(self):
        """Get the next available line.

        The returned line does not include the terminating "\\n" (a
        preceding "\\r", if any, is kept).

        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        self._ensure_connected()

        parts = []
        while True:
            # do we have any data available?
            if self.buff_i < 0 or self.buff_i >= self.buff_end:
                # read data, reset buffer state
                self.read()
            # now we have data, do we have a newline?
            newline = self.buff.find(b"\n", self.buff_i, self.buff_end)
            if newline > -1:
                # return line, advance buffer past the newline
                parts.append(self.buff[self.buff_i:newline])
                self.buff_i = newline + 1
                # reset if we have no buffer left
                if self.buff_i >= self.buff_end:
                    self._reset_buffer()
                break
            # no newline yet, record and keep reading
            parts.append(self.buff[self.buff_i:self.buff_end])
            self._reset_buffer()
        return self._to_text(b"".join(parts))

    def readnbytes(self, nbytes):
        """Read nbytes, first from our buffer and then from our socket.

        :param nbytes: The number of bytes to read
        :type nbytes: int
        :raises RedisError: for any socket related exceptions
        :rtype: str
        """
        if nbytes <= 0:
            return self._to_text(b"")

        parts = []
        # any bytes in our buffer?
        available = self.buff_end - self.buff_i if self.buff_i >= 0 else 0
        if available > 0:
            take = min(available, nbytes)
            start = self.buff_i
            parts.append(self.buff[start:start + take])
            self.buff_i += take
            nbytes -= take
            if self.buff_i >= self.buff_end:
                # reset so next access on buffer forces a read
                self._reset_buffer()

        # do we need more bytes?
        if nbytes > 0:
            # read the remainder straight from the socket
            parts.append(self._recv_exact(nbytes))

        return self._to_text(b"".join(parts))

    def send(self, cmd):
        """Send the supplied string to the redis server.

        :param cmd: The string to send to the redis server
        :type cmd: str
        :raises RedisError: for any socket related exceptions
        """
        self._ensure_connected()
        # sockets carry bytes: encode text (Python 3 str, Python 2 unicode)
        if not isinstance(cmd, bytes):
            cmd = cmd.encode("utf-8")
        try:
            self.sock.sendall(cmd)
        except Exception as exc:
            msg = "RedisNet: exception sending to server: {0}"
            self.log.error(msg.format(exc))
            self.disconnect()
            raise RedisError(msg.format(exc))
500
|
|
|
|
501
|
|
|
|
502
|
|
|
class Redis(plumd.Reader):
    """Plugin to record redis metrics."""

    # default config values
    defaults = {
        'poll.interval': 10,
        # INFO fields recorded as-is; each name is listed once
        'gauges': [
            "aof_current_rewrite_time_sec",
            "aof_enabled",
            "aof_last_rewrite_time_sec",
            "aof_rewrite_in_progress",
            "aof_rewrite_scheduled",
            "blocked_clients",
            "client_biggest_input_buf",
            "client_longest_output_list",
            "connected_clients",
            "connected_slaves",
            "evicted_keys",
            "expired_keys",
            "instantaneous_input_kbps",
            "instantaneous_ops_per_sec",
            "instantaneous_output_kbps",
            "keyspace_hits",
            "keyspace_misses",
            "latest_fork_usec",
            "loading",
            "master_repl_offset",
            "mem_fragmentation_ratio",
            "pubsub_channels",
            "pubsub_patterns",
            "rdb_bgsave_in_progress",
            "rdb_changes_since_last_save",
            "rdb_current_bgsave_time_sec",
            "rdb_last_bgsave_time_sec",
            "rdb_last_save_time",
            "rejected_connections",
            "repl_backlog_active",
            "repl_backlog_first_byte_offset",
            "repl_backlog_histlen",
            "repl_backlog_size",
            "sync_full",
            "sync_partial_err",
            "sync_partial_ok",
            "total_commands_processed",
            "total_connections_received",
            "total_net_input_bytes",
            "total_net_output_bytes",
            "uptime_in_days",
            "uptime_in_seconds",
            "used_cpu_sys",
            "used_cpu_sys_children",
            "used_cpu_user",
            "used_cpu_user_children",
            "used_memory",
            "used_memory_lua",
            "used_memory_peak",
            "used_memory_rss",
            "master_last_io_seconds_ago",
            "master_sync_in_progress",
            "slave_repl_offset",
            "slave_priority",
            "slave_read_only",
        ],
        'rates': [],
        'configs': [
            'maxmemory'
        ],
        'keys': {
            # 'type': { metric_prefix: [key_prefix*, ...] }
            'lists': {},
            'zsets': {},
            'sets': {},
            'hlls': {}
        },
        'addr': '127.0.0.1:6379',
        'addr_type': 'inet',
        'timeout': 10
    }

    def __init__(self, log, config):
        """Plugin to record redis metrics.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        :raises plumd.ConfigError: for an invalid address or address type
        """
        super(Redis, self).__init__(log, config)
        self.config.defaults(Redis.defaults)

        # metrics to record
        self.gauges = self.config.get('gauges')
        self.rates = self.config.get('rates')
        self.configs = self.config.get('configs')
        self.keys = self.config.get('keys')

        # Redis connection - either unix socket or tcp
        addr = self.config.get('addr')
        addr_type = self.config.get('addr_type').lower()
        if addr_type == "unix":
            sfamily = socket.AF_UNIX
        elif addr_type == "inet":
            try:
                host, port = addr.split(":")
                addr = (host, int(port))
            except (AttributeError, ValueError):
                # not a string, no/too many ":" or a non numeric port
                msg = "Redis: invalid address: {0}, (host:port)"
                raise plumd.ConfigError(msg.format(addr))
            sfamily = socket.AF_INET
        else:
            msg = "Redis: unsupported connection type: {0} (unix, inet)"
            raise plumd.ConfigError(msg.format(addr_type))
        # read from self.config so the defaults applied above are honoured
        timeout = self.config.get('timeout')
        self.client = RedisClient(self.log, addr, sfamily, timeout)
        self.calc = Differential()

    def poll(self):
        """Query Redis for metrics.

        A RedisError aborts the poll; whatever was recorded before the
        error is still returned.

        :rtype: ResultSet
        """
        result = plumd.Result("redis")
        # catch exceptions - simply skip the rest of the poll on error
        try:
            # config values
            self.record_configs(result)

            # key sizes: lists, zsets, sets and hll sizes (recorded once)
            self.record_sizes(result)

            # get server metrics
            stats = self.client.info()

            # record gauges, rates
            self.record_metrics(stats, result)

            # replication, if any slaves are connected
            if "slave0" in stats:
                self.record_slaves(stats, result)

            # db metrics, maxmem
            self.record_dbs(stats, result)

            # and finally command stats - if available
            self.record_cmdstats(result)

        except RedisError as exc:
            msg = "Redis: exception during poll: {0}"
            self.log.error(msg.format(exc))
        return plumd.ResultSet([result])

    def record_cmdstats(self, result):
        """Record the stats from info commandstats.

        Entries look like "cmdstat_get:calls=1,usec=2,usec_per_call=2.00".
        Malformed entries are logged and skipped.

        :param result: A result object to add metrics to
        :type result: ResultSet
        :raises RedisError: for any error talking to Redis
        """
        name = self.name
        infos = self.client.info("commandstats")
        for key in sorted(infos):
            try:
                # split once: command names may contain "_" themselves
                cstat, cname = key.split("_", 1)
                pairs = [(mname, mval.strip())
                         for mname, mval in (val.split("=", 1)
                                             for val in infos[key].split(","))]
            except ValueError:
                msg = "Redis: invalid commandstats entry: {0}"
                self.log.error(msg.format(key))
                continue
            for mname, mval in pairs:
                metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname)
                result.add(plumd.Float(metric, mval))

    def record_metrics(self, stats, result):
        """Record the configured gauges and rates.

        Configured names that are not present in stats are skipped.

        :param stats: Dictionary returned from info command
        :type stats: dict
        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        timest = time.time()
        name = self.name

        # record gauges
        for stat in self.gauges:
            if stat in stats:
                mname = "{0}.{1}".format(name, stat)
                result.add(plumd.Float(mname, stats[stat]))

        # record rates
        for stat in self.rates:
            if stat in stats:
                mname = "{0}.{1}".format(name, stat)
                mval = self.calc.per_second(mname, float(stats[stat]), timest)
                result.add(plumd.Float(mname, mval))

    def record_dbs(self, stats, result):
        """Record per database metrics into result.

        Every "db<N>" entry present in stats is recorded, whether or not
        the database indexes are contiguous.

        :param stats: Dictionary returned from info command
        :type stats: dict
        :param result: A result object to add metrics to
        :type result: ResultSet
        """
        # db0:keys=1,expires=0,avg_ttl=0
        name = self.name
        metric_fmt = "{0}.db.{1}.{2}"

        dbs = sorted((int(key[2:]), key) for key in stats
                     if key.startswith("db") and key[2:].isdigit())
        for i, dbname in dbs:
            try:
                vals = stats[dbname].split(",")
                dbmetrics = dict((k, v.strip())
                                 for k, v in (v.split('=', 1) for v in vals))
            except (KeyError, ValueError):
                # an item without "=" fails to unpack with a ValueError
                self.log.error("Redis: invalid db entry: {0}".format(dbname))
                continue
            for key, val in dbmetrics.items():
                metric_str = metric_fmt.format(name, i, key)
                result.add(plumd.Int(metric_str, val))

    def record_slaves(self, stats, result):
        """Record slave metrics into result.

        For each "slave<N>" entry the offset behind the master, the lag and
        an online flag (1 or 0) are recorded.

        :param stats: A dictionary returned from info command
        :type stats: dict
        :param result: A ResultSet object to add metrics to
        :type result: ResultSet
        """
        # slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1
        name = self.name
        slave_str = "slave{0}"
        moffstr = 'master_repl_offset'
        moffset = 0
        try:
            moffset = int(stats[moffstr])
        except (TypeError, KeyError, ValueError):
            self.log.error("Redis: no {0} value".format(moffstr))

        # for each slave entry
        for i in range(len(stats)):
            sname = slave_str.format(i)
            if sname not in stats:
                break
            try:
                vals = stats[sname].split(",")
                smetrics = dict((k, v.strip())
                                for k, v in (v.split('=', 1) for v in vals))
                sip = smetrics['ip'].replace(".", "_")
                smname = "{0}_{1}".format(sip, smetrics['port'])

                # record offset and lag
                # NOTE(review): offset is keyed by ip_port while lag and
                # online are keyed by slaveN - kept as-is so existing
                # metric names do not change; confirm whether intended.
                mname = "{0}.slave.{1}.offset".format(name, smname)
                soffset = moffset - int(smetrics['offset'])
                result.add(plumd.Int(mname, soffset))
                mname = "{0}.slave.{1}.lag".format(name, sname)
                result.add(plumd.Int(mname, smetrics['lag']))

                # if slave is online set online = 1, otherwise 0
                sonline = 1 if smetrics['state'] == "online" else 0
                mname = "{0}.slave.{1}.online".format(name, sname)
                result.add(plumd.Int(mname, sonline))
            except (TypeError, KeyError, ValueError):
                self.log.error("Redis: invalid slave entry: {0}".format(sname))

    def record_configs(self, result):
        """Record the configured configuration values.

        :param result: A ResultSet to record the config values to.
        :type result: plumd.ResultSet
        :raises RedisError: for any error talking to Redis
        """
        configs = self.configs
        if not configs:
            return
        name = self.name
        for config in self.client.config_get_multi(configs):
            for key, val in config.items():
                mstr = "{0}.configs.{1}".format(name, key)
                result.add(plumd.Float(mstr, val))

    def record_sizes(self, result):
        """Record the total sizes of the configured keys.

        For each type of key (string, list, zset, set, hyperloglog) record
        total number of items for all keys.

        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        keys = self.keys
        if not keys:
            return
        if "strings" in keys:
            self.record_strings(keys['strings'], result)
        if "lists" in keys:
            self.record_lists(keys['lists'], result)
        if "zsets" in keys:
            self.record_zsets(keys['zsets'], result)
        if "sets" in keys:
            self.record_sets(keys['sets'], result)
        if "hlls" in keys:
            self.record_hlls(keys['hlls'], result)

    def _record_totals(self, kconfig, fetch, metric_fmt, error_fmt, result):
        """Record one total per metric prefix in kconfig.

        A prefix whose total cannot be fetched is logged and skipped so
        that no misleading value is recorded for it.

        :param kconfig: A dict of metric name => list of key names
        :type kconfig: dict
        :param fetch: A callable that returns the total for a list of keys
        :type fetch: callable
        :param metric_fmt: Metric name format: {0} plugin name, {1} prefix
        :type metric_fmt: str
        :param error_fmt: Error format: {0} metric prefix, {1} exception
        :type error_fmt: str
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        name = self.name
        for mprefix, keys in kconfig.items():
            # get the total for this prefix
            try:
                total = fetch(keys)
            except (RedisError, ValueError, TypeError) as exc:
                # ValueError/TypeError: a reply that does not cast to int
                self.log.error(error_fmt.format(mprefix, exc))
                continue
            result.add(plumd.Int(metric_fmt.format(name, mprefix), total))

    def record_strings(self, lconfig, result):
        """Record the total value of the configured string keys.

        eg. lconfig: {"metric_name": [ "list", "of", "keys"]}

        :param lconfig: A dict of metric name => list of key names
        :type lconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals(
            lconfig, self.client.get_multi, "{0}.sizes.strings.{1}",
            "ERROR: redis: record_strings: are {0} strings? {1}", result)

    def record_lists(self, lconfig, result):
        """Record the total length of the configured lists.

        eg. lconfig: {"metric_name": [ "list", "of", "keys"]}

        :param lconfig: A dict of metric name => list of key names
        :type lconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals(
            lconfig, self.client.llen_multi, "{0}.sizes.lists.{1}",
            "ERROR: redis: record_lists: are {0} lists? {1}", result)

    def record_zsets(self, zconfig, result):
        """Record the total cardinality of the configured zsets.

        eg. zconfig: {"metric_name": [ "list", "of", "keys"]}

        :param zconfig: A dict of metric name => list of key names
        :type zconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals(
            zconfig, self.client.zcard_multi, "{0}.sizes.zset.{1}",
            "ERROR: redis: record_zsets: are {0} zsets? {1}", result)

    def record_sets(self, sconfig, result):
        """Record the total cardinality of the configured sets.

        eg. sconfig: {"metric_name": [ "list", "of", "keys"]}

        :param sconfig: A dict of metric name => list of key names
        :type sconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals(
            sconfig, self.client.scard_multi, "{0}.sizes.set.{1}",
            "ERROR: redis: record_sets: are {0} sets? {1}", result)

    def record_hlls(self, hllconfig, result):
        """Record the total count of the configured hyperloglogs.

        eg. hllconfig: {"metric_name": [ "list", "of", "keys"]}

        :param hllconfig: A dict of metric name => list of key names
        :type hllconfig: dict
        :param result: A ResultSet to record into.
        :type result: plumd.ResultSet
        """
        self._record_totals(
            hllconfig, self.client.pfcount_multi, "{0}.sizes.hll.{1}",
            "ERROR: redis: record_hlls: are {0} hyperloglogs? {1}", result)
923
|
|
|
|