CouchBase.poll()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 1
Metric Value
c 2
b 1
f 1
dl 0
loc 11
rs 9.4285
cc 1
1
# -*- coding: utf-8 -*-
2
"""Couchbase metrics reader."""
3
import sys
4
import time
5
import json
6
import urllib2
0 ignored issues
show
Configuration introduced by
The import urllib2 could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We are currently not using virtualenv to run Pylint; when installing your modules, make sure to use the command for the correct Python version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

Loading history...
7
from collections import deque
8
from threading import Lock
9
10
from plumd import Counter, Gauge, String, Reader, ResultSet, Result
11
12
# True when the module is running under a Python 3 interpreter.
PY3 = sys.version_info[0] >= 3

# Module metadata.
__author__ = 'Kenny Freeman'
__email__ = '[email protected]'
__license__ = "ISCL"
__docformat__ = 'reStructuredText'
18
19
20
def get_sample_i(arr, timestamp):
    """Return the index of the newest entry in arr that is older than timestamp.

    This is used to reduce the samples returned from the Couchbase rest api
    to the timestamps occurring after our last poll time.

    The list is scanned from its end towards its start. Index 0 is never
    compared: it is the fallback when no later entry is older than
    timestamp, and it is also returned for an empty list so the result can
    always be used as a slice start.

    :param arr: a list of sample timestamps (milliseconds)
    :type arr: list
    :param timestamp: the last poll timestamp to compare against
    :type timestamp: long
    :rtype: int
    """
    # walk backwards from the last sample, stopping before index 0
    for sample_i in range(len(arr) - 1, 0, -1):
        if arr[sample_i] < timestamp:
            return sample_i
    return 0
38
39
40
def avg(arr):
    """Return average value from the list of values.

    An empty (or None) list yields 0.0, as does a list whose values cannot
    be summed (for example one containing None or strings).

    :param arr: A list of int or float values
    :type arr: list
    :rtype: float
    """
    if not arr:
        return 0.0
    try:
        # divide by a float so integer samples are not floor-divided on
        # Python 2
        return sum(arr) / float(len(arr))
    except TypeError:
        # non-numeric samples: report a neutral value instead of failing
        return 0.0
57
58
59
def stats(arr):
    """Return the min, max and average value from the list of values.

    An empty list yields (0.0, 0.0, 0.0) rather than raising from
    min()/max().

    :param arr: A list of int or float values
    :type arr: list
    :rtype: tuple(float, float, float)
    """
    if not arr:
        return (0.0, 0.0, 0.0)
    # divide by a float so integer samples are not floor-divided on Python 2
    mean = sum(arr) / float(len(arr))
    return (min(arr), max(arr), mean)
