| Total Complexity | 38 |
| Total Lines | 269 |
| Duplicated Lines | 23.42 % |
| Changes | 1 | ||
| Bugs | 1 | Features | 1 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
| 1 | # -*- coding: utf-8 -*- |
||
class Redis(plumd.plugins.Reader):
    """Plugin to record redis metrics.

    Sends the inline ``info`` command to a Redis server over a plain TCP
    socket, parses the ``name:value`` lines of the reply and reports the
    configured gauges and rates plus per-database and per-slave metrics.
    """

    # inline command sent to the server on every poll
    INFO_CMD = "info\r\n"
    # number of bytes requested from the socket per recv() call
    RECV_SIZE = 4096

    # default config values
    defaults = {
        'poll.interval': 10,
        # each name is listed exactly once: a repeated name makes poll()
        # add the same metric to the result more than once
        'gauges': [
            "aof_current_rewrite_time_sec",
            "aof_enabled",
            "aof_last_rewrite_time_sec",
            "aof_rewrite_in_progress",
            "aof_rewrite_scheduled",
            "blocked_clients",
            "client_biggest_input_buf",
            "client_longest_output_list",
            "connected_clients",
            "connected_slaves",
            "evicted_keys",
            "expired_keys",
            "instantaneous_input_kbps",
            "instantaneous_ops_per_sec",
            "instantaneous_output_kbps",
            "keyspace_hits",
            "keyspace_misses",
            "latest_fork_usec",
            "loading",
            "master_repl_offset",
            "mem_fragmentation_ratio",
            "pubsub_channels",
            "pubsub_patterns",
            "rdb_bgsave_in_progress",
            "rdb_changes_since_last_save",
            "rdb_current_bgsave_time_sec",
            "rdb_last_bgsave_time_sec",
            "rdb_last_save_time",
            "rejected_connections",
            "repl_backlog_active",
            "repl_backlog_first_byte_offset",
            "repl_backlog_histlen",
            "repl_backlog_size",
            "sync_full",
            "sync_partial_err",
            "sync_partial_ok",
            "total_commands_processed",
            "total_connections_received",
            "total_net_input_bytes",
            "total_net_output_bytes",
            "uptime_in_days",
            "uptime_in_seconds",
            "used_cpu_sys",
            "used_cpu_sys_children",
            "used_cpu_user",
            "used_cpu_user_children",
            "used_memory",
            "used_memory_lua",
            "used_memory_peak",
            "used_memory_rss",
            "master_last_io_seconds_ago",
            "master_sync_in_progress",
            "slave_repl_offset",
            "slave_priority",
            "slave_read_only",
        ],
        'rates': [],
        'host': '127.0.0.1',    # Redis server hostname/ip
        'port': 6379,           # Redis server port
        'timeout': 10,          # connection timeouts
        'retry_time': 30,       # pause n seconds between reconnects
        'retry_cnt': 3          # retry connection n times
    }

    def __init__(self, log, config):
        """Plugin to record redis metrics.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Redis, self).__init__(log, config)
        self.config.defaults(Redis.defaults)

        # metrics to record
        self.gauges = self.config.get('gauges')
        self.rates = self.config.get('rates')

        # Redis connection; every value is read through self.config, the
        # object the defaults were just installed on
        host = self.config.get('host')
        port = self.config.get('port')
        self.addr = (host, port)
        self.socket = None
        self.timeout = self.config.get('timeout')
        self.retry_time = self.config.get('retry_time')
        self.retry_cnt = self.config.get('retry_cnt')
        self.calc = Differential()

    def poll(self):
        """Query Redis for metrics.

        :rtype: ResultSet
        """
        result = plumd.Result("redis")

        stats = self.get_metrics()
        ts = time.time()
        name = self.name

        # record gauges
        for stat in self.gauges:
            if stat in stats:
                mname = "{0}.{1}".format(name, stat)
                result.add(plumd.Float(mname, stats[stat]))

        # record rates
        for stat in self.rates:
            if stat not in stats:
                continue
            mname = "{0}.{1}".format(name, stat)
            try:
                sval = float(stats[stat])
            except ValueError:
                # a non-numeric stat configured as a rate must not abort
                # the whole poll
                msg = "Redis: non numeric rate: {0}"
                self.log.error(msg.format(stat))
                continue
            mval = self.calc.per_second(mname, sval, ts)
            result.add(plumd.Float(mname, mval))

        # replication
        if "slave0" in stats:
            self.record_slave(stats, result)

        # record db metrics
        self.record_keys(stats, result)

        return plumd.ResultSet([result])

    def record_keys(self, stats, result):
        """Add the metrics of every ``dbN`` entry in stats to result.

        An entry looks like ``db0:keys=1,expires=0,avg_ttl=0``; each
        ``key=value`` pair is recorded as ``<name>.db.<N>.<key>``.

        :param stats: stat name to raw value mapping from get_metrics()
        :type stats: dict
        :param result: the result the metrics are added to
        :type result: plumd.Result
        """
        name = self.name
        metric_fmt = "{0}.db.{1}.{2}"

        # match every db<N> key instead of counting up from db0: the
        # entries present in stats are not necessarily contiguous
        for dbname in sorted(stats):
            index = dbname[2:]
            if not (dbname.startswith("db") and index.isdigit()):
                continue
            try:
                pairs = dict(item.split("=")
                             for item in stats[dbname].split(","))
                # build every metric before adding any of them so that a
                # malformed entry does not leave a partial set behind
                metrics = [
                    plumd.Int(metric_fmt.format(name, int(index), k), v)
                    for k, v in pairs.items()
                ]
            except (TypeError, KeyError, ValueError):
                msg = "Redis: invalid db entry: {0}"
                self.log.error(msg.format(dbname))
                continue
            for metric in metrics:
                result.add(metric)

    def record_slave(self, stats, result):
        """Add offset, lag and online metrics for every ``slaveN`` entry.

        An entry looks like
        ``slave0:ip=127.0.0.1,port=6399,state=online,offset=239,lag=1``.
        The recorded offset is master_repl_offset minus the offset of the
        slave. Nothing is recorded when master_repl_offset is missing or
        not an integer.

        :param stats: stat name to raw value mapping from get_metrics()
        :type stats: dict
        :param result: the result the metrics are added to
        :type result: plumd.Result
        """
        name = self.name
        slave_str = "slave{0}"
        moffstr = 'master_repl_offset'
        try:
            moffset = int(stats[moffstr])
        except (TypeError, KeyError, ValueError):
            # without the master offset no slave offset can be computed
            msg = "Redis: invalid {0} entry"
            self.log.error(msg.format(moffstr))
            return

        for i in xrange(0, len(stats)):
            sname = slave_str.format(i)
            if sname not in stats:
                break
            try:
                smetrics = dict(item.split("=")
                                for item in stats[sname].split(","))
                soffset = moffset - int(smetrics['offset'])
                # if slave is online set online = 1, otherwise 0
                sonline = 1 if smetrics['state'] == "online" else 0
                prefix = "{0}.slave.{1}".format(name, sname)
                metrics = [
                    plumd.Int("{0}.offset".format(prefix), soffset),
                    plumd.Int("{0}.lag".format(prefix), smetrics['lag']),
                    plumd.Int("{0}.online".format(prefix), sonline),
                ]
            except (TypeError, KeyError, ValueError):
                msg = "Redis: invalid slave entry: {0}"
                self.log.error(msg.format(sname))
                continue
            for metric in metrics:
                result.add(metric)

    def get_metrics(self):
        """Request and read stats from Redis socket.

        Connects first when there is no open socket. On any error the
        error is logged, the socket is closed and an empty dict is
        returned.

        :returns: stat name to raw (string) value mapping
        :rtype: dict
        """
        stats = {}

        if not self.socket and not self.connect():
            return stats

        try:
            # sendall() needs bytes on Python 3; for this ASCII command
            # encode() yields the equivalent byte string on Python 2
            self.socket.sendall(Redis.INFO_CMD.encode('utf8'))
            payload = self._read_bulk_reply()
        except Exception as e:
            msg = "Redis: {0}: poll: exception: {1}"
            self.log.error(msg.format(self.addr, e))
            self.disconnect()
            return stats

        msg = "Redis: {0} = {1}"
        for line in payload.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            # split on the first colon only so that values which contain
            # a colon themselves are kept
            sname, sep, sval = line.partition(":")
            if not sep:
                continue
            self.log.debug(msg.format(sname, sval))
            stats[sname] = sval

        return stats

    def _read_bulk_reply(self):
        """Read one complete bulk string reply from the socket.

        The reply is expected to be ``$<length>\\r\\n<payload>\\r\\n``;
        recv() is called until the announced number of bytes has arrived,
        so replies larger than RECV_SIZE are not truncated.

        :raises socket.error: if the peer closes the connection early
        :raises ValueError: if the reply is not a bulk string
        :returns: the payload as text
        :rtype: str
        """
        buf = b""
        # read until the length header line is complete
        while b"\r\n" not in buf:
            chunk = self.socket.recv(Redis.RECV_SIZE)
            if not chunk:
                raise socket.error("connection closed by peer")
            buf += chunk

        header, _, body = buf.partition(b"\r\n")
        if not header.startswith(b"$"):
            # e.g. an error reply (a line starting with "-")
            raise ValueError("unexpected reply: {0!r}".format(header))
        length = int(header[1:].decode('ascii'))
        if length < 0:
            # a negative length announces a null reply with no payload
            return ""

        # the payload is followed by a trailing CRLF
        total = length + 2
        parts = [body]
        have = len(body)
        while have < total:
            chunk = self.socket.recv(Redis.RECV_SIZE)
            if not chunk:
                raise socket.error("connection closed by peer")
            parts.append(chunk)
            have += len(chunk)

        data = b"".join(parts)[:length]
        # Python 3 sockets return bytes; Python 2 already returns str
        if not isinstance(data, str):
            data = data.decode('utf8', 'replace')
        return data

    def connect(self):
        """Connect to Redis, returns True if successful, False otherwise.

        Makes up to retry_cnt attempts, pausing retry_time seconds between
        failed attempts.

        :rtype: bool
        """
        self.log.debug("Redis: connecting: {0}".format(self.addr))
        for attempt in xrange(0, self.retry_cnt):
            if self.socket:
                self.disconnect()
            try:
                # create the socket
                self.socket = socket.socket(socket.AF_INET,
                                            socket.SOCK_STREAM)
                # set timeout for socket operations
                self.socket.settimeout(self.timeout)
                # connect
                self.socket.connect(self.addr)
                # log and return
                msg = "Redis: connected: {0}"
                self.log.info(msg.format(self.addr))
                return True
            except Exception as e:
                # log exception, ensure cleanup is done (disconnect)
                msg = "Redis: {0}: connect: exception: {1}"
                self.log.error(msg.format(self.addr, e))
                self.disconnect()
                # pause before reconnecting; there is nothing to wait for
                # after the final attempt
                if attempt + 1 < self.retry_cnt:
                    time.sleep(self.retry_time)
        return False

    def disconnect(self):
        """Sever the Redis connection."""
        if not self.socket:
            return
        try:
            self.socket.close()
        except Exception as e:
            msg = "Redis: disconnect: exception {0}".format(e)
            self.log.error(msg)
        finally:
            # drop the reference even if close() failed so that the next
            # poll reconnects
            self.socket = None
||
| 307 |