|
1
|
|
|
# -*- coding: utf-8 -*- |
|
2
|
|
|
import sys |
|
3
|
|
|
|
|
4
|
|
|
PY3 = sys.version_info > (3,) |
|
5
|
|
|
|
|
6
|
|
|
import re |
|
|
|
|
|
|
7
|
|
|
import os |
|
|
|
|
|
|
8
|
|
|
import time |
|
9
|
|
|
import socket |
|
10
|
|
|
import os.path |
|
|
|
|
|
|
11
|
|
|
from collections import deque |
|
12
|
|
|
from collections import defaultdict |
|
13
|
|
|
|
|
14
|
|
|
import plumd |
|
15
|
|
|
from plumd.util import Differential |
|
16
|
|
|
from plumd.util import get_file_list |
|
|
|
|
|
|
17
|
|
|
|
|
18
|
|
|
__author__ = 'Kenny Freeman' |
|
19
|
|
|
__email__ = '[email protected]' |
|
20
|
|
|
__license__ = "ISCL" |
|
21
|
|
|
__docformat__ = 'reStructuredText' |
|
22
|
|
|
|
|
23
|
|
|
|
|
24
|
|
|
class RedisError(Exception): |
|
25
|
|
|
pass |
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
|
|
class RedisClient(object): |
|
29
|
|
|
"""A minimal Redis client that supports only the commands needed |
|
30
|
|
|
for Redis metrics collection.""" |
|
31
|
|
|
|
|
32
|
|
|
def __init__(self, log, addr, sfamily, timeout): |
|
33
|
|
|
"""A minimal Redis client that supports only the commands needed |
|
34
|
|
|
for Redis metrics collection. |
|
35
|
|
|
|
|
36
|
|
|
:param log: A logger created from loggging.getLogger |
|
37
|
|
|
:type log: logging.RootLogger |
|
38
|
|
|
:param addr: Either a tuple of ('host', port) or a path to a unix socket |
|
39
|
|
|
:type addr: str or tuple(str, int) |
|
40
|
|
|
:param sfamily: The socket family eg. socket.AF_INET or AF_UNIX |
|
41
|
|
|
:type sfamily: int |
|
42
|
|
|
:param timeout: The timeout in seconds for all socket operations |
|
43
|
|
|
:type timeout: float or int |
|
44
|
|
|
""" |
|
45
|
|
|
self.log = log |
|
46
|
|
|
self.net = RedisNet(log, addr, sfamily, timeout) |
|
47
|
|
|
|
|
48
|
|
|
def info(self, section=None): |
|
49
|
|
|
"""Redis info command : http://redis.io/commands/info |
|
50
|
|
|
|
|
51
|
|
|
:param section: The info section to request from Redis |
|
52
|
|
|
:type section: str |
|
53
|
|
|
:raises RedisError: for any socket related exceptions |
|
54
|
|
|
:raises RedisError: for unexpcted server responses |
|
55
|
|
|
:rtype: dict |
|
56
|
|
|
""" |
|
57
|
|
|
ret = {} |
|
58
|
|
|
section = "all" if section is None else section |
|
59
|
|
|
self.net.send("INFO {0}\r\n".format(section)) |
|
60
|
|
|
for info_str in RedisResponse(self.net): |
|
61
|
|
|
for line in info_str.split("\n"): |
|
62
|
|
|
if not line or line[0] == "#" or line == '\r': |
|
63
|
|
|
continue |
|
64
|
|
|
if line.find(":") >= 0: |
|
65
|
|
|
key, val = line.split(":") |
|
66
|
|
|
ret[key] = val |
|
67
|
|
|
return ret |
|
68
|
|
|
|
|
69
|
|
|
def config_get_multi(self, globs): |
|
70
|
|
|
"""Redis config get command : http://redis.io/commands/config-get |
|
71
|
|
|
|
|
72
|
|
|
:param globs: An containing glob search strings |
|
73
|
|
|
:type globs: iterable |
|
74
|
|
|
:raises RedisError: for any socket related exceptions |
|
75
|
|
|
:raises RedisError: for unexpcted server responses |
|
76
|
|
|
:rtype: dict |
|
77
|
|
|
""" |
|
78
|
|
|
for glob_str in globs: |
|
79
|
|
|
self.net.send("CONFIG GET {0}\r\n".format(glob_str)) |
|
80
|
|
|
vals = [ val.strip() for val in RedisResponse(self.net) ] |
|
|
|
|
|
|
81
|
|
|
yield(dict(zip(vals[0::2], vals[1::2]))) |
|
|
|
|
|
|
82
|
|
|
|
|
83
|
|
|
def scan(self, prefix, count=None): |
|
84
|
|
|
"""Returns a deque of key names that match the requested prefix. |
|
85
|
|
|
|
|
86
|
|
|
:param prefix: The key prefix/glob to search for |
|
87
|
|
|
:type prefix: str |
|
88
|
|
|
:param count: The number of keys to request on each iteration |
|
89
|
|
|
:type count: int |
|
90
|
|
|
:raises RedisError: for any socket related exceptions |
|
91
|
|
|
:raises RedisError: for unexpcted server responses |
|
92
|
|
|
:rtype: deque |
|
93
|
|
|
""" |
|
94
|
|
|
scan_cmd = "scan {0} match {1} count {2}\r\n" |
|
95
|
|
|
count = 10 if count is None else count |
|
96
|
|
|
matches = deque() |
|
|
|
|
|
|
97
|
|
|
cursor = "0" |
|
98
|
|
|
while True: |
|
99
|
|
|
# send scan request |
|
100
|
|
|
self.net.send(scan_cmd.format(cursor, prefix, count)) |
|
101
|
|
|
# response is the next cursor followed by a list of matches |
|
102
|
|
|
resp = RedisResponse(self.net) |
|
103
|
|
|
cursor = int(next(resp)) |
|
104
|
|
|
for key in resp: |
|
105
|
|
|
yield(key.strip()) |
|
|
|
|
|
|
106
|
|
|
if cursor == 0: |
|
107
|
|
|
break |
|
108
|
|
|
|
|
109
|
|
|
def llen_multi(self, keys): |
|
110
|
|
|
"""Returns the total length of each key provided. |
|
111
|
|
|
|
|
112
|
|
|
:param keys: The iterable of keys to return the total length of. |
|
113
|
|
|
:type keys: iterable |
|
114
|
|
|
:raises RedisError: for any socket related exceptions |
|
115
|
|
|
:raises RedisError: for unexpcted server responses |
|
116
|
|
|
:rtype: int |
|
117
|
|
|
""" |
|
118
|
|
|
llen_cmd = "llen {0}\r\n" |
|
119
|
|
|
total = 0 |
|
120
|
|
|
for key in keys: |
|
121
|
|
|
# send scan request |
|
122
|
|
|
self.net.send(llen_cmd.format(key)) |
|
123
|
|
|
# response should just be an int |
|
124
|
|
|
resp = RedisResponse(self.net) |
|
125
|
|
|
total += int(next(resp)) |
|
126
|
|
|
return total |
|
127
|
|
|
|
|
128
|
|
|
def zcard_multi(self, keys): |
|
129
|
|
|
"""Returns the total cardinality of each key provided. |
|
130
|
|
|
|
|
131
|
|
|
:param keys: The iterable of keys to return the total cardinality of. |
|
132
|
|
|
:type keys: iterable |
|
133
|
|
|
:raises RedisError: for any socket related exceptions |
|
134
|
|
|
:raises RedisError: for unexpcted server responses |
|
135
|
|
|
:rtype: int |
|
136
|
|
|
""" |
|
137
|
|
|
zcard_cmd = "zcard {0}\r\n" |
|
138
|
|
|
total = 0 |
|
139
|
|
|
for key in keys: |
|
140
|
|
|
# send scan request |
|
141
|
|
|
self.net.send(zcard_cmd.format(key)) |
|
142
|
|
|
# response should just be an int |
|
143
|
|
|
resp = RedisResponse(self.net) |
|
144
|
|
|
total += int(next(resp)) |
|
145
|
|
|
return total |
|
146
|
|
|
|
|
147
|
|
|
def scard_multi(self, keys): |
|
148
|
|
|
"""Returns the total cardinality of each key provided. |
|
149
|
|
|
|
|
150
|
|
|
:param keys: The iterable of keys to return the total cardinality of. |
|
151
|
|
|
:type keys: iterable |
|
152
|
|
|
:raises RedisError: for any socket related exceptions |
|
153
|
|
|
:raises RedisError: for unexpcted server responses |
|
154
|
|
|
:rtype: int |
|
155
|
|
|
""" |
|
156
|
|
|
scard_cmd = "scard {0}\r\n" |
|
157
|
|
|
total = 0 |
|
158
|
|
|
for key in keys: |
|
159
|
|
|
# send scan request |
|
160
|
|
|
self.net.send(scard_cmd.format(key)) |
|
161
|
|
|
# response should just be an int |
|
162
|
|
|
resp = RedisResponse(self.net) |
|
163
|
|
|
total += int(next(resp)) |
|
164
|
|
|
return total |
|
165
|
|
|
|
|
166
|
|
|
def pfcount_multi(self, keys): |
|
167
|
|
|
"""Returns the total cardinality of each key provided. |
|
168
|
|
|
|
|
169
|
|
|
:param keys: The iterable of keys to return the total cardinality of. |
|
170
|
|
|
:type keys: iterable |
|
171
|
|
|
:raises RedisError: for any socket related exceptions |
|
172
|
|
|
:raises RedisError: for unexpcted server responses |
|
173
|
|
|
:rtype: int |
|
174
|
|
|
""" |
|
175
|
|
|
pfcount_cmd = "pfcount {0}\r\n" |
|
176
|
|
|
total = 0 |
|
177
|
|
|
for key in keys: |
|
178
|
|
|
# send scan request |
|
179
|
|
|
self.net.send(pfcount_cmd.format(key)) |
|
180
|
|
|
# response should just be an int |
|
181
|
|
|
resp = RedisResponse(self.net) |
|
182
|
|
|
total += int(next(resp)) |
|
183
|
|
|
return total |
|
184
|
|
|
|
|
185
|
|
|
|
|
186
|
|
|
class RedisResponse(object): |
|
187
|
|
|
"""An iterable of redis command responses.""" |
|
188
|
|
|
|
|
189
|
|
|
def __init__(self, reader): |
|
190
|
|
|
"""An iterable of redis command responses. |
|
191
|
|
|
|
|
192
|
|
|
:param reader: A RedisNet reader instance |
|
193
|
|
|
:type reader: RedisNet |
|
194
|
|
|
|
|
195
|
|
|
:raises RedisError: for any socket related errors |
|
196
|
|
|
:raises RedisError: for any unknown responses |
|
197
|
|
|
:raises RedisError: for any Redis Errors returned |
|
198
|
|
|
:raises RedisError: for any ValueErrors encountered when casting |
|
199
|
|
|
""" |
|
200
|
|
|
self.reader = reader |
|
201
|
|
|
# handlers consume responses and add them to self.vals |
|
202
|
|
|
self.func = defaultdict(lambda: self.h_unknown) |
|
203
|
|
|
#self.func["*"] = self.h_array |
|
204
|
|
|
self.func["*"] = lambda buff: self.parse(int(buff)) |
|
205
|
|
|
#self.func["+"] = self.h_simple_str |
|
206
|
|
|
self.func["+"] = lambda buff: self.vals.append(str(buff)) |
|
207
|
|
|
#self.func["$"] = self.h_bulk_str |
|
208
|
|
|
# remove the \r from the string |
|
209
|
|
|
self.func["$"] = lambda buff: \ |
|
210
|
|
|
self.vals.append(self.reader.readnbytes(int(buff) + 2)) |
|
211
|
|
|
#self.func[":"] = self.h_int |
|
212
|
|
|
self.func[":"] = lambda buff: self.vals.append(int(buff)) |
|
213
|
|
|
self.func["-"] = self.h_error |
|
214
|
|
|
self.vals = deque() |
|
215
|
|
|
self.parse() |
|
216
|
|
|
|
|
217
|
|
|
def parse(self, nitems=None): |
|
218
|
|
|
"""Reads the full response from self.sock. |
|
219
|
|
|
|
|
220
|
|
|
:raises RedisError: for any socket related Exceptions |
|
221
|
|
|
:raises RedisError: for any unknown types read |
|
222
|
|
|
:raises RedisError: for any redis protocol errors |
|
223
|
|
|
:rtype: varies |
|
224
|
|
|
""" |
|
225
|
|
|
nitems = 1 if nitems is None else nitems |
|
226
|
|
|
for i in xrange(nitems): |
|
|
|
|
|
|
227
|
|
|
try: |
|
228
|
|
|
buff = self.reader.readline() |
|
229
|
|
|
self.func[buff[0]](buff[1:]) |
|
230
|
|
|
except (ValueError, IndexError) as e: |
|
231
|
|
|
msg = "could not parse response: {0}: {1}" |
|
232
|
|
|
raise RedisError(msg.format(buff, e)) |
|
233
|
|
|
|
|
234
|
|
|
def h_unknown(self, buff): |
|
235
|
|
|
"""Uknown response handler. |
|
236
|
|
|
|
|
237
|
|
|
:param buff: A response buffer read from Redis |
|
238
|
|
|
:type buff: str |
|
239
|
|
|
:raises RedisError: this function always raises a RedisError |
|
240
|
|
|
""" |
|
241
|
|
|
raise RedisError("unknown command: {0}".format(buff)) |
|
242
|
|
|
|
|
243
|
|
|
def h_error(self, buff): |
|
244
|
|
|
"""Raises a RedisError and sets contents to buff. |
|
245
|
|
|
|
|
246
|
|
|
:param buff: A response buffer read from Redis |
|
247
|
|
|
:type buff: str |
|
248
|
|
|
:raises RedisError: on any socket related exceptions |
|
249
|
|
|
:rtype: str |
|
250
|
|
|
""" |
|
251
|
|
|
msg = "RedisResponse: h_error({0})" |
|
|
|
|
|
|
252
|
|
|
raise RedisError("redis error: {0}".format(buff)) |
|
253
|
|
|
|
|
254
|
|
|
def __iter__(self): |
|
255
|
|
|
"""A Redis command response iterator. |
|
256
|
|
|
|
|
257
|
|
|
:rtype: iterator |
|
258
|
|
|
""" |
|
259
|
|
|
return self |
|
260
|
|
|
|
|
261
|
|
|
def __next__(self): |
|
262
|
|
|
"""Return the next response, if any. |
|
263
|
|
|
|
|
264
|
|
|
:rtype: object |
|
265
|
|
|
""" |
|
266
|
|
|
if not self.vals: |
|
267
|
|
|
raise StopIteration() |
|
268
|
|
|
return self.vals.popleft() |
|
269
|
|
|
|
|
270
|
|
|
def next(self): |
|
271
|
|
|
"""Return the next response, if any. |
|
272
|
|
|
|
|
273
|
|
|
:rtype: object |
|
274
|
|
|
""" |
|
275
|
|
|
if not self.vals: |
|
276
|
|
|
raise StopIteration() |
|
277
|
|
|
return self.vals.popleft() |
|
278
|
|
|
|
|
279
|
|
|
|
|
280
|
|
|
class RedisNet(object): |
|
281
|
|
|
"""A helper class that talks to Redis on a unix/tcp socket.""" |
|
282
|
|
|
|
|
283
|
|
|
BUFF_LEN = 8192 |
|
284
|
|
|
|
|
285
|
|
|
def __init__(self, log, addr, sfamily, timeout): |
|
286
|
|
|
"""A helper class that talks to Redis on a unix/tcp socket. |
|
287
|
|
|
|
|
288
|
|
|
:param log: A logger created from loggging.getLogger |
|
289
|
|
|
:type log: logging.RootLogger |
|
290
|
|
|
:param addr: Either a tuple of ('host', port) or a path to a unix socket |
|
291
|
|
|
:type addr: str or tuple(str, int) |
|
292
|
|
|
:param sfamily: The socket family eg. socket.AF_INET or AF_UNIX |
|
293
|
|
|
:type sfamily: int |
|
294
|
|
|
:param timeout: The timeout in seconds for all socket operations |
|
295
|
|
|
:type timeout: float or int |
|
296
|
|
|
""" |
|
297
|
|
|
self.log = log |
|
298
|
|
|
# addr can be unix socket or (host, port) tuple |
|
299
|
|
|
self.addr = addr |
|
300
|
|
|
# socket.AF_INET or socket.AF_UNIX |
|
301
|
|
|
self.sfamily = sfamily |
|
302
|
|
|
# all socket operations timeout |
|
303
|
|
|
self.timeout = timeout |
|
304
|
|
|
self.sock = None |
|
305
|
|
|
# read from our socket into this buffer |
|
306
|
|
|
# keep an index in the buffer that we've read up to |
|
307
|
|
|
# and record the total number of bytes in the buffer |
|
308
|
|
|
self.buff = "" |
|
309
|
|
|
self.buff_end = -1 |
|
310
|
|
|
self.buff_i = -1 |
|
311
|
|
|
|
|
312
|
|
View Code Duplication |
def connect(self): |
|
313
|
|
|
"""Connect to Redis. |
|
314
|
|
|
|
|
315
|
|
|
:raises RedisError: for any socket related exceptions |
|
316
|
|
|
:rtype: Exception or None |
|
317
|
|
|
""" |
|
318
|
|
|
#self.log.debug("RedisNet: connect") |
|
319
|
|
|
if self.sock: |
|
320
|
|
|
self.disconnect() |
|
321
|
|
|
try: |
|
322
|
|
|
# create the socket |
|
323
|
|
|
self.sock = socket.socket(self.sfamily, socket.SOCK_STREAM) |
|
324
|
|
|
# set timeout for socket operations |
|
325
|
|
|
self.sock.settimeout(self.timeout) |
|
326
|
|
|
self.sock.connect(self.addr) |
|
327
|
|
|
msg = "RedisNet: connected: {0}" |
|
328
|
|
|
self.log.info(msg.format(self.addr)) |
|
329
|
|
|
except Exception as e: |
|
330
|
|
|
msg = "RedisNet: Exception during connect: {0}" |
|
331
|
|
|
self.log.error(msg.format(e)) |
|
332
|
|
|
raise RedisError(msg.format(e)) |
|
333
|
|
|
return True |
|
334
|
|
|
|
|
335
|
|
|
def disconnect(self): |
|
336
|
|
|
"""Disconnect from Redis. |
|
337
|
|
|
|
|
338
|
|
|
:raises RedisError: for any socket related exceptions |
|
339
|
|
|
""" |
|
340
|
|
|
self.log.debug("RedisNet: disconnect") |
|
341
|
|
|
if self.sock: |
|
342
|
|
|
try: |
|
343
|
|
|
self.sock.close() |
|
344
|
|
|
self.sock = None |
|
345
|
|
|
except Exception as e: |
|
346
|
|
|
msg = "RedisNet: exception during disconnect: {0}" |
|
347
|
|
|
self.log.error(msg.format(e)) |
|
348
|
|
|
raise RedisError(msg.format(e)) |
|
349
|
|
|
|
|
350
|
|
|
def read(self): |
|
351
|
|
|
"""Read RedisNet.BUFF_LEN bytes from our socket into self.buff. |
|
352
|
|
|
|
|
353
|
|
|
Calls here overwrite self.buff and reset self.buff_i and |
|
354
|
|
|
self.buff_end. |
|
355
|
|
|
|
|
356
|
|
|
:raises RedisError: for any socket related exceptions |
|
357
|
|
|
""" |
|
358
|
|
|
#self.log.debug("RedisNet: read") |
|
359
|
|
|
if not self.sock and not self.connect(): |
|
360
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
|
361
|
|
|
raise RedisError(msg.format(self.addr)) |
|
362
|
|
|
|
|
363
|
|
|
try: |
|
364
|
|
|
self.buff = self.sock.recv(RedisNet.BUFF_LEN) |
|
365
|
|
|
self.buff_end = len(self.buff) |
|
366
|
|
|
self.buff_i = 0 |
|
367
|
|
|
except Exception as e: |
|
368
|
|
|
msg = "RedisNet: Exception during readline: {0}" |
|
369
|
|
|
self.log.error(msg.format(e)) |
|
370
|
|
|
self.disconnect() |
|
371
|
|
|
raise RedisError(msg.format(e)) |
|
372
|
|
|
|
|
373
|
|
|
def recv(self, nbytes): |
|
374
|
|
|
"""Read nbytes from our socket and return it. |
|
375
|
|
|
|
|
376
|
|
|
:param nbytes: The number of bytes to read |
|
377
|
|
|
:type nbytes: int |
|
378
|
|
|
:raises RedisError: for any socket related exceptions |
|
379
|
|
|
:rytpe: str |
|
380
|
|
|
""" |
|
381
|
|
|
#self.log.debug("RedisNet: recv({0})".format(nbytes)) |
|
382
|
|
|
if not self.sock and not self.connect(): |
|
383
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
|
384
|
|
|
raise RedisError(msg.format(self.addr)) |
|
385
|
|
|
|
|
386
|
|
|
ret = "" |
|
387
|
|
|
try: |
|
388
|
|
|
ret = self.sock.recv(nbytes) |
|
389
|
|
|
except Exception as e: |
|
390
|
|
|
msg = "RedisNet: Exception during recv: {0}" |
|
391
|
|
|
self.log.error(msg.format(e)) |
|
392
|
|
|
self.disconnect() |
|
393
|
|
|
raise RedisError(msg.format(e)) |
|
394
|
|
|
return ret |
|
395
|
|
|
|
|
396
|
|
|
def readline(self): |
|
397
|
|
|
"""Get the next available line. |
|
398
|
|
|
|
|
399
|
|
|
:raises RedisError: for any socket related exceptions |
|
400
|
|
|
:rytpe: str |
|
401
|
|
|
""" |
|
402
|
|
|
msg = "RedisNet: readline" |
|
403
|
|
|
#self.log.debug(msg) |
|
404
|
|
|
if not self.sock and not self.connect(): |
|
405
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
|
406
|
|
|
raise RedisError(msg.format(self.addr)) |
|
407
|
|
|
|
|
408
|
|
|
buffs = deque() |
|
409
|
|
|
while True: |
|
410
|
|
|
# do we have any data available? |
|
411
|
|
|
if self.buff_end < 0 or self.buff_i >= self.buff_end: |
|
412
|
|
|
# read data, reset buffer state |
|
413
|
|
|
while self.buff_end < 1: |
|
414
|
|
|
self.read() |
|
415
|
|
|
# now we have data, do we have a newline? |
|
416
|
|
|
i = self.buff[self.buff_i:].find("\n") |
|
417
|
|
|
if i > -1: |
|
418
|
|
|
# return line, advance buffer past it |
|
419
|
|
|
# move i past the newline |
|
420
|
|
|
# also need to find |
|
421
|
|
|
buff_i = self.buff_i |
|
422
|
|
|
buffs.append(self.buff[buff_i:buff_i+i]) |
|
423
|
|
|
# advance beyond i |
|
424
|
|
|
self.buff_i = buff_i + i + 1 |
|
425
|
|
|
# reset if we have no buffer left |
|
426
|
|
|
if self.buff_i >= self.buff_end: |
|
427
|
|
|
self.buff_i = -1 |
|
428
|
|
|
self.buff_end = -1 |
|
429
|
|
|
break |
|
430
|
|
|
# no newline yet, record and keep reading |
|
431
|
|
|
buffs.append(self.buff[self.buff_i:]) |
|
432
|
|
|
self.buff_end = -1 |
|
433
|
|
|
self.buff_i = -1 |
|
434
|
|
|
ret = "".join(buffs) |
|
435
|
|
|
return ret |
|
436
|
|
|
|
|
437
|
|
|
def readnbytes(self, nbytes): |
|
438
|
|
|
"""Read nbytes from our socket. |
|
439
|
|
|
|
|
440
|
|
|
:param nbytes: The number of bytes to read |
|
441
|
|
|
:type nbytes: int |
|
442
|
|
|
:raises RedisError: for any socket related exceptions |
|
443
|
|
|
:rytpe: str |
|
444
|
|
|
""" |
|
445
|
|
|
#self.log.debug("RedisNet: readnbytes({0})".format(nbytes)) |
|
446
|
|
|
|
|
447
|
|
|
# any bytes in our buffer? |
|
448
|
|
|
ret = "" |
|
449
|
|
|
buffs = deque() |
|
450
|
|
|
if self.buff_end and self.buff_i < self.buff_end: |
|
451
|
|
|
# do we have enough buffer to fullfill the request? |
|
452
|
|
|
nbytes_left = self.buff_end - self.buff_i |
|
453
|
|
|
if nbytes_left >= nbytes: |
|
454
|
|
|
# yes, advance our pointer |
|
455
|
|
|
buffi = self.buff_i |
|
456
|
|
|
buffs.append(self.buff[buffi:buffi+nbytes]) |
|
457
|
|
|
self.buff_i += nbytes |
|
458
|
|
|
nbytes = 0 |
|
459
|
|
|
else: |
|
460
|
|
|
# no, consume all of the buffer and then get remaining |
|
461
|
|
|
buffs.append(self.buff[self.buff_i:]) |
|
462
|
|
|
# reset so next access on buffer forces a read |
|
463
|
|
|
self.buff_i = -1 |
|
464
|
|
|
self.buff_end = -1 |
|
465
|
|
|
nbytes -= nbytes_left |
|
466
|
|
|
# do we need more bytes? |
|
467
|
|
|
if nbytes: |
|
468
|
|
|
# just do a recv - don't use our buffer |
|
469
|
|
|
buffs.append(self.recv(nbytes)) |
|
470
|
|
|
|
|
471
|
|
|
# join the buffers |
|
472
|
|
|
ret = "".join(buffs) |
|
473
|
|
|
return ret |
|
474
|
|
|
|
|
475
|
|
|
def send(self, cmd): |
|
476
|
|
|
"""Send the supplied string to the redis server. |
|
477
|
|
|
|
|
478
|
|
|
:param cmd: The string to send to the redis server |
|
479
|
|
|
:type cmd: str |
|
480
|
|
|
:raises RedisError: for any socket related exceptions |
|
481
|
|
|
""" |
|
482
|
|
|
if not self.sock and not self.connect(): |
|
483
|
|
|
msg = "RedisNet: unable to connect to: {0}" |
|
484
|
|
|
raise RedisError(msg.format(self.addr)) |
|
485
|
|
|
# send info request |
|
486
|
|
|
try: |
|
487
|
|
|
self.sock.sendall(cmd) |
|
488
|
|
|
#self.log.debug("RedisNet: send: {0}".format(cmd.strip())) |
|
489
|
|
|
except Exception as e: |
|
490
|
|
|
msg = "RedisNet: exception sending to server: {0}" |
|
491
|
|
|
self.log.error(msg.format(e)) |
|
492
|
|
|
self.disconnect() |
|
493
|
|
|
raise RedisError(msg.format(e)) |
|
494
|
|
|
|
|
495
|
|
|
|
|
496
|
|
|
class Redis(plumd.Reader): |
|
497
|
|
|
"""Plugin to record redis metrics.""" |
|
498
|
|
|
|
|
499
|
|
|
# default config values |
|
500
|
|
|
defaults = { |
|
501
|
|
|
'poll.interval': 10, |
|
502
|
|
|
'gauges': [ |
|
503
|
|
|
"aof_current_rewrite_time_sec", |
|
504
|
|
|
"aof_enabled", |
|
505
|
|
|
"aof_last_rewrite_time_sec", |
|
506
|
|
|
"aof_rewrite_in_progress", |
|
507
|
|
|
"aof_rewrite_scheduled", |
|
508
|
|
|
"blocked_clients", |
|
509
|
|
|
"client_biggest_input_buf", |
|
510
|
|
|
"client_longest_output_list", |
|
511
|
|
|
"connected_clients", |
|
512
|
|
|
"connected_slaves", |
|
513
|
|
|
"evicted_keys", |
|
514
|
|
|
"expired_keys", |
|
515
|
|
|
"instantaneous_input_kbps", |
|
516
|
|
|
"instantaneous_ops_per_sec", |
|
517
|
|
|
"instantaneous_output_kbps", |
|
518
|
|
|
"keyspace_hits", |
|
519
|
|
|
"keyspace_misses", |
|
520
|
|
|
"latest_fork_usec", |
|
521
|
|
|
"loading", |
|
522
|
|
|
"master_repl_offset", |
|
523
|
|
|
"mem_fragmentation_ratio", |
|
524
|
|
|
"pubsub_channels", |
|
525
|
|
|
"pubsub_patterns", |
|
526
|
|
|
"rdb_bgsave_in_progress", |
|
527
|
|
|
"rdb_changes_since_last_save", |
|
528
|
|
|
"rdb_current_bgsave_time_sec", |
|
529
|
|
|
"rdb_last_bgsave_time_sec", |
|
530
|
|
|
"rdb_last_save_time", |
|
531
|
|
|
"rejected_connections", |
|
532
|
|
|
"repl_backlog_active", |
|
533
|
|
|
"repl_backlog_first_byte_offset", |
|
534
|
|
|
"repl_backlog_histlen", |
|
535
|
|
|
"repl_backlog_size", |
|
536
|
|
|
"sync_full", |
|
537
|
|
|
"sync_partial_err", |
|
538
|
|
|
"sync_partial_ok", |
|
539
|
|
|
"total_commands_processed", |
|
540
|
|
|
"total_connections_received", |
|
541
|
|
|
"total_net_input_bytes", |
|
542
|
|
|
"total_net_output_bytes", |
|
543
|
|
|
"uptime_in_days", |
|
544
|
|
|
"uptime_in_seconds", |
|
545
|
|
|
"used_cpu_sys", |
|
546
|
|
|
"used_cpu_sys_children", |
|
547
|
|
|
"used_cpu_user", |
|
548
|
|
|
"used_cpu_user_children", |
|
549
|
|
|
"used_memory", |
|
550
|
|
|
"used_memory_lua", |
|
551
|
|
|
"used_memory_peak", |
|
552
|
|
|
"used_memory_rss", |
|
553
|
|
|
"master_last_io_seconds_ago", |
|
554
|
|
|
"master_sync_in_progress", |
|
555
|
|
|
"slave_repl_offset", |
|
556
|
|
|
"slave_priority", |
|
557
|
|
|
"slave_read_only", |
|
558
|
|
|
"connected_slaves", |
|
559
|
|
|
"master_repl_offset", |
|
560
|
|
|
"repl_backlog_active", |
|
561
|
|
|
"repl_backlog_size", |
|
562
|
|
|
"repl_backlog_first_byte_offset", |
|
563
|
|
|
"repl_backlog_histlen" |
|
564
|
|
|
"connected_slaves" |
|
565
|
|
|
], |
|
566
|
|
|
'rates': [], |
|
567
|
|
|
'configs': [ |
|
568
|
|
|
'maxmemory' |
|
569
|
|
|
], |
|
570
|
|
|
'keys': { |
|
571
|
|
|
# 'type': { metric_prefix: [key_prefix*, ...] } |
|
572
|
|
|
'lists': {}, |
|
573
|
|
|
'zsets': {}, |
|
574
|
|
|
'sets': {}, |
|
575
|
|
|
'hlls': {} |
|
576
|
|
|
}, |
|
577
|
|
|
'addr': '127.0.0.1:6379', |
|
578
|
|
|
'addr_type': 'inet', |
|
579
|
|
|
'timeout': 10 |
|
580
|
|
|
} |
|
581
|
|
|
|
|
582
|
|
|
def __init__(self, log, config): |
|
583
|
|
|
"""Plugin to record redis metrics. |
|
584
|
|
|
|
|
585
|
|
|
:param log: A logger |
|
586
|
|
|
:type log: logging.RootLogger |
|
587
|
|
|
:param config: a plumd.config.Conf configuration helper instance. |
|
588
|
|
|
:type config: plumd.config.Conf |
|
589
|
|
|
""" |
|
590
|
|
|
super(Redis, self).__init__(log, config) |
|
591
|
|
|
self.config.defaults(Redis.defaults) |
|
592
|
|
|
|
|
593
|
|
|
# metrics to record |
|
594
|
|
|
self.gauges = self.config.get('gauges') |
|
595
|
|
|
self.rates = self.config.get('rates') |
|
596
|
|
|
self.configs = self.config.get('configs') |
|
597
|
|
|
self.keys = self.config.get('keys') |
|
598
|
|
|
|
|
599
|
|
|
# Redis connection - either unix socket or tcp |
|
600
|
|
|
addr = self.config.get('addr') |
|
601
|
|
|
addr_type = self.config.get('addr_type').lower() |
|
602
|
|
|
if addr_type == "unix": |
|
603
|
|
|
sfamily = socket.AF_UNIX |
|
604
|
|
|
elif addr_type == "inet": |
|
605
|
|
|
try: |
|
606
|
|
|
host, port = addr.split(":") |
|
607
|
|
|
except AttributeError as e: |
|
|
|
|
|
|
608
|
|
|
msg = "Redis: invalid address: {0}, (host:port)" |
|
609
|
|
|
raise plumd.ConfigError(msg.format(addr)) |
|
610
|
|
|
addr = (host, int(port)) |
|
611
|
|
|
sfamily = socket.AF_INET |
|
612
|
|
|
else: |
|
613
|
|
|
msg = "Redis: unsupported connection type: {0} (unix, inet)" |
|
614
|
|
|
raise plumd.ConfigError(msg.format(addr_type)) |
|
615
|
|
|
timeout = config.get('timeout') |
|
616
|
|
|
self.client = RedisClient(self.log, addr, sfamily, timeout) |
|
617
|
|
|
self.calc = Differential() |
|
618
|
|
|
|
|
619
|
|
|
def poll(self): |
|
620
|
|
|
"""Query Redis for metrics. |
|
621
|
|
|
|
|
622
|
|
|
:rtype: ResultSet |
|
623
|
|
|
""" |
|
624
|
|
|
# catch exceptions - simply skip the poll on error |
|
625
|
|
|
try: |
|
626
|
|
|
result = plumd.Result("redis") |
|
627
|
|
|
|
|
628
|
|
|
# config values |
|
629
|
|
|
self.record_configs(result) |
|
630
|
|
|
|
|
631
|
|
|
# key sizes |
|
632
|
|
|
self.record_sizes(result) |
|
633
|
|
|
|
|
634
|
|
|
# get server metrics |
|
635
|
|
|
stats = self.client.info() |
|
636
|
|
|
|
|
637
|
|
|
# record gauges, rates |
|
638
|
|
|
self.record_metrics(stats, result) |
|
639
|
|
|
|
|
640
|
|
|
# replication, if any slaves are connected |
|
641
|
|
|
if "slave0" in stats: |
|
642
|
|
|
self.record_slaves(stats, result) |
|
643
|
|
|
|
|
644
|
|
|
# db metrics, maxmem |
|
645
|
|
|
self.record_dbs(stats, result) |
|
646
|
|
|
|
|
647
|
|
|
# record lists, zsets, sets and hll sizes |
|
648
|
|
|
self.record_sizes(result) |
|
649
|
|
|
|
|
650
|
|
|
# and finally command stats - if available |
|
651
|
|
|
self.record_cmdstats(result) |
|
652
|
|
|
|
|
653
|
|
|
except RedisError as e: |
|
654
|
|
|
msg = "Redis: exception during poll: {0}" |
|
655
|
|
|
self.log.error(msg.format(e)) |
|
656
|
|
|
return plumd.ResultSet([result]) |
|
657
|
|
|
|
|
658
|
|
|
def record_cmdstats(self, result): |
|
659
|
|
|
"""Record the stats from info commandstats. |
|
660
|
|
|
|
|
661
|
|
|
:param result: A result object to add metrics to |
|
662
|
|
|
:type result: ResultSet |
|
663
|
|
|
""" |
|
664
|
|
|
ts = time.time() |
|
|
|
|
|
|
665
|
|
|
name = self.name |
|
666
|
|
|
infos = self.client.info("commandstats") |
|
667
|
|
|
for key in sorted(infos.keys()): |
|
668
|
|
|
vals = infos[key].split(",") |
|
669
|
|
|
cstat, cname = key.split("_") |
|
670
|
|
|
for val in vals: |
|
671
|
|
|
mname, mval = val.split("=") |
|
672
|
|
|
metric = "{0}.{1}.{2}.{3}".format(name, cstat, cname, mname) |
|
673
|
|
|
result.add(plumd.Float(metric, mval)) |
|
674
|
|
|
|
|
675
|
|
|
def record_metrics(self, stats, result): |
|
676
|
|
|
"""Record the configured gauges and metrics |
|
677
|
|
|
|
|
678
|
|
|
:param stats: Dictionary returned from info command |
|
679
|
|
|
:type stats: dict |
|
680
|
|
|
:param result: A result object to add metrics to |
|
681
|
|
|
:type result: ResultSet |
|
682
|
|
|
""" |
|
683
|
|
|
ts = time.time() |
|
684
|
|
|
name = self.name |
|
685
|
|
|
|
|
686
|
|
|
# record gauges |
|
687
|
|
|
for stat in self.gauges: |
|
688
|
|
|
if stat in stats: |
|
689
|
|
|
mname = "{0}.{1}".format(name, stat) |
|
690
|
|
|
result.add(plumd.Float(mname, stats[stat])) |
|
691
|
|
|
|
|
692
|
|
|
# record rates |
|
693
|
|
|
for stat in self.rates: |
|
694
|
|
|
if stat in stats: |
|
695
|
|
|
mname = "{0}.{1}".format(name, stat) |
|
696
|
|
|
mval = self.calc.per_second(mname, float(stats[stat]), ts) |
|
697
|
|
|
result.add(plumd.Float(mname, mval)) |
|
698
|
|
|
|
|
699
|
|
|
def record_dbs(self, stats, result): |
|
700
|
|
|
"""Record per database metrics into result. |
|
701
|
|
|
|
|
702
|
|
|
:param stats: Dictionary returned from info command |
|
703
|
|
|
:type stats: dict |
|
704
|
|
|
:param result: A result object to add metrics to |
|
705
|
|
|
:type result: ResultSet |
|
706
|
|
|
""" |
|
707
|
|
|
# db0:keys=1,expires=0,avg_ttl=0 |
|
708
|
|
|
name = self.name |
|
709
|
|
|
db_fmt = "db{0}" |
|
710
|
|
|
metric_fmt = "{0}.db.{1}.{2}" |
|
711
|
|
|
|
|
712
|
|
|
for i in xrange(0, len(stats.keys())): |
|
|
|
|
|
|
713
|
|
|
dbname = db_fmt.format(i) |
|
714
|
|
|
if dbname not in stats: |
|
715
|
|
|
break |
|
716
|
|
|
try: |
|
717
|
|
|
vals = stats[dbname].split(",") |
|
718
|
|
|
dbmetrics = dict((k, v) |
|
719
|
|
|
for k, v in (v.split('=') for v in vals)) |
|
720
|
|
|
for k, v in dbmetrics.items(): |
|
721
|
|
|
metric_str = metric_fmt.format(name, i, k) |
|
722
|
|
|
result.add(plumd.Int(metric_str, v)) |
|
723
|
|
|
except KeyError as E: |
|
|
|
|
|
|
724
|
|
|
self.log.error("Redis: invalid db entry: {0}".format(dbname)) |
|
725
|
|
|
|
|
726
|
|
|
def record_slaves(self, stats, result): |
|
727
|
|
|
"""Record slave metrics into result. |
|
728
|
|
|
|
|
729
|
|
|
:param stats: A dictionary returned from info command |
|
730
|
|
|
:type stats: dict |
|
731
|
|
|
:param result: A ResultSet object to add metrics to |
|
732
|
|
|
:type result: ResultSet |
|
733
|
|
|
""" |
|
734
|
|
|
# slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1 |
|
735
|
|
|
name = self.name |
|
736
|
|
|
slave_str = "slave{0}" |
|
737
|
|
|
moffstr = 'master_repl_offset' |
|
738
|
|
|
moffset = 0 |
|
739
|
|
|
try: |
|
740
|
|
|
moffset = int(stats[moffstr]) |
|
741
|
|
|
except(TypeError, KeyError) as e: |
|
|
|
|
|
|
742
|
|
|
self.log.error("Redis: no {0} value".format(moffstr)) |
|
743
|
|
|
|
|
744
|
|
|
# for each slave entry |
|
745
|
|
|
for i in xrange(0, len(stats.keys())): |
|
|
|
|
|
|
746
|
|
|
sname = slave_str.format(i) |
|
747
|
|
|
if sname not in stats: |
|
748
|
|
|
break |
|
749
|
|
|
try: |
|
750
|
|
|
vals = stats[sname].split(",") |
|
751
|
|
|
smetrics = dict((k, v) |
|
752
|
|
|
for k, v in (v.split('=') for v in vals)) |
|
753
|
|
|
sip = smetrics['ip'].replace(".", "_") |
|
754
|
|
|
smname = "{0}_{1}".format(sip, smetrics['port']) |
|
755
|
|
|
|
|
756
|
|
|
# record offset and lag |
|
757
|
|
|
mname = "{0}.slave.{1}.offset".format(name, smname) |
|
758
|
|
|
soffset = moffset - int(smetrics['offset']) |
|
759
|
|
|
result.add(plumd.Int(mname, soffset)) |
|
760
|
|
|
mname = "{0}.slave.{1}.lag".format(name, sname) |
|
761
|
|
|
result.add(plumd.Int(mname, smetrics['lag'])) |
|
762
|
|
|
|
|
763
|
|
|
# if slave is online set online = 1, otherwise 0 |
|
764
|
|
|
sonline = 1 if smetrics['state'] == "online" else 0 |
|
765
|
|
|
mname = "{0}.slave.{1}.online".format(name, sname) |
|
766
|
|
|
result.add(plumd.Int(mname, sonline)) |
|
767
|
|
|
except(TypeError, KeyError, ValueError) as e: |
|
768
|
|
|
self.log.error("Redis: invalid slave entry: {0}".format(sname)) |
|
769
|
|
|
|
|
770
|
|
|
def record_configs(self, result): |
|
771
|
|
|
"""Record the configured configuration values. |
|
772
|
|
|
|
|
773
|
|
|
:param result: A ResultSet to record max mem to. |
|
774
|
|
|
:type result: plumd.ResultSet |
|
775
|
|
|
""" |
|
776
|
|
|
configs = self.configs |
|
777
|
|
|
if not configs: |
|
778
|
|
|
return |
|
779
|
|
|
name = self.name |
|
780
|
|
|
for config in self.client.config_get_multi(configs): |
|
781
|
|
|
for key, val in config.items(): |
|
782
|
|
|
mstr = "{0}.configs.{1}".format(name, key) |
|
783
|
|
|
result.add(plumd.Float(mstr, val)) |
|
784
|
|
|
|
|
785
|
|
|
def record_sizes(self, result): |
|
786
|
|
|
"""For each type of key (list, zset, set, hyperloglog) |
|
787
|
|
|
scan for a list of keys matching the prefix and record the |
|
788
|
|
|
total number of items for the matching prefix. |
|
789
|
|
|
|
|
790
|
|
|
:param result: A ResultSet to record into. |
|
791
|
|
|
:type result: plumd.ResultSet |
|
792
|
|
|
""" |
|
793
|
|
|
if not self.keys: |
|
794
|
|
|
return |
|
795
|
|
|
keys = self.config.get("keys") |
|
796
|
|
|
if "lists" in keys: |
|
797
|
|
|
self.record_lists(keys['lists'], result) |
|
798
|
|
|
if "zsets" in keys: |
|
799
|
|
|
self.record_zsets(keys['zsets'], result) |
|
800
|
|
|
if "sets" in keys: |
|
801
|
|
|
self.record_sets(keys['sets'], result) |
|
802
|
|
|
if "hlls" in keys: |
|
803
|
|
|
self.record_hlls(keys['hlls'], result) |
|
804
|
|
|
|
|
805
|
|
|
def record_lists(self, lconfig, result): |
|
806
|
|
|
"""Record the total length of the configured lists: |
|
807
|
|
|
|
|
808
|
|
|
eg. lconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
|
809
|
|
|
|
|
810
|
|
|
:param lconfig: A dict of metric name => globs |
|
811
|
|
|
:type lconfig: dict |
|
812
|
|
|
:param result: A ResultSet to record into. |
|
813
|
|
|
:type result: plumd.ResultSet |
|
814
|
|
|
""" |
|
815
|
|
|
name = self.name |
|
816
|
|
|
for mprefix, kprefixes in lconfig.items(): |
|
817
|
|
|
for prefix in kprefixes: |
|
818
|
|
|
# get the total for this prefix |
|
819
|
|
|
total = self.client.llen_multi(self.client.scan(prefix)) |
|
820
|
|
|
mstr = "{0}.sizes.lists.{1}".format(name, mprefix) |
|
821
|
|
|
result.add(plumd.Int(mstr, total)) |
|
822
|
|
|
|
|
823
|
|
|
def record_zsets(self, zconfig, result): |
|
824
|
|
|
"""Record the total length of the configured zsets: |
|
825
|
|
|
|
|
826
|
|
|
eg. zconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
|
827
|
|
|
|
|
828
|
|
|
:param zconfig: A dict of metric name => globs |
|
829
|
|
|
:type zconfig: dict |
|
830
|
|
|
:param result: A ResultSet to record into. |
|
831
|
|
|
:type result: plumd.ResultSet |
|
832
|
|
|
""" |
|
833
|
|
|
name = self.name |
|
834
|
|
|
for mprefix, kprefixes in zconfig.items(): |
|
835
|
|
|
for prefix in kprefixes: |
|
836
|
|
|
# get the total for this prefix |
|
837
|
|
|
total = self.client.zcard_multi(self.client.scan(prefix)) |
|
838
|
|
|
mstr = "{0}.sizes.zset.{1}".format(name, mprefix) |
|
839
|
|
|
result.add(plumd.Int(mstr, total)) |
|
840
|
|
|
|
|
841
|
|
|
def record_sets(self, sconfig, result): |
|
842
|
|
|
"""Record the total length of the configured zsets: |
|
843
|
|
|
|
|
844
|
|
|
eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
|
845
|
|
|
|
|
846
|
|
|
:param sconfig: A dict of metric name => globs |
|
847
|
|
|
:type sconfig: dict |
|
848
|
|
|
:param result: A ResultSet to record into. |
|
849
|
|
|
:type result: plumd.ResultSet |
|
850
|
|
|
""" |
|
851
|
|
|
name = self.name |
|
852
|
|
|
for mprefix, kprefixes in sconfig.items(): |
|
853
|
|
|
for prefix in kprefixes: |
|
854
|
|
|
# get the total for this prefix |
|
855
|
|
|
total = self.client.scard_multi(self.client.scan(prefix)) |
|
856
|
|
|
mstr = "{0}.sizes.set.{1}".format(name, mprefix) |
|
857
|
|
|
result.add(plumd.Int(mstr, total)) |
|
858
|
|
|
|
|
859
|
|
|
def record_hlls(self, hllconfig, result): |
|
860
|
|
|
"""Record the total length of the configured hlls: |
|
861
|
|
|
|
|
862
|
|
|
eg. sconfig: {"metric_name": [ "list*", "of*", "globs*"]} |
|
863
|
|
|
|
|
864
|
|
|
:param hllconfig: A dict of metric name => globs |
|
865
|
|
|
:type hllconfig: dict |
|
866
|
|
|
:param result: A ResultSet to record into. |
|
867
|
|
|
:type result: plumd.ResultSet |
|
868
|
|
|
""" |
|
869
|
|
|
name = self.name |
|
870
|
|
|
for mprefix, kprefixes in hllconfig.items(): |
|
871
|
|
|
for prefix in kprefixes: |
|
872
|
|
|
# get the total for this prefix |
|
873
|
|
|
total = self.client.pfcount_multi(self.client.scan(prefix)) |
|
874
|
|
|
mstr = "{0}.sizes.hll.{1}".format(name, mprefix) |
|
875
|
|
|
result.add(plumd.Int(mstr, total)) |
|
876
|
|
|
|