1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
import sys |
3
|
|
|
|
4
|
|
|
PY3 = sys.version_info > (3,) |
5
|
|
|
|
6
|
|
|
import re |
|
|
|
|
7
|
|
|
import os |
|
|
|
|
8
|
|
|
import time |
9
|
|
|
import socket |
10
|
|
|
import os.path |
|
|
|
|
11
|
|
|
from collections import deque |
12
|
|
|
from collections import defaultdict |
13
|
|
|
|
14
|
|
|
import plumd |
15
|
|
|
from plumd.util import Differential |
16
|
|
|
from plumd.util import get_file_list |
|
|
|
|
17
|
|
|
|
18
|
|
|
__author__ = 'Kenny Freeman' |
19
|
|
|
__email__ = '[email protected]' |
20
|
|
|
__license__ = "ISCL" |
21
|
|
|
__docformat__ = 'reStructuredText' |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
class RedisError(Exception): |
25
|
|
|
pass |
26
|
|
|
|
27
|
|
|
|
28
|
|
|
class RedisClient(object): |
29
|
|
|
"""A minimal Redis client that supports only the commands needed |
30
|
|
|
for Redis metrics collection.""" |
31
|
|
|
|
32
|
|
|
def __init__(self, log, addr, sfamily, timeout): |
33
|
|
|
"""A minimal Redis client that supports only the commands needed |
34
|
|
|
for Redis metrics collection. |
35
|
|
|
|
36
|
|
|
:param log: A logger created from loggging.getLogger |
37
|
|
|
:type log: logging.RootLogger |
38
|
|
|
:param addr: Either a tuple of ('host', port) or a path to a unix socket |
39
|
|
|
:type addr: str or tuple(str, int) |
40
|
|
|
:param sfamily: The socket family eg. socket.AF_INET or AF_UNIX |
41
|
|
|
:type sfamily: int |
42
|
|
|
:param timeout: The timeout in seconds for all socket operations |
43
|
|
|
:type timeout: float or int |
44
|
|
|
""" |
45
|
|
|
self.log = log |
46
|
|
|
self.net = RedisNet(log, addr, sfamily, timeout) |
47
|
|
|
|
48
|
|
|
def info(self, section=None): |
49
|
|
|
"""Redis info command : http://redis.io/commands/info |
50
|
|
|
|
51
|
|
|
:param section: The info section to request from Redis |
52
|
|
|
:type section: str |
53
|
|
|
:raises RedisError: for any socket related exceptions |
54
|
|
|
:raises RedisError: for unexpcted server responses |
55
|
|
|
:rtype: dict |
56
|
|
|
""" |
57
|
|
|
ret = {} |
58
|
|
|
section = "all" if section is None else section |
59
|
|
|
self.net.send("INFO {0}\r\n".format(section)) |
60
|
|
|
for info_str in RedisResponse(self.net): |
61
|
|
|
for line in info_str.split("\n"): |
62
|
|
|
if not line or line[0] == "#" or line == '\r': |
63
|
|
|
continue |
64
|
|
|
if line.find(":") >= 0: |
65
|
|
|
key, val = line.split(":") |
66
|
|
|
ret[key] = val |
67
|
|
|
return ret |
68
|
|
|
|
69
|
|
|
def config_get_multi(self, globs): |
70
|
|
|
"""Redis config get command : http://redis.io/commands/config-get |
71
|
|
|
|
72
|
|
|
:param globs: An containing glob search strings |
73
|
|
|
:type globs: iterable |
74
|
|
|
:raises RedisError: for any socket related exceptions |
75
|
|
|
:raises RedisError: for unexpcted server responses |
76
|
|
|
:rtype: dict |
77
|
|
|
""" |
78
|
|
|
for glob_str in globs: |
79
|
|
|
self.net.send("CONFIG GET {0}\r\n".format(glob_str)) |
80
|
|
|
vals = [ val.strip() for val in RedisResponse(self.net) ] |
|
|
|
|
81
|
|
|
yield(dict(zip(vals[0::2], vals[1::2]))) |
|
|
|
|
82
|
|
|
|
83
|
|
|
def scan(self, prefix, count=None): |
84
|
|
|
"""Returns a deque of key names that match the requested prefix. |
85
|
|
|
|
86
|
|
|
:param prefix: The key prefix/glob to search for |
87
|
|
|
:type prefix: str |
88
|
|
|
:param count: The number of keys to request on each iteration |
89
|
|
|
:type count: int |
90
|
|
|
:raises RedisError: for any socket related exceptions |
91
|
|
|
:raises RedisError: for unexpcted server responses |
92
|
|
|
:rtype: deque |
93
|
|
|
""" |
94
|
|
|
scan_cmd = "scan {0} match {1} count {2}\r\n" |
95
|
|
|
count = 10 if count is None else count |
96
|
|
|
matches = deque() |
|
|
|
|
97
|
|
|
cursor = "0" |
98
|
|
|
while True: |
99
|
|
|
# send scan request |
100
|
|
|
self.net.send(scan_cmd.format(cursor, prefix, count)) |
101
|
|
|
# response is the next cursor followed by a list of matches |
102
|
|
|
resp = RedisResponse(self.net) |
103
|
|
|
cursor = int(next(resp)) |
104
|
|
|
for key in resp: |
105
|
|
|
yield(key.strip()) |
|
|
|
|
106
|
|
|
if cursor == 0: |
107
|
|
|
break |
108
|
|
|
|
109
|
|
|
def llen_multi(self, keys): |
110
|
|
|
"""Returns the total length of each key provided. |
111
|
|
|
|
112
|
|
|
:param keys: The iterable of keys to return the total length of. |
113
|
|
|
:type keys: iterable |
114
|
|
|
:raises RedisError: for any socket related exceptions |
115
|
|
|
:raises RedisError: for unexpcted server responses |
116
|
|
|
:rtype: int |
117
|
|
|
""" |
118
|
|
|
llen_cmd = "llen {0}\r\n" |
119
|
|
|
total = 0 |
120
|
|
|
for key in keys: |
121
|
|
|
# send scan request |
122
|
|
|
self.net.send(llen_cmd.format(key)) |
123
|
|
|
# response should just be an int |
124
|
|
|
resp = RedisResponse(self.net) |
125
|
|
|
total += int(next(resp)) |
126
|
|
|
return total |
127
|
|
|
|
128
|
|
|
def zcard_multi(self, keys): |
129
|
|
|
"""Returns the total cardinality of each key provided. |
130
|
|
|
|
131
|
|
|
:param keys: The iterable of keys to return the total cardinality of. |
132
|
|
|
:type keys: iterable |
133
|
|
|
:raises RedisError: for any socket related exceptions |
134
|
|
|
:raises RedisError: for unexpcted server responses |
135
|
|
|
:rtype: int |
136
|
|
|
""" |
137
|
|
|
zcard_cmd = "zcard {0}\r\n" |
138
|
|
|
total = 0 |
139
|
|
|
for key in keys: |
140
|
|
|
# send scan request |
141
|
|
|
self.net.send(zcard_cmd.format(key)) |
142
|
|
|
# response should just be an int |
143
|
|
|
resp = RedisResponse(self.net) |
144
|
|
|
total += int(next(resp)) |
145
|
|
|
return total |
146
|
|
|
|
147
|
|
|
def scard_multi(self, keys): |
148
|
|
|
"""Returns the total cardinality of each key provided. |
149
|
|
|
|
150
|
|
|
:param keys: The iterable of keys to return the total cardinality of. |
151
|
|
|
:type keys: iterable |
152
|
|
|
:raises RedisError: for any socket related exceptions |
153
|
|
|
:raises RedisError: for unexpcted server responses |
154
|
|
|
:rtype: int |
155
|
|
|
""" |
156
|
|
|
scard_cmd = "scard {0}\r\n" |
157
|
|
|
total = 0 |
158
|
|
|
for key in keys: |
159
|
|
|
# send scan request |
160
|
|
|
self.net.send(scard_cmd.format(key)) |
161
|
|
|
# response should just be an int |
162
|
|
|
resp = RedisResponse(self.net) |
163
|
|
|
total += int(next(resp)) |
164
|
|
|
return total |
165
|
|
|
|
166
|
|
|
def pfcount_multi(self, keys): |
167
|
|
|
"""Returns the total cardinality of each key provided. |
168
|
|
|
|
169
|
|
|
:param keys: The iterable of keys to return the total cardinality of. |
170
|
|
|
:type keys: iterable |
171
|
|
|
:raises RedisError: for any socket related exceptions |
172
|
|
|
:raises RedisError: for unexpcted server responses |
173
|
|
|
:rtype: int |
174
|
|
|
""" |
175
|
|
|
pfcount_cmd = "pfcount {0}\r\n" |
176
|
|
|
total = 0 |
177
|
|
|
for key in keys: |
178
|
|
|
# send scan request |
179
|
|
|
self.net.send(pfcount_cmd.format(key)) |
180
|
|
|
# response should just be an int |
181
|
|
|
resp = RedisResponse(self.net) |
182
|
|
|
total += int(next(resp)) |
183
|
|
|
return total |
184
|
|
|
|
185
|
|
|
|
186
|
|
|
class RedisResponse(object): |
187
|
|
|
"""An iterable of redis command responses.""" |
188
|
|
|
|
189
|
|
|
def __init__(self, reader): |
190
|
|
|
"""An iterable of redis command responses. |
191
|
|
|
|
192
|
|
|
:param reader: A RedisNet reader instance |
193
|
|
|
:type reader: RedisNet |
194
|
|
|
|
195
|
|
|
:raises RedisError: for any socket related errors |
196
|
|
|
:raises RedisError: for any unknown responses |
197
|
|
|
:raises RedisError: for any Redis Errors returned |
198
|
|
|
:raises RedisError: for any ValueErrors encountered when casting |
199
|
|
|
""" |
200
|
|
|
self.reader = reader |
201
|
|
|
# handlers consume responses and add them to self.vals |
202
|
|
|
self.func = defaultdict(lambda: self.h_unknown) |
203
|
|
|
#self.func["*"] = self.h_array |
204
|
|
|
self.func["*"] = lambda buff: self.parse(int(buff)) |
205
|
|
|
#self.func["+"] = self.h_simple_str |
206
|
|
|
self.func["+"] = lambda buff: self.vals.append(str(buff)) |
207
|
|
|
#self.func["$"] = self.h_bulk_str |
208
|
|
|
# remove the \r from the string |
209
|
|
|
self.func["$"] = lambda buff: \ |
210
|
|
|
self.vals.append(self.reader.readnbytes(int(buff) + 2)) |
211
|
|
|
#self.func[":"] = self.h_int |
212
|
|
|
self.func[":"] = lambda buff: self.vals.append(int(buff)) |
213
|
|
|
self.func["-"] = self.h_error |
214
|
|
|
self.vals = deque() |
215
|
|
|
self.parse() |
216
|
|
|
|
217
|
|
|
def parse(self, nitems=None): |
218
|
|
|
"""Reads the full response from self.sock. |
219
|
|
|
|
220
|
|
|
:raises RedisError: for any socket related Exceptions |
221
|
|
|
:raises RedisError: for any unknown types read |
222
|
|
|
:raises RedisError: for any redis protocol errors |
223
|
|
|
:rtype: varies |
224
|
|
|
""" |
225
|
|
|
nitems = 1 if nitems is None else nitems |
226
|
|
|
for i in xrange(nitems): |
|
|
|
|
227
|
|
|
try: |
228
|
|
|
buff = self.reader.readline() |
229
|
|
|
self.func[buff[0]](buff[1:]) |
230
|
|
|
except (ValueError, IndexError) as e: |
231
|
|
|
msg = "could not parse response: {0}: {1}" |
232
|
|
|
raise RedisError(msg.format(buff, e)) |
233
|
|
|
|
234
|
|
|
def h_unknown(self, buff): |
235
|
|
|
"""Uknown response handler. |
236
|
|
|
|
237
|
|
|
:param buff: A response buffer read from Redis |
238
|
|
|
:type buff: str |
239
|
|
|
:raises RedisError: this function always raises a RedisError |
240
|
|
|
""" |
241
|
|
|
raise RedisError("unknown command: {0}".format(buff)) |
242
|
|
|
|
243
|
|
|
def h_error(self, buff): |
244
|
|
|
"""Raises a RedisError and sets contents to buff. |
245
|
|
|
|
246
|
|
|
:param buff: A response buffer read from Redis |
247
|
|
|
:type buff: str |
248
|
|
|
:raises RedisError: on any socket related exceptions |
249
|
|
|
:rtype: str |
250
|
|
|
""" |
251
|
|
|
msg = "RedisResponse: h_error({0})" |
|
|
|
|
252
|
|
|
raise RedisError("redis error: {0}".format(buff)) |
253
|
|
|
|
254
|
|
|
def __iter__(self): |
255
|
|
|
"""A Redis command response iterator. |
256
|
|
|
|
257
|
|
|
:rtype: iterator |
258
|
|
|
""" |
259
|
|
|
return self |
260
|
|
|
|
261
|
|
|
def __next__(self): |
262
|
|
|
"""Return the next response, if any. |
263
|
|
|
|
264
|
|
|
:rtype: object |
265
|
|
|
""" |
266
|
|
|
if not self.vals: |
267
|
|
|
raise StopIteration() |
268
|
|
|
return self.vals.popleft() |
269
|
|
|
|
270
|
|
|
def next(self): |
271
|
|
|
"""Return the next response, if any. |
272
|
|
|
|
273
|
|
|
:rtype: object |
274
|
|
|
""" |
275
|
|
|
if not self.vals: |
276
|
|
|
raise StopIteration() |
277
|
|
|
return self.vals.popleft() |
278
|
|
|
|
279
|
|
|
|
280
|
|
|
class RedisNet(object): |
281
|
|
|
"""A helper class that talks to Redis on a unix/tcp socket.""" |
282
|
|
|
|
283
|
|
|
BUFF_LEN = 8192 |
284
|
|
|
|
285
|
|
|
def __init__(self, log, addr, sfamily, timeout): |
286
|
|
|
"""A helper class that talks to Redis on a unix/tcp socket. |
287
|
|
|
|
288
|
|
|
:param log: A logger created from loggging.getLogger |
289
|
|
|
:type log: logging.RootLogger |
290
|
|
|
:param addr: Either a tuple of ('host', port) or a path to a unix socket |
291
|
|
|
:type addr: str or tuple(str, int) |
292
|
|
|
:param sfamily: The socket family eg. socket.AF_INET or AF_UNIX |
293
|
|
|
:type sfamily: int |
294
|
|
|
:param timeout: The timeout in seconds for all socket operations |
295
|
|
|
:type timeout: float or int |
296
|
|
|
""" |
297
|
|
|
self.log = log |
298
|
|
|
# addr can be unix socket or (host, port) tuple |
299
|
|
|
self.addr = addr |
300
|
|
|
# socket.AF_INET or socket.AF_UNIX |
301
|
|
|
self.sfamily = sfamily |
302
|
|
|
# all socket operations timeout |
303
|
|
|
self.timeout = timeout |
304
|
|
|
self.sock = None |
305
|
|
|
# read from our socket into this buffer |
306
|
|
|
# keep an index in the buffer that we've read up to |
307
|
|
|
# and record the total number of bytes in the buffer |
308
|
|
|
self.buff = "" |
309
|
|
|
self.buff_end = -1 |
310
|
|
|
self.buff_i = -1 |
311
|
|
|
|
312
|
|
View Code Duplication |
def connect(self): |
313
|
|
|
"""Connect to Redis. |
314
|
|
|
|
315
|
|
|
:raises RedisError: for any socket related exceptions |
316
|
|
|
:rtype: Exception or None |
317
|
|
|
""" |
318
|
|
|
#self.log.debug("RedisNet: connect") |
319
|
|
|
if self.sock: |
320
|
|
|
self.disconnect() |
321
|
|
|
try: |
322
|
|
|
# create the socket |
323
|
|
|
self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM) |
324
|
|
|
# set timeout for socket operations |
325
|
|
|
self.sock.settimeout(self.timeout) |
326
|
|
|
self.sock.connect(self.addr) |
327
|
|
|
msg = "RedisNet: connected: {0}" |
328
|
|
|
self.log.info(msg.format(self.addr)) |
329
|
|
|
except Exception as e: |
330
|
|
|
msg = "RedisNet: Exception during connect: {0}" |
331
|
|
|
self.log.error(msg.format(e)) |
332
|
|
|
raise RedisError(msg.format(e)) |
333
|
|
|
return True |
334
|
|
|
|
335
|
|
|
def disconnect(self): |
336
|
|
|
"""Disconnect from Redis. |
337
|
|
|
|
338
|
|
|
:raises RedisError: for any socket related exceptions |
339
|
|
|
""" |
340
|
|
|
self.log.debug("RedisNet: disconnect") |
341
|
|
|
if self.sock: |
342
|
|
|
try: |
343
|
|
|
self.sock.close() |
344
|
|
|
self.sock = None |
345
|
|
|
except Exception as e: |
346
|
|
|
msg = "RedisNet: exception during disconnect: {0}" |
347
|
|
|
self.log.error(msg.format(e)) |
348
|
|
|
raise RedisError(msg.format(e)) |
349
|
|
|
|
350
|
|
|
def read(self): |
351
|
|
|
"""Read RedisNet.BUFF_LEN bytes from our socket into self.buff. |
352
|
|
|
|
353
|
|
|
Calls here overwrite self.buff and reset self.buff_i and |
354
|
|
|
self.buff_end. |
355
|
|
|
|
356
|
|
|
:raises RedisError: for any socket related exceptions |
357
|
|
|
""" |
358
|
|
|
#self.log.debug("RedisNet: read") |
359
|
|
|
if not self.sock and not self.connect(): |
360
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
361
|
|
|
raise RedisError(msg.format(self.addr)) |
362
|
|
|
|
363
|
|
|
try: |
364
|
|
|
self.buff = self.sock.recv(RedisNet.BUFF_LEN) |
365
|
|
|
self.buff_end = len(self.buff) |
366
|
|
|
self.buff_i = 0 |
367
|
|
|
except Exception as e: |
368
|
|
|
msg = "RedisNet: Exception during readline: {0}" |
369
|
|
|
self.log.error(msg.format(e)) |
370
|
|
|
self.disconnect() |
371
|
|
|
raise RedisError(msg.format(e)) |
372
|
|
|
|
373
|
|
|
def recv(self, nbytes): |
374
|
|
|
"""Read nbytes from our socket and return it. |
375
|
|
|
|
376
|
|
|
:param nbytes: The number of bytes to read |
377
|
|
|
:type nbytes: int |
378
|
|
|
:raises RedisError: for any socket related exceptions |
379
|
|
|
:rytpe: str |
380
|
|
|
""" |
381
|
|
|
#self.log.debug("RedisNet: recv({0})".format(nbytes)) |
382
|
|
|
if not self.sock and not self.connect(): |
383
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
384
|
|
|
raise RedisError(msg.format(self.addr)) |
385
|
|
|
|
386
|
|
|
ret = "" |
387
|
|
|
try: |
388
|
|
|
ret = self.sock.recv(nbytes) |
389
|
|
|
except Exception as e: |
390
|
|
|
msg = "RedisNet: Exception during recv: {0}" |
391
|
|
|
self.log.error(msg.format(e)) |
392
|
|
|
self.disconnect() |
393
|
|
|
raise RedisError(msg.format(e)) |
394
|
|
|
return ret |
395
|
|
|
|
396
|
|
|
def readline(self): |
397
|
|
|
"""Get the next available line. |
398
|
|
|
|
399
|
|
|
:raises RedisError: for any socket related exceptions |
400
|
|
|
:rytpe: str |
401
|
|
|
""" |
402
|
|
|
msg = "RedisNet: readline" |
403
|
|
|
#self.log.debug(msg) |
404
|
|
|
if not self.sock and not self.connect(): |
405
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
406
|
|
|
raise RedisError(msg.format(self.addr)) |
407
|
|
|
|
408
|
|
|
buffs = deque() |
409
|
|
|
while True: |
410
|
|
|
# do we have any data available? |
411
|
|
|
if self.buff_end < 0 or self.buff_i >= self.buff_end: |
412
|
|
|
# read data, reset buffer state |
413
|
|
|
while self.buff_end < 1: |
414
|
|
|
self.read() |
415
|
|
|
# now we have data, do we have a newline? |
416
|
|
|
i = self.buff[self.buff_i:].find("\n") |
417
|
|
|
if i > -1: |
418
|
|
|
# return line, advance buffer past it |
419
|
|
|
# move i past the newline |
420
|
|
|
# also need to find |
421
|
|
|
buff_i = self.buff_i |
422
|
|
|
buffs.append(self.buff[buff_i:buff_i+i]) |
423
|
|
|
# advance beyond i |
424
|
|
|
self.buff_i = buff_i + i + 1 |
425
|
|
|
# reset if we have no buffer left |
426
|
|
|
if self.buff_i >= self.buff_end: |
427
|
|
|
self.buff_i = -1 |
428
|
|
|
self.buff_end = -1 |
429
|
|
|
break |
430
|
|
|
# no newline yet, record and keep reading |
431
|
|
|
buffs.append(self.buff[self.buff_i:]) |
432
|
|
|
self.buff_end = -1 |
433
|
|
|
self.buff_i = -1 |
434
|
|
|
ret = "".join(buffs) |
435
|
|
|
return ret |
436
|
|
|
|
437
|
|
|
def readnbytes(self, nbytes): |
438
|
|
|
"""Read nbytes from our socket. |
439
|
|
|
|
440
|
|
|
:param nbytes: The number of bytes to read |
441
|
|
|
:type nbytes: int |
442
|
|
|
:raises RedisError: for any socket related exceptions |
443
|
|
|
:rytpe: str |
444
|
|
|
""" |
445
|
|
|
#self.log.debug("RedisNet: readnbytes({0})".format(nbytes)) |
446
|
|
|
|
447
|
|
|
# any bytes in our buffer? |
448
|
|
|
ret = "" |
449
|
|
|
buffs = deque() |
450
|
|
|
if self.buff_end and self.buff_i < self.buff_end: |
451
|
|
|
# do we have enough buffer to fullfill the request? |
452
|
|
|
nbytes_left = self.buff_end - self.buff_i |
453
|
|
|
if nbytes_left >= nbytes: |
454
|
|
|
# yes, advance our pointer |
455
|
|
|
buffi = self.buff_i |
456
|
|
|
buffs.append(self.buff[buffi:buffi+nbytes]) |
457
|
|
|
self.buff_i += nbytes |
458
|
|
|
nbytes = 0 |
459
|
|
|
else: |
460
|
|
|
# no, consume all of the buffer and then get remaining |
461
|
|
|
buffs.append(self.buff[self.buff_i:]) |
462
|
|
|
# reset so next access on buffer forces a read |
463
|
|
|
self.buff_i = -1 |
464
|
|
|
self.buff_end = -1 |
465
|
|
|
nbytes -= nbytes_left |
466
|
|
|
# do we need more bytes? |
467
|
|
|
if nbytes: |
468
|
|
|
# just do a recv - don't use our buffer |
469
|
|
|
buffs.append(self.recv(nbytes)) |
470
|
|
|
|
471
|
|
|
# join the buffers |
472
|
|
|
ret = "".join(buffs) |
473
|
|
|
return ret |
474
|
|
|
|
475
|
|
|
def send(self, cmd): |
476
|
|
|
"""Send the supplied string to the redis server. |
477
|
|
|
|
478
|
|
|
:param cmd: The string to send to the redis server |
479
|
|
|
:type cmd: str |
480
|
|
|
:raises RedisError: for any socket related exceptions |
481
|
|
|
""" |
482
|
|
|
if not self.sock and not self.connect(): |
483
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
484
|
|
|
raise RedisError(msg.format(self.addr)) |
485
|
|
|
# send info request |
486
|
|
|
try: |
487
|
|
|
self.sock.sendall(cmd) |
488
|
|
|
#self.log.debug("RedisNet: send: {0}".format(cmd.strip())) |
489
|
|
|
except Exception as e: |
490
|
|
|
msg = "RedisNet: exception sending to server: {0}" |
491
|
|
|
self.log.error(msg.format(e)) |
492
|
|
|
self.disconnect() |
493
|
|
|
raise RedisError(msg.format(e)) |
494
|
|
|
|
495
|
|
|
|
496
|
|
|
class Redis(plumd.Reader): |
497
|
|
|
"""Plugin to record redis metrics.""" |
498
|
|
|
|
499
|
|
|
# default config values |
500
|
|
|
defaults = { |
501
|
|
|
'poll.interval': 10, |
502
|
|
|
'gauges': [ |
503
|
|
|
"aof_current_rewrite_time_sec", |
504
|
|
|
"aof_enabled", |
505
|
|
|
"aof_last_rewrite_time_sec", |
506
|
|
|
"aof_rewrite_in_progress", |
507
|
|
|
"aof_rewrite_scheduled", |
508
|
|
|
"blocked_clients", |
509
|
|
|
"client_biggest_input_buf", |
510
|
|
|
"client_longest_output_list", |
511
|
|
|
"connected_clients", |
512
|
|
|
"connected_slaves", |
513
|
|
|
"evicted_keys", |
514
|
|
|
"expired_keys", |
515
|
|
|
"instantaneous_input_kbps", |
516
|
|
|
"instantaneous_ops_per_sec", |
517
|
|
|
"instantaneous_output_kbps", |
518
|
|
|
"keyspace_hits", |
519
|
|
|
"keyspace_misses", |
520
|
|
|
"latest_fork_usec", |
521
|
|
|
"loading", |
522
|
|
|
"master_repl_offset", |
523
|
|
|
"mem_fragmentation_ratio", |
524
|
|
|
"pubsub_channels", |
525
|
|
|
"pubsub_patterns", |
526
|
|
|
"rdb_bgsave_in_progress", |
527
|
|
|
"rdb_changes_since_last_save", |
528
|
|
|
"rdb_current_bgsave_time_sec", |
529
|
|
|
"rdb_last_bgsave_time_sec", |
530
|
|
|
"rdb_last_save_time", |
531
|
|
|
"rejected_connections", |
532
|
|
|
"repl_backlog_active", |
533
|
|
|
"repl_backlog_first_byte_offset", |
534
|
|
|
"repl_backlog_histlen", |
535
|
|
|
"repl_backlog_size", |
536
|
|
|
"sync_full", |
537
|
|
|
"sync_partial_err", |
538
|
|
|
"sync_partial_ok", |
539
|
|
|
"total_commands_processed", |
540
|
|
|
"total_connections_received", |
541
|
|
|
"total_net_input_bytes", |
542
|
|
|
"total_net_output_bytes", |
543
|
|
|
"uptime_in_days", |
544
|
|
|
"uptime_in_seconds", |
545
|
|
|
"used_cpu_sys", |
546
|
|
|
"used_cpu_sys_children", |
547
|
|
|
"used_cpu_user", |
548
|
|
|
"used_cpu_user_children", |
549
|
|
|
"used_memory", |
550
|
|
|
"used_memory_lua", |
551
|
|
|
"used_memory_peak", |
552
|
|
|
"used_memory_rss", |
553
|
|
|
"master_last_io_seconds_ago", |
554
|
|
|
"master_sync_in_progress", |
555
|
|
|
"slave_repl_offset", |
556
|
|
|
"slave_priority", |
557
|
|
|
"slave_read_only", |
558
|
|
|
"connected_slaves", |
559
|
|
|
"master_repl_offset", |
560
|
|
|
"repl_backlog_active", |
561
|
|
|
"repl_backlog_size", |
562
|
|
|
"repl_backlog_first_byte_offset", |
563
|
|
|
"repl_backlog_histlen" |
564
|
|
|
"connected_slaves" |
565
|
|
|
], |
566
|
|
|
'rates': [], |
567
|
|
|
'configs': [ |
568
|
|
|
'maxmemory' |
569
|
|
|
], |
570
|
|
|
'keys': { |
571
|
|
|
# 'type': { metric_prefix: [key_prefix*, ...] } |
572
|
|
|
'lists': {}, |
573
|
|
|
'zsets': {}, |
574
|
|
|
'sets': {}, |
575
|
|
|
'hlls': {} |
576
|
|
|
}, |
577
|
|
|
'addr': '127.0.0.1:6379', |
578
|
|
|
'addr_type': 'inet', |
579
|
|
|
'timeout': 10 |
580
|
|
|
} |
581
|
|
|
|
582
|
|
|
def __init__(self, log, config): |
583
|
|
|
"""Plugin to record redis metrics. |
584
|
|
|
|
585
|
|
|
:param log: A logger |
586
|
|
|
:type log: logging.RootLogger |
587
|
|
|
:param config: a plumd.config.Conf configuration helper instance. |
588
|
|
|
:type config: plumd.config.Conf |
589
|
|
|
""" |
590
|
|
|
super(Redis, self).__init__(log, config) |
591
|
|
|
self.config.defaults(Redis.defaults) |
592
|
|
|
|
593
|
|
|
# metrics to record |
594
|
|
|
self.gauges = self.config.get('gauges') |
595
|
|
|
self.rates = self.config.get('rates') |
596
|
|
|
self.configs = self.config.get('configs') |
597
|
|
|
self.keys = self.config.get('keys') |
598
|
|
|
|
599
|
|
|
# Redis connection - either unix socket or tcp |
600
|
|
|
addr = self.config.get('addr') |
601
|
|
|
addr_type = self.config.get('addr_type').lower() |
602
|
|
|
if addr_type == "unix": |
603
|
|
|
sfamily = socket.AF_UNIX |
604
|
|
|
elif addr_type == "inet": |
605
|
|
|
try: |
606
|
|
|
host, port = addr.split(":") |
607
|
|
|
except AttributeError as e: |
|
|
|
|
608
|
|
|
msg = "Redis: invalid address: {0}, (host:port)" |
609
|
|
|
raise plumd.ConfigError(msg.format(addr)) |
610
|
|
|
addr = (host, int(port)) |
611
|
|
|
sfamily = socket.AF_INET |
612
|
|
|
else: |
613
|
|
|
msg = "Redis: unsupported connection type: {0} (unix, inet)" |
614
|
|
|
raise plumd.ConfigError(msg.format(addr_type)) |
615
|
|
|
timeout = config.get('timeout') |
616
|
|
|
self.client = RedisClient(self.log, addr, sfamily, timeout) |
617
|
|
|
self.calc = Differential() |
618
|
|
|
|
619
|
|
|
def poll(self): |
620
|
|
|
"""Query Redis for metrics. |
621
|
|
|
|
622
|
|
|
:rtype: ResultSet |
623
|
|
|
""" |
624
|
|
|
# catch exceptions - simply skip the poll on error |
625
|
|
|
try: |
626
|
|
|
result = plumd.Result("redis") |
627
|
|
|
|
628
|
|
|
# config values |
629
|
|
|
self.record_configs(result) |
630
|
|
|
|
631
|
|
|
# key sizes |
632
|
|
|
self.record_sizes(result) |
633
|
|
|
|
634
|
|
|
# get server metrics |
635
|
|
|
stats = self.client.info() |
636
|
|
|
|
637
|
|
|
# record gauges, rates |
638
|
|
|
self.record_metrics(stats, result) |
639
|
|
|
|
640
|
|
|
# replication, if any slaves are connected |
641
|
|
|
if "slave0" in stats: |
642
|
|
|
self.record_slaves(stats, result) |
643
|
|
|
|
644
|
|
|
# db metrics, maxmem |
645
|
|
|
self.record_dbs(stats, result) |
646
|
|
|
|
647
|
|
|
# record lists, zsets, sets and hll sizes |
648
|
|
|
self.record_sizes(result) |
649
|
|
|
|
650
|
|
|
# and finally command stats - if available |
651
|
|
|
self.record_cmdstats(result) |
652
|
|
|
|
653
|
|
|
except RedisError as e: |
654
|
|
|
msg = "Redis: exception during poll: {0}" |
655
|
|
|
self.log.error(msg.format(e)) |
656
|
|
|
return plumd.ResultSet([result]) |
657
|
|
|
|
658
|
|
|
def record_cmdstats(self, result): |
659
|
|
|
"""Record the stats from info commandstats. |
660
|
|
|
|
661
|
|
|
:param result: A result object to add metrics to |
662
|
|
|
:type result: ResultSet |
663
|
|
|
""" |
664
|
|
|
ts = time.time() |
|
|
|
|
665
|
|
|
name = self.name |
666
|
|
|
infos = self.client.info("commandstats") |
667
|
|
|
for key in sorted(infos.keys()): |
668
|
|
|
vals = infos[key].split(",") |
669
|
|
|
cstat, cname = key.split("_") |
670
|
|
|
for val in vals: |
671
|
|
|
mname, mval = val.split("=") |
672
|
|
|
metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname) |
673
|
|
|
result.add(plumd.Float(metric, mval)) |
674
|
|
|
|
675
|
|
|
def record_metrics(self, stats, result): |
676
|
|
|
"""Record the configured gauges and metrics |
677
|
|
|
|
678
|
|
|
:param stats: Dictionary returned from info command |
679
|
|
|
:type stats: dict |
680
|
|
|
:param result: A result object to add metrics to |
681
|
|
|
:type result: ResultSet |
682
|
|
|
""" |
683
|
|
|
ts = time.time() |
684
|
|
|
name = self.name |
685
|
|
|
|
686
|
|
|
# record gauges |
687
|
|
|
for stat in self.gauges: |
688
|
|
|
if stat in stats: |
689
|
|
|
mname = "{0}.{1}".format(name, stat) |
690
|
|
|
result.add(plumd.Float(mname, stats[stat])) |
691
|
|
|
|
692
|
|
|
# record rates |
693
|
|
|
for stat in self.rates: |
694
|
|
|
if stat in stats: |
695
|
|
|
mname = "{0}.{1}".format(name, stat) |
696
|
|
|
mval = self.calc.per_second(mname, float(stats[stat]), ts) |
697
|
|
|
result.add(plumd.Float(mname, mval)) |
698
|
|
|
|
699
|
|
|
def record_dbs(self, stats, result): |
700
|
|
|
"""Record per database metrics into result. |
701
|
|
|
|
702
|
|
|
:param stats: Dictionary returned from info command |
703
|
|
|
:type stats: dict |
704
|
|
|
:param result: A result object to add metrics to |
705
|
|
|
:type result: ResultSet |
706
|
|
|
""" |
707
|
|
|
# db0:keys=1,expires=0,avg_ttl=0 |
708
|
|
|
name = self.name |
709
|
|
|
db_fmt = "db{0}" |
710
|
|
|
metric_fmt = "{0}.db.{1}.{2}" |
711
|
|
|
|
712
|
|
|
for i in xrange(0, len(stats.keys())): |
|
|
|
|
713
|
|
|
dbname = db_fmt.format(i) |
714
|
|
|
if dbname not in stats: |
715
|
|
|
break |
716
|
|
|
try: |
717
|
|
|
vals = stats[dbname].split(",") |
718
|
|
|
dbmetrics = dict((k, v) |
719
|
|
|
for k, v in (v.split('=') for v in vals)) |
720
|
|
|
for k, v in dbmetrics.items(): |
721
|
|
|
metric_str = metric_fmt.format(name, i, k) |
722
|
|
|
result.add(plumd.Int(metric_str, v)) |
723
|
|
|
except KeyError as E: |
|
|
|
|
724
|
|
|
self.log.error("Redis: invalid db entry: {0}".format(dbname)) |
725
|
|
|
|
726
|
|
|
def record_slaves(self, stats, result): |
727
|
|
|
"""Record slave metrics into result. |
728
|
|
|
|
729
|
|
|
:param stats: A dictionary returned from info command |
730
|
|
|
:type stats: dict |
731
|
|
|
:param result: A ResultSet object to add metrics to |
732
|
|
|
:type result: ResultSet |
733
|
|
|
""" |
734
|
|
|
# slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1 |
735
|
|
|
name = self.name |
736
|
|
|
slave_str = "slave{0}" |
737
|
|
|
moffstr = 'master_repl_offset' |
738
|
|
|
moffset = 0 |
739
|
|
|
try: |
740
|
|
|
moffset = int(stats[moffstr]) |
741
|
|
|
except(TypeError, KeyError) as e: |
|
|
|
|
742
|
|
|
self.log.error("Redis: no {0} value".format(moffstr)) |
743
|
|
|
|
744
|
|
|
# for each slave entry |
745
|
|
|
for i in xrange(0, len(stats.keys())): |
|
|
|
|
746
|
|
|
sname = slave_str.format(i) |
747
|
|
|
if sname not in stats: |
748
|
|
|
break |
749
|
|
|
try: |
750
|
|
|
vals = stats[sname].split(",") |
751
|
|
|
smetrics = dict((k, v) |
752
|
|
|
for k, v in (v.split('=') for v in vals)) |
753
|
|
|
sip = smetrics['ip'].replace(".", "_") |
754
|
|
|
smname = "{0}_{1}".format(sip, smetrics['port']) |
755
|
|
|
|
756
|
|
|
# record offset and lag |
757
|
|
|
mname = "{0}.slave.{1}.offset".format(name, smname) |
758
|
|
|
soffset = moffset - int(smetrics['offset']) |
759
|
|
|
result.add(plumd.Int(mname, soffset)) |
760
|
|
|
mname = "{0}.slave.{1}.lag".format(name, sname) |
761
|
|
|
result.add(plumd.Int(mname, smetrics['lag'])) |
762
|
|
|
|
763
|
|
|
# if slave is online set online = 1, otherwise 0 |
764
|
|
|
sonline = 1 if smetrics['state'] == "online" else 0 |
765
|
|
|
mname = "{0}.slave.{1}.online".format(name, sname) |
766
|
|
|
result.add(plumd.Int(mname, sonline)) |
767
|
|
|
except(TypeError, KeyError, ValueError) as e: |
768
|
|
|
self.log.error("Redis: invalid slave entry: {0}".format(sname)) |
769
|
|
|
|
770
|
|
|
def record_configs(self, result): |
771
|
|
|
"""Record the configured configuration values. |
772
|
|
|
|
773
|
|
|
:param result: A ResultSet to record max mem to. |
774
|
|
|
:type result: plumd.ResultSet |
775
|
|
|
""" |
776
|
|
|
configs = self.configs |
777
|
|
|
if not configs: |
778
|
|
|
return |
779
|
|
|
name = self.name |
780
|
|
|
for config in self.client.config_get_multi(configs): |
781
|
|
|
for key, val in config.items(): |
782
|
|
|
mstr = "{0}.configs.{1}".format(name, key) |
783
|
|
|
result.add(plumd.Float(mstr, val)) |
784
|
|
|
|
785
|
|
|
def record_sizes(self, result): |
786
|
|
|
"""For each type of key (list, zset, set, hyperloglog) |
787
|
|
|
scan for a list of keys matching the prefix and record the |
788
|
|
|
total number of items for the matching prefix. |
789
|
|
|
|
790
|
|
|
:param result: A ResultSet to record into. |
791
|
|
|
:type result: plumd.ResultSet |
792
|
|
|
""" |
793
|
|
|
if not self.keys: |
794
|
|
|
return |
795
|
|
|
keys = self.config.get("keys") |
796
|
|
|
if "lists" in keys: |
797
|
|
|
self.record_lists(keys['lists'], result) |
798
|
|
|
if "zsets" in keys: |
799
|
|
|
self.record_zsets(keys['zsets'], result) |
800
|
|
|
if "sets" in keys: |
801
|
|
|
self.record_sets(keys['sets'], result) |
802
|
|
|
if "hlls" in keys: |
803
|
|
|
self.record_hlls(keys['hlls'], result) |
804
|
|
|
|
805
|
|
|
def record_lists(self, lconfig, result): |
806
|
|
|
"""Record the total length of the configured lists: |
807
|
|
|
|
808
|
|
|
eg. lconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
809
|
|
|
|
810
|
|
|
:param lconfig: A dict of metric name => globs |
811
|
|
|
:type lconfig: dict |
812
|
|
|
:param result: A ResultSet to record into. |
813
|
|
|
:type result: plumd.ResultSet |
814
|
|
|
""" |
815
|
|
|
name = self.name |
816
|
|
|
for mprefix, kprefixes in lconfig.items(): |
817
|
|
|
for prefix in kprefixes: |
818
|
|
|
# get the total for this prefix |
819
|
|
|
total = self.client.llen_multi(self.client.scan(prefix)) |
820
|
|
|
mstr = "{0}.sizes.lists.{1}".format(name, mprefix) |
821
|
|
|
result.add(plumd.Int(mstr, total)) |
822
|
|
|
|
823
|
|
|
def record_zsets(self, zconfig, result): |
824
|
|
|
"""Record the total length of the configured zsets: |
825
|
|
|
|
826
|
|
|
eg. zconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
827
|
|
|
|
828
|
|
|
:param zconfig: A dict of metric name => globs |
829
|
|
|
:type zconfig: dict |
830
|
|
|
:param result: A ResultSet to record into. |
831
|
|
|
:type result: plumd.ResultSet |
832
|
|
|
""" |
833
|
|
|
name = self.name |
834
|
|
|
for mprefix, kprefixes in zconfig.items(): |
835
|
|
|
for prefix in kprefixes: |
836
|
|
|
# get the total for this prefix |
837
|
|
|
total = self.client.zcard_multi(self.client.scan(prefix)) |
838
|
|
|
mstr = "{0}.sizes.zset.{1}".format(name, mprefix) |
839
|
|
|
result.add(plumd.Int(mstr, total)) |
840
|
|
|
|
841
|
|
|
def record_sets(self, sconfig, result): |
842
|
|
|
"""Record the total length of the configured zsets: |
843
|
|
|
|
844
|
|
|
eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
845
|
|
|
|
846
|
|
|
:param sconfig: A dict of metric name => globs |
847
|
|
|
:type sconfig: dict |
848
|
|
|
:param result: A ResultSet to record into. |
849
|
|
|
:type result: plumd.ResultSet |
850
|
|
|
""" |
851
|
|
|
name = self.name |
852
|
|
|
for mprefix, kprefixes in sconfig.items(): |
853
|
|
|
for prefix in kprefixes: |
854
|
|
|
# get the total for this prefix |
855
|
|
|
total = self.client.scard_multi(self.client.scan(prefix)) |
856
|
|
|
mstr = "{0}.sizes.set.{1}".format(name, mprefix) |
857
|
|
|
result.add(plumd.Int(mstr, total)) |
858
|
|
|
|
859
|
|
|
def record_hlls(self, hllconfig, result): |
860
|
|
|
"""Record the total length of the configured hlls: |
861
|
|
|
|
862
|
|
|
eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
863
|
|
|
|
864
|
|
|
:param hllconfig: A dict of metric name => globs |
865
|
|
|
:type hllconfig: dict |
866
|
|
|
:param result: A ResultSet to record into. |
867
|
|
|
:type result: plumd.ResultSet |
868
|
|
|
""" |
869
|
|
|
name = self.name |
870
|
|
|
for mprefix, kprefixes in hllconfig.items(): |
871
|
|
|
for prefix in kprefixes: |
872
|
|
|
# get the total for this prefix |
873
|
|
|
total = self.client.pfcount_multi(self.client.scan(prefix)) |
874
|
|
|
mstr = "{0}.sizes.hll.{1}".format(name, mprefix) |
875
|
|
|
result.add(plumd.Int(mstr, total)) |
876
|
|
|
|