71
72
73
class CouchBase(Reader):
    """Plugin to record Couchbase metrics read from its REST api."""

    # default config values
    defaults = {
        'poll.interval': 10,
        'host': 'localhost',
        'port': 8091,
        'username': 'Administrator',
        'password': '',
        'proto': 'http',
        # passed as the timeout argument of the urllib2 calls
        'timeout': 10,
        # keys recorded from the /settings/indexes response
        'metrics_index': [
            'indexerThreads',
            'maxRollbackPoints',
            'memorySnapshotInterval',
            'stableSnapshotInterval'
        ],
        # bucket sample names recorded when limit_bucket is enabled
        'metrics_bucket': [
            "avg_bg_wait_time",
            "avg_disk_commit_time",
            "avg_disk_update_time",
            "bg_wait_count",
            "bg_wait_total",
            "bytes_read",
            "bytes_written",
            "cas_badval",
            "cas_hits",
            "cas_misses",
            "cmd_get",
            "cmd_set",
            "couch_docs_actual_disk_size",
            "couch_docs_data_size",
            "couch_docs_disk_size",
            "couch_docs_fragmentation",
            "couch_spatial_data_size",
            "couch_spatial_disk_size",
            "couch_spatial_ops",
            "couch_total_disk_size",
            "couch_views_actual_disk_size",
            "couch_views_data_size",
            "couch_views_disk_size",
            "couch_views_fragmentation",
            "couch_views_ops",
            "cpu_idle_ms",
            "cpu_local_ms",
            "cpu_utilization_rate",
            "curr_connections",
            "curr_items",
            "curr_items_tot",
            "decr_hits",
            "decr_misses",
            "delete_hits",
            "delete_misses",
            "disk_commit_count",
            "disk_commit_total",
            "disk_update_count",
            "disk_update_total",
            "disk_write_queue",
            "ep_bg_fetched",
            "ep_cache_miss_rate",
            "ep_dcp_2i_backoff",
            "ep_dcp_2i_count",
            "ep_dcp_2i_items_remaining",
            "ep_dcp_2i_items_sent",
            "ep_dcp_2i_producer_count",
            "ep_dcp_2i_total_backlog_size",
            "ep_dcp_2i_total_bytes",
            "ep_dcp_fts_backoff",
            "ep_dcp_fts_count",
            "ep_dcp_fts_items_remaining",
            "ep_dcp_fts_items_sent",
            "ep_dcp_fts_producer_count",
            "ep_dcp_fts_total_backlog_size",
            "ep_dcp_fts_total_bytes",
            "ep_dcp_other_backoff",
            "ep_dcp_other_count",
            "ep_dcp_other_items_remaining",
            "ep_dcp_other_items_sent",
            "ep_dcp_other_producer_count",
            "ep_dcp_other_total_backlog_size",
            "ep_dcp_other_total_bytes",
            "ep_dcp_replica_backoff",
            "ep_dcp_replica_count",
            "ep_dcp_replica_items_remaining",
            "ep_dcp_replica_items_sent",
            "ep_dcp_replica_producer_count",
            "ep_dcp_replica_total_backlog_size",
            "ep_dcp_replica_total_bytes",
            "ep_dcp_views_backoff",
            "ep_dcp_views_count",
            "ep_dcp_views_items_remaining",
            "ep_dcp_views_items_sent",
            "ep_dcp_views_producer_count",
            "ep_dcp_views_total_backlog_size",
            "ep_dcp_views_total_bytes",
            "ep_dcp_views+indexes_backoff",
            "ep_dcp_views+indexes_count",
            "ep_dcp_views+indexes_items_remaining",
            "ep_dcp_views+indexes_items_sent",
            "ep_dcp_views+indexes_producer_count",
            "ep_dcp_views+indexes_total_backlog_size",
            "ep_dcp_views+indexes_total_bytes",
            "ep_dcp_xdcr_backoff",
            "ep_dcp_xdcr_count",
            "ep_dcp_xdcr_items_remaining",
            "ep_dcp_xdcr_items_sent",
            "ep_dcp_xdcr_producer_count",
            "ep_dcp_xdcr_total_backlog_size",
            "ep_dcp_xdcr_total_bytes",
            "ep_diskqueue_drain",
            "ep_diskqueue_fill",
            "ep_diskqueue_items",
            "ep_flusher_todo",
            "ep_item_commit_failed",
            "ep_kv_size",
            "ep_max_size",
            "ep_mem_high_wat",
            "ep_mem_low_wat",
            "ep_meta_data_memory",
            "ep_num_non_resident",
            "ep_num_ops_del_meta",
            "ep_num_ops_del_ret_meta",
            "ep_num_ops_get_meta",
            "ep_num_ops_set_meta",
            "ep_num_ops_set_ret_meta",
            "ep_num_value_ejects",
            "ep_oom_errors",
            "ep_ops_create",
            "ep_ops_update",
            "ep_overhead",
            "ep_queue_size",
            "ep_resident_items_rate",
            "ep_tap_rebalance_count",
            "ep_tap_rebalance_qlen",
            "ep_tap_rebalance_queue_backfillremaining",
            "ep_tap_rebalance_queue_backoff",
            "ep_tap_rebalance_queue_drain",
            "ep_tap_rebalance_queue_fill",
            "ep_tap_rebalance_queue_itemondisk",
            "ep_tap_rebalance_total_backlog_size",
            "ep_tap_replica_count",
            "ep_tap_replica_qlen",
            "ep_tap_replica_queue_backfillremaining",
            "ep_tap_replica_queue_backoff",
            "ep_tap_replica_queue_drain",
            "ep_tap_replica_queue_fill",
            "ep_tap_replica_queue_itemondisk",
            "ep_tap_replica_total_backlog_size",
            "ep_tap_total_count",
            "ep_tap_total_qlen",
            "ep_tap_total_queue_backfillremaining",
            "ep_tap_total_queue_backoff",
            "ep_tap_total_queue_drain",
            "ep_tap_total_queue_fill",
            "ep_tap_total_queue_itemondisk",
            "ep_tap_total_total_backlog_size",
            "ep_tap_user_count",
            "ep_tap_user_qlen",
            "ep_tap_user_queue_backfillremaining",
            "ep_tap_user_queue_backoff",
            "ep_tap_user_queue_drain",
            "ep_tap_user_queue_fill",
            "ep_tap_user_queue_itemondisk",
            "ep_tap_user_total_backlog_size",
            "ep_tmp_oom_errors",
            "ep_vb_total",
            "evictions",
            "get_hits",
            "get_misses",
            "hibernated_requests",
            "hibernated_waked",
            "hit_ratio",
            "incr_hits",
            "incr_misses",
            "mem_actual_free",
            "mem_actual_used",
            "mem_free",
            "mem_total",
            "mem_used",
            "mem_used_sys",
            "misses",
            "ops",
            "rest_requests",
            "swap_total",
            "swap_used",
            "timestamp",
            "vb_active_eject",
            "vb_active_itm_memory",
            "vb_active_meta_data_memory",
            "vb_active_num",
            "vb_active_num_non_resident",
            "vb_active_ops_create",
            "vb_active_ops_update",
            "vb_active_queue_age",
            "vb_active_queue_drain",
            "vb_active_queue_fill",
            "vb_active_queue_size",
            "vb_active_resident_items_ratio",
            "vb_avg_active_queue_age",
            "vb_avg_pending_queue_age",
            "vb_avg_replica_queue_age",
            "vb_avg_total_queue_age",
            "vb_pending_curr_items",
            "vb_pending_eject",
            "vb_pending_itm_memory",
            "vb_pending_meta_data_memory",
            "vb_pending_num",
            "vb_pending_num_non_resident",
            "vb_pending_ops_create",
            "vb_pending_ops_update",
            "vb_pending_queue_age",
            "vb_pending_queue_drain",
            "vb_pending_queue_fill",
            "vb_pending_queue_size",
            "vb_pending_resident_items_ratio",
            "vb_replica_curr_items",
            "vb_replica_eject",
            "vb_replica_itm_memory",
            "vb_replica_meta_data_memory",
            "vb_replica_num",
            "vb_replica_num_non_resident",
            "vb_replica_ops_create",
            "vb_replica_ops_update",
            "vb_replica_queue_age",
            "vb_replica_queue_drain",
            "vb_replica_queue_fill",
            "vb_replica_queue_size",
            "vb_replica_resident_items_ratio",
            "vb_total_queue_age",
            "xdc_ops"
        ],
        # when true, only the names in metrics_bucket are recorded per bucket
        'limit_bucket': False
    }

    def __init__(self, log, config):
        """Plugin to record Couchbase metrics.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(CouchBase, self).__init__(log, config)
        self.config.defaults(CouchBase.defaults)

        # Couchbase connection
        self.base_url = "{0}://{1}:{2}".format(self.config.get("proto"),
                                               self.config.get("host"),
                                               self.config.get("port"))
        self.auth = (self.config.get('username'), self.config.get('password'))
        self.http_pm = urllib2.HTTPPasswordMgrWithDefaultRealm()
        # urls that authenticate() has already been run for
        self.urls = dict()
        # read through self.config like every other setting, since that is
        # the object the defaults above were applied to
        self.timeout = self.config.get('timeout')
        self.lock = Lock()
        # time of the previous poll() in milliseconds, 0 before the first one
        self.last_poll = 0

    def authenticate(self, url):
        """Setup authentication to the given url.

        Registers the configured credentials for the url, probes it once and
        installs the resulting opener as the urllib2 default. Probe failures
        are logged, not raised; the url is marked as set up either way.

        :param url: The URL to setup authentication for.
        :type url: str
        """
        username, password = self.auth
        with self.lock:
            self.http_pm.add_password(None, url, username, password)
            handler = urllib2.HTTPBasicAuthHandler(self.http_pm)
            opener = urllib2.build_opener(handler)
            try:
                # bound the probe with the configured timeout and release
                # the connection as soon as it has answered
                opener.open(url, timeout=self.timeout).close()
            except ValueError as exc:
                err = "Couchbase: invalid url: {0}: {1}"
                self.log.error(err.format(url, exc))
            except (urllib2.URLError, IOError) as exc:
                # an unreachable server must not abort the poll; request()
                # reports the failure again on the actual call
                err = "Couchbase: HTTP Error: {0}: {1}"
                self.log.error(err.format(url, exc))
            urllib2.install_opener(opener)
            self.urls[url] = True

    def request(self, url):
        """Make an HTTP request and return decoded JSON.

        Returns an empty dict when the request fails, the response status is
        not 200 or the body is not valid JSON; failures are logged.

        :param url: The URL to request.
        :type url: str
        :rtype: object
        """
        if url not in self.urls:
            self.authenticate(url)

        ret = dict()
        resp = None
        try:
            # urlopen() uses the opener installed by authenticate(), so take
            # the same lock that guards its installation
            with self.lock:
                resp = urllib2.urlopen(url, timeout=self.timeout)
        except (urllib2.URLError, urllib2.HTTPError, ValueError,
                AttributeError, IOError) as exc:
            err = "Couchbase: HTTP Error: {0}: {1}"
            self.log.error(err.format(url, exc))
            return ret
        if resp is None:
            return ret
        try:
            if resp.getcode() != 200:
                return ret
            ret = json.load(resp)
        except ValueError as exc:
            err = "Couchbase: JSON decode error: {0}: {1}: {2}"
            self.log.error(err.format(url, resp, exc))
        finally:
            # always release the connection, whatever the outcome
            resp.close()
        return ret

    def get_index_settings(self, rset):
        """Record the configured index settings as gauges.

        Only the names listed in the metrics_index setting that are present
        in the server response are recorded.

        :param rset: A ResultSet to add results to
        :type rset: ResultSet
        """
        url = "{0}/settings/indexes".format(self.base_url)
        data = self.request(url)
        metrics = deque(
            Gauge(name, data[name])
            for name in self.config.get('metrics_index')
            if name in data
        )
        rset.add(Result("couchbase.index", metrics))

    def get_pool_stats_all(self, rset):
        """Get basic pool metrics, add to the ResultSet.

        Records the per-bucket node summary once for the whole response and
        then the detailed sample metrics of every bucket.

        :param rset: A ResultSet to add results to
        :type rset: ResultSet
        """
        # get basic metrics for the given pool
        pool_url = "{0}/pools/default/buckets/".format(self.base_url)
        data = self.request(pool_url)
        if not data:
            # failed request or no buckets: nothing to record
            return

        # optional filter of bucket metric names; a set keeps the per-metric
        # membership test cheap
        mnames = None
        if self.config.get("limit_bucket"):
            mnames = frozenset(self.config.get("metrics_bucket"))

        # record misc pool metrics; the helper walks every bucket itself, so
        # it is called once rather than once per bucket
        try:
            self.get_pool_stats_misc(data, rset)
        except KeyError as exc:
            err = "Couchbase: get_pool_stats_all: server response missing key: {0}"
            self.log.error(err.format(exc))

        # response should be an array of dicts - also record detailed
        # bucket metrics for each
        for bucket in data:
            try:
                self.get_bucket_stats(bucket['name'], rset, mnames)
            except KeyError as exc:
                err = "Couchbase: get_pool_stats_all: server response missing key: {0}"
                self.log.error(err.format(exc))

    def get_pool_stats_misc(self, data, rset):
        """Get basic pool metrics, add to the ResultSet.

        For every bucket this counts its healthy, replicating and active
        nodes and records the interestingStats of the node flagged thisNode.

        :param data: Couchbase metrics returned from api call
        :type data: list
        :param rset: A ResultSet to add results to
        :type rset: ResultSet
        :raises KeyError: if an expected key is missing from the response
        """
        # data should be an array of dicts, one per bucket
        for bucket in data:
            cnt_healthy = 0
            cnt_repl = 0
            cnt_members = 0
            metrics = deque()
            for entry in bucket['nodes']:
                if entry["status"] == "healthy":
                    cnt_healthy += 1
                if entry["replication"] == 1:
                    cnt_repl += 1
                if entry["clusterMembership"] == "active":
                    cnt_members += 1
                if entry.get("thisNode"):
                    for key, val in entry["interestingStats"].items():
                        metrics.append(Gauge(key, val))
            # other various metrics
            metrics.append(Gauge("nodes_healthy", cnt_healthy))
            metrics.append(Gauge("nodes_replicating", cnt_repl))
            metrics.append(Gauge("nodes_members", cnt_members))
            # record metrics into a Result with meta data for the pool infos
            res = Result("couchbase.bucket_basic", metrics)
            res.meta.add(String("bucket", bucket["name"]))
            rset.add(res)

    def get_bucket_stats(self, bucket, rset, mnames):
        """Get bucket metrics, add to the ResultSet.

        Each sample series is averaged from the index chosen by
        get_sample_i() for the last poll time onwards. A Result is added
        even when the response is missing keys; it then holds whatever was
        collected before the missing key was hit.

        :param bucket: The name of the bucket to query
        :type bucket: str
        :param rset: A ResultSet to add results to
        :type rset: ResultSet
        :param mnames: metric names to record, or None to record all
        :type mnames: deque
        """
        bucket_url = "{0}/pools/default/buckets/{1}/stats".format(self.base_url, bucket)
        data = self.request(bucket_url)
        metrics = deque()
        try:
            samples = data["op"]["samples"]
            # samples include a list of timestamps (milliseconds); find the
            # sample number to start recording from
            sample_i = get_sample_i(samples["timestamp"], self.last_poll)

            # record average values from the samples
            for metric, val in samples.items():
                if mnames is not None and metric not in mnames:
                    continue
                metrics.append(Gauge(metric, avg(val[sample_i:])))
        except KeyError as exc:
            err = "Couchbase: get_bucket_stats: server response missing key: {0}"
            self.log.error(err.format(exc))
        # record metrics into a Result with meta data for the pool infos
        # NOTE(review): shares the Result name used by get_pool_stats_misc -
        # confirm this is intended before renaming
        res = Result("couchbase.bucket_basic", metrics)
        res.meta.add(String("bucket", bucket))
        rset.add(res)

    def get_node_stats_all(self, rset):
        """Get node metrics, add to the ResultSet.

        A "couchbase.nodes" Result is always added; it is empty when the
        response is missing keys.

        todo:
            - record compaction settings
            - reduce cyclo complexity

        :param rset: A ResultSet to add metrics to.
        :type rset: ResultSet
        """
        # fix me - pool is name: url dict
        node_url = "{0}/pools/nodes/".format(self.base_url)
        data = self.request(node_url)
        # created before the try so the Result below can always be built,
        # even when the response is empty or incomplete
        metrics = deque()
        try:
            self.get_node_stats_misc(data, rset)

            for node in data["nodes"]:
                # only record metrics for the current node
                if not node.get("thisNode"):
                    continue
                # record all key,val in interestingStats
                for metric, val in node["interestingStats"].items():
                    metrics.append(Gauge(metric, val))
                for name in ["mcdMemoryAllocated", "mcdMemoryReserved"]:
                    metrics.append(Gauge(name, node[name]))
        except KeyError as exc:
            err = "Couchbase: get_node_stats: server response missing key: {0}"
            self.log.error(err.format(exc))
        rset.add(Result("couchbase.nodes", metrics))

    def get_node_stats_misc(self, data, rset):
        """Get cluster counters, quotas and storage totals.

        Adds three Results in order: "couchbase.counters",
        "couchbase.quotas" and "couchbase.storage". Results added before a
        missing key is hit stay in the ResultSet.

        todo:
            - record compaction settings
            - reduce cyclo complexity

        :param data: A dict from couchbase api call containing metrics.
        :type data: dict
        :param rset: A ResultSet to add metrics to.
        :type rset: ResultSet
        :raises KeyError: if an expected key is missing from data
        """
        metrics = deque()
        for key, val in data["counters"].items():
            metrics.append(Counter(key, val))
        rset.add(Result("couchbase.counters", metrics))

        metrics = deque()
        for name in ["ftsMemoryQuota", "indexMemoryQuota", "maxBucketCount",
                     "memoryQuota"]:
            metrics.append(Gauge(name, data[name]))
        rset.add(Result("couchbase.quotas", metrics))

        # flatten the nested storageTotals dict into <group>_<metric> gauges
        metrics = deque()
        for key, group in data["storageTotals"].items():
            for metric, val in group.items():
                name = "{0}_{1}".format(key, metric)
                metrics.append(Gauge(name, val))
        rset.add(Result("couchbase.storage", metrics))

    def poll(self):
        """Query Couchbase for metrics.

        :rtype: ResultSet
        """
        rset = ResultSet()
        self.get_index_settings(rset)
        self.get_pool_stats_all(rset)
        self.get_node_stats_all(rset)
        # bucket sample timestamps are in milliseconds, so store the poll
        # time in the same unit for get_sample_i()
        self.last_poll = time.time() * 1000
        return rset
581
582