Completed
Push — master ( 17da33...be8f19 )
by Kenny
03:14
created

Redis.record_keys()   B

Complexity

Conditions 7

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 18
rs 7.3333
1
# -*- coding: utf-8 -*-
2
3
__author__ = 'Kenny Freeman'
4
__email__ = '[email protected]'
5
__license__ = "ISCL"
6
__docformat__ = 'reStructuredText'
7
8
import sys
9
10
# True when the interpreter is Python 3 or newer (its version tuple sorts above (3,)).
PY3 = sys.version_info > (3,)
11
12
import time
13
import socket
14
15
import plumd
16
import plumd.plugins
17
from plumd.calc import Differential
18
19
20
# -*- coding: utf-8 -*-
21
22
__author__ = 'Kenny Freeman'
23
__email__ = '[email protected]'
24
__license__ = "ISCL"
25
__docformat__ = 'reStructuredText'
26
27
import re
28
import os
29
import sys
30
31
# True when the interpreter is Python 3 or newer (its version tuple sorts above (3,)).
PY3 = sys.version_info > (3,)
32
33
import plumd
34
import plumd.plugins
35
from plumd.util import get_file_list
36
37
38
class Redis(plumd.plugins.Reader):
    """Plugin to record redis metrics.

    Sends an inline ``info`` command to a Redis server over a plain TCP
    socket, parses the ``name:value`` lines of the reply and records the
    configured gauges and rates plus per-database and per-slave metrics.
    """

    INFO_CMD = "info\r\n"
    RECV_SIZE = 4096

    # default config values
    defaults = {
        'poll.interval': 10,
        # each stat is listed exactly once so it is only recorded once per poll
        'gauges': [
            "aof_current_rewrite_time_sec",
            "aof_enabled",
            "aof_last_rewrite_time_sec",
            "aof_rewrite_in_progress",
            "aof_rewrite_scheduled",
            "blocked_clients",
            "client_biggest_input_buf",
            "client_longest_output_list",
            "connected_clients",
            "connected_slaves",
            "evicted_keys",
            "expired_keys",
            "instantaneous_input_kbps",
            "instantaneous_ops_per_sec",
            "instantaneous_output_kbps",
            "keyspace_hits",
            "keyspace_misses",
            "latest_fork_usec",
            "loading",
            "master_repl_offset",
            "mem_fragmentation_ratio",
            "pubsub_channels",
            "pubsub_patterns",
            "rdb_bgsave_in_progress",
            "rdb_changes_since_last_save",
            "rdb_current_bgsave_time_sec",
            "rdb_last_bgsave_time_sec",
            "rdb_last_save_time",
            "rejected_connections",
            "repl_backlog_active",
            "repl_backlog_first_byte_offset",
            "repl_backlog_histlen",
            "repl_backlog_size",
            "sync_full",
            "sync_partial_err",
            "sync_partial_ok",
            "total_commands_processed",
            "total_connections_received",
            "total_net_input_bytes",
            "total_net_output_bytes",
            "uptime_in_days",
            "uptime_in_seconds",
            "used_cpu_sys",
            "used_cpu_sys_children",
            "used_cpu_user",
            "used_cpu_user_children",
            "used_memory",
            "used_memory_lua",
            "used_memory_peak",
            "used_memory_rss",
            "master_last_io_seconds_ago",
            "master_sync_in_progress",
            "slave_repl_offset",
            "slave_priority",
            "slave_read_only",
        ],
        'rates': [],
        'host': '127.0.0.1',  # Redis server hostname/ip
        'port': 6379,         # Redis server port
        'timeout': 10,        # connection timeouts
        'retry_time': 30,     # pause n seconds between reconnects
        'retry_cnt': 3        # retry connection n times
    }

    def __init__(self, log, config):
        """Plugin to record redis metrics.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Redis, self).__init__(log, config)
        self.config.defaults(Redis.defaults)

        # metrics to record
        self.gauges = self.config.get('gauges')
        self.rates = self.config.get('rates')

        # Redis connection; every setting is read from self.config, the
        # object the defaults above were applied to
        host = self.config.get('host')
        port = self.config.get('port')
        self.addr = (host, port)
        self.socket = None
        self.timeout = self.config.get('timeout')
        self.retry_time = self.config.get('retry_time')
        self.retry_cnt = self.config.get('retry_cnt')
        self.calc = Differential()

    def poll(self):
        """Query Redis for metrics.

        Records every configured gauge and rate present in the INFO reply,
        then the per-slave and per-database metrics.

        :rtype: ResultSet
        """
        result = plumd.Result("redis")

        stats = self.get_metrics()
        ts = time.time()
        name = self.name

        # record gauges
        for stat in self.gauges:
            if stat in stats:
                mname = "{0}.{1}".format(name, stat)
                result.add(plumd.Float(mname, stats[stat]))

        # record rates
        for stat in self.rates:
            if stat in stats:
                mname = "{0}.{1}".format(name, stat)
                mval = self.calc.per_second(mname, float(stats[stat]), ts)
                result.add(plumd.Float(mname, mval))

        # replication
        if "slave0" in stats:
            self.record_slave(stats, result)

        # record db metrics
        self.record_keys(stats, result)

        return plumd.ResultSet([result])

    @staticmethod
    def _indexed_keys(stats, prefix):
        """Return sorted (index, key) pairs for keys named <prefix><digits>.

        :param stats: parsed INFO values keyed by stat name
        :type stats: dict
        :param prefix: key prefix to match, eg. "db" or "slave"
        :type prefix: str
        :rtype: list
        """
        plen = len(prefix)
        found = [(int(key[plen:]), key) for key in stats
                 if key.startswith(prefix) and key[plen:].isdigit()]
        found.sort()
        return found

    @staticmethod
    def _parse_fields(raw):
        """Parse a "k1=v1,k2=v2" INFO value into a dict of strings.

        :param raw: the comma separated key=value string
        :type raw: str
        :raises ValueError: if an item has no "=" separator
        :rtype: dict
        """
        return dict(item.split("=", 1) for item in raw.strip().split(","))

    def record_keys(self, stats, result):
        """Record the per-database metrics found in stats.

        Entries look like ``db0:keys=1,expires=0,avg_ttl=0``. Every ``dbN``
        entry present is recorded, whether or not the indexes are contiguous.

        :param stats: parsed INFO values keyed by stat name
        :type stats: dict
        :param result: the result to add metrics to
        :type result: plumd.Result
        """
        name = self.name
        metric_fmt = "{0}.db.{1}.{2}"

        for idx, dbname in self._indexed_keys(stats, "db"):
            try:
                fields = self._parse_fields(stats[dbname])
                # convert everything first so a malformed entry records nothing
                metrics = [plumd.Int(metric_fmt.format(name, idx, key),
                                     int(val))
                           for key, val in fields.items()]
            except (TypeError, KeyError, ValueError, AttributeError):
                self.log.error("Redis: invalid db entry: {0}".format(dbname))
                continue
            for metric in metrics:
                result.add(metric)

    def record_slave(self, stats, result):
        """Record offset, lag and online state for each connected slave.

        Entries look like
        ``slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1``.
        The recorded offset is the master offset minus the slave offset.

        :param stats: parsed INFO values keyed by stat name
        :type stats: dict
        :param result: the result to add metrics to
        :type result: plumd.Result
        """
        name = self.name
        moffstr = 'master_repl_offset'
        try:
            moffset = int(stats[moffstr])
        except (TypeError, KeyError, ValueError):
            # without the master offset no slave offset can be computed
            self.log.error("Redis: invalid slave entry: {0}".format(moffstr))
            return

        for _, sname in self._indexed_keys(stats, "slave"):
            try:
                smetrics = self._parse_fields(stats[sname])
                # parse every field first so a bad entry records nothing
                soffset = moffset - int(smetrics['offset'])
                slag = int(smetrics['lag'])
                # if slave is online set online = 1, otherwise 0
                sonline = 1 if smetrics['state'] == "online" else 0
            except (TypeError, KeyError, ValueError, AttributeError):
                self.log.error("Redis: invalid slave entry: {0}".format(sname))
                continue

            mname = "{0}.slave.{1}.offset".format(name, sname)
            result.add(plumd.Int(mname, soffset))
            mname = "{0}.slave.{1}.lag".format(name, sname)
            result.add(plumd.Int(mname, slag))
            mname = "{0}.slave.{1}.online".format(name, sname)
            result.add(plumd.Int(mname, sonline))

    def _recv_chunk(self):
        """Receive up to RECV_SIZE bytes, raising if the peer closed.

        :raises socket.error: if the connection was closed by the server
        :rtype: bytes
        """
        chunk = self.socket.recv(Redis.RECV_SIZE)
        if not chunk:
            raise socket.error("connection closed by peer")
        return chunk

    def _read_reply(self):
        """Read one complete reply to the INFO command.

        A bulk string reply ("$<length>\\r\\n<payload>\\r\\n") is read until
        the whole payload has arrived and only the payload is returned. Any
        other reply is returned as received.

        :raises socket.error: if the connection closes mid reply
        :raises ValueError: if the bulk length header is malformed
        :rtype: bytes
        """
        buf = bytearray(self._recv_chunk())
        if not buf.startswith(b"$"):
            return bytes(buf)

        # make sure the whole "$<length>" header line has arrived
        while b"\r\n" not in buf:
            buf.extend(self._recv_chunk())
        header, _, body = buf.partition(b"\r\n")
        length = int(bytes(header[1:]).decode('ascii'))
        if length < 0:
            # "$-1" is a null bulk string: no payload follows
            return b""

        # the payload is followed by a trailing CRLF
        while len(body) < length + 2:
            body.extend(self._recv_chunk())
        return bytes(body[:length])

    def get_metrics(self):
        """Request and read stats from Redis socket.

        Returns a dict mapping each stat name to its raw string value; the
        dict is empty if no connection could be made, and holds whatever was
        parsed before the failure if the request fails part way. On failure
        the error is logged and the connection is dropped so the next poll
        reconnects.

        :rtype: dict
        """
        stats = {}

        if not self.socket and not self.connect():
            return stats

        try:
            # sockets carry bytes on Python 3; encode/decode at the boundary
            self.socket.sendall(Redis.INFO_CMD.encode('utf8'))
            payload = self._read_reply().decode('utf8', 'replace')

            msg = "Redis: {0} = {1}"
            for line in payload.splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                sname, sep, sval = line.partition(":")
                if not sep:
                    continue
                self.log.debug(msg.format(sname, sval))
                stats[sname] = sval
        except Exception as e:
            msg = "Redis: {0}: poll: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            self.disconnect()

        return stats

    def connect(self):
        """Connect to Redis, returns True if sucessful, False otherwise.

        Tries up to retry_cnt times, pausing retry_time seconds between
        attempts (but not after the final one).

        :rtype: bool
        """
        self.log.debug("Redis: connecting: {0}".format(self.addr))
        for attempt in range(self.retry_cnt):
            if self.socket:
                self.disconnect()
            try:
                # create the socket
                self.socket = socket.socket(socket.AF_INET,
                                            socket.SOCK_STREAM)
                # set timeout for socket operations
                self.socket.settimeout(self.timeout)
                # connect
                self.socket.connect(self.addr)
                # log and return
                msg = "Redis: connected: {0}"
                self.log.info(msg.format(self.addr))
                return True
            except Exception as e:
                # log exception, ensure cleanup is done (disconnect)
                msg = "Redis: {0}: connect: excception: {1}"
                self.log.error(msg.format(self.addr, e))
                self.disconnect()
                # pause before reconnecting, only if another attempt follows
                if attempt + 1 < self.retry_cnt:
                    time.sleep(self.retry_time)
        return False

    def disconnect(self):
        """Sever the Redis connection.

        Errors raised while closing are logged; the socket reference is
        always cleared.
        """
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "Redis: dicsonnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            self.socket = None
307