Completed
Push — master ( ca4b9c...1a64c8 )
by Kenny
45s
created

plumd.Result.record_metrics()   A

Complexity

Conditions 3

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 14
rs 9.4285
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distinction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metrics classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
The Render class is an abstract base class that writer plugins can subclass. The
38
render() function needs to be defined so that it returns a formatted metric. A
39
writer plugin would then instantiate the subclassed Render and use it to record
40
formatted metrics on each push() call. It can then step through the passed
41
metrics in chunks and send them to its backend. Having the metric types defined
42
explicitly allows the writers renderer to format metrics for arbitrary backends.
43
44
45
.. moduleauthor:: Kenny Freeman <[email protected]>
46
47
"""
48
49
__author__ = 'Kenny Freeman'
50
__email__ = '[email protected]'
51
__version__ = '0.3.3'
52
__license__     = "ISCL"
53
__docformat__   = 'reStructuredText'
54
55
import abc
56
import logging
57
import inspect
58
import collections
59
60
import arrow # pip install arrow
61
62
63
# Built-in defaults for the main plumd configuration, keyed by setting name.
DEFAULT_CONFIG = {
    'log.level':        "warn",               # crit, error, warn, info, debug
    'config.file':      "/etc/plumd.yaml",    # path to main configuration file
    'config.plugins':   "/etc/plumd.conf.d/", # path to the plugin confs
    'delay.startup':    1,                    # random delay for plugin start
    'delay.poll':       1,                    # random delay for first poll()
    'poll.interval':    15,                   # interval of reader poll() calls
    'max.queue':        512,                  # maximum size of internal queues
    'meta':             {},                   # key=value's for all metrics
    'auto.hostname':    True                  # set and use hostname in meta
}
74
75
76
# Maps the short level names accepted by the 'log.level' setting onto the
# numeric levels of the standard logging module.
LOG_LEVELS = {
    'crit':  logging.CRITICAL,
    'error': logging.ERROR,
    'warn':  logging.WARNING,   # logging.WARN is only an alias of WARNING
    'info':  logging.INFO,
    'debug': logging.DEBUG
}
83
84
85
class DuplicatePlugin(Exception):
    """Two or more plugins with the same id were defined."""
88
89
90
class PluginNotFoundError(Exception):
    """Configured plugin file, module or class not found during load."""
93
94
95
class PluginLoadError(Exception):
    """Exception(s) were raised during plugin loading."""
98
99
100
class ConfigError(Exception):
    """Invalid configuration."""
103
104
105
# _Metric: the base class of all metric primitives is a named tuple of
# (name, value). Every subclass below declares ``__slots__ = ()`` so that
# instances do not grow a per-instance __dict__ and stay as small as a tuple.
_Metric = collections.namedtuple('metric', ['name', 'value'])


class Int(_Metric):
    """An integer metric - the value is coerced with int().

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError if the passed value doesn't cast properly to int
    """

    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, converting value to int."""
        return super(Int, cls).__new__(cls, name, int(value))


class Float(_Metric):
    """A float metric - the value is coerced with float().

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, converting value to float."""
        return super(Float, cls).__new__(cls, name, float(value))


class String(_Metric):
    """A string metric - the value is coerced with str().

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: str
    :raises: ValueError if the passed value doesn't cast properly to str
    """

    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, converting value to str."""
        return super(String, cls).__new__(cls, name, str(value))


class Boolean(_Metric):
    """A boolean metric - the value is coerced with bool().

    The conversion uses normal Python truthiness, so any non-empty string
    (including "false") is recorded as True.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: bool
    """

    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, converting value to bool."""
        return super(Boolean, cls).__new__(cls, name, bool(value))


class Gauge(Float):
    """A gauge value - this is always a Float value.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()


class Counter(Int):
    """A metric that counts things - this is always an Integer value.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError if the passed value doesn't cast properly to int
    """

    __slots__ = ()


class Rate(Float):
    """A metric that describes a rate - this is always a Float.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()


class Timer(Float):
    """A metric that describes a timer value - this is always a Float.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()
245
246
247
class Meta(object):
    """Encapsulates generic key=value metadata however ensures that the
    values recorded are one of the four supported base types of Int, Float,
    String and Boolean. Also ensures keys are unique.

    A new Meta object starts out empty; values are recorded with add() or
    add_list(). The check is on the exact class, so subclasses of the four
    base types are rejected.
    """

    # Only per-instance attributes belong in __slots__. ``types`` is a class
    # attribute, and naming it here as well makes class creation fail with
    # "ValueError: 'types' in __slots__ conflicts with class variable".
    __slots__ = ['_meta']
    types = [Int, Float, String, Boolean]

    def __init__(self):
        """Create an empty metadata collection."""
        self._meta = {}

    def __str__(self):
        """Return a human readable str.

        :rtype: str"""
        return "Meta(items={0})".format(sorted(self._meta.keys()))

    __repr__ = __str__

    @property
    def nkeys(self):
        """Return the number of key=value pairs."""
        return len(self._meta)

    @property
    def keys(self):
        """Return a sorted list of metadata keys.

        :rtype: :class:`list`"""
        return sorted(self._meta.keys())

    @property
    def items(self):
        """Return a list of (key, value) tuples sorted by key; each value is
        the metric object that was added.

        :rtype: :class:`list`"""
        return [(key, self._meta[key]) for key in sorted(self._meta.keys())]

    def add(self, val):
        """Record a value to our metadata.

        :param val: The value to add.
        :type val: Int or Float or String or Boolean
        :raises: ValueError if val is an unexpected type or the
            key already exists.
        """
        if val.__class__ not in self.types:
            cname = val.__class__.__name__
            err = "Class not supported: class: {0} : value: {1}"
            raise ValueError(err.format(cname, val))
        # metrics are (name, value) tuples: index 0 is the name
        key = val[0]
        if key in self._meta:
            raise ValueError("key exists: {0}".format(key))
        self._meta[key] = val

    def add_list(self, vals):
        """Record value(s) to our metadata.

        Values are added in order; if one is rejected the values before it
        remain recorded.

        :param vals: The value(s) to add - an iterable of values.
        :type vals: Int or Float or String or Boolean
        :raises: ValueError if any values are unexpected types or any of the
            keys already exist.
        """
        for val in vals:
            self.add(val)
337
338
339
class Result(object):
    """Encapsulates a set of metrics and a Meta metadata object. Does
    not check for duplicate metrics in the same Result.

    :param name: The name of the result eg. cpu
    :type name: str
    :param metrics: An iterable of metrics
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
        or Rate or Timer
    :raises: ValueError if any metric is not one of the supported classes
    """

    # Only per-instance attributes belong in __slots__. ``types`` is a class
    # attribute, and naming it here as well makes class creation fail with
    # "ValueError: 'types' in __slots__ conflicts with class variable".
    __slots__ = ['_name', '_metrics', '_meta']
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]

    def __init__(self, name, metrics):
        """Create a new Result."""
        self._name = name
        # Materialise before validating: a one-shot iterator would otherwise
        # be exhausted by the check and leave nothing to store.
        metrics = list(metrics)
        for metric in metrics:
            mclass = metric.__class__
            if mclass not in self.types:
                err = "Class not supported: class: {0} : metric: {1}"
                raise ValueError(err.format(mclass.__name__, metric))
        self._metrics = metrics
        self._meta = Meta()

    def __str__(self):
        """Return a human readable str.

        :rtype: str"""
        # show the name itself, not its length
        return "Result(name={0}, metrics={1})".format(self._name,
                                                      self._metrics)

    __repr__ = __str__

    @property
    def name(self):
        """Return the Result name."""
        return self._name

    @property
    def metrics(self):
        """Return the Results list of metrics."""
        return self._metrics

    @property
    def meta(self):
        """Return the results metadata object."""
        return self._meta
394
395
396
class ResultSet(object):
    """A class to encapsulate a series of measurement Results during a plugins
    poll() call. Each poll() must return a ResultSet object.

    The class has a list of Result objects and a timestamp. The timestamp
    is set to the current UTC time when the object is created. This is done
    since each poll() call should normally complete within ms or us and the
    target audience for the recorded metrics are eg. grafana graphs and
    system alerts with resolutions down to 1 second typically. The difference
    in time between the first recorded metric and last is generally going to
    be much less than 1 second.

    :Example:

    >>> import plumd
    >>> metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
    >>> res = plumd.Result("cpu", metrics)
    >>> rset = plumd.ResultSet(res)

    >>> metrics = [ plumd.Float("free", 0.0), plumd.Float("used", 100.0) ]
    >>> res = plumd.Result("disk", metrics)
    >>> res.meta.add(plumd.String("dev", "sda1"))
    >>> rset = plumd.ResultSet(res)
    """

    __slots__ = ['_results', '_time']  # do this to save memory

    def __init__(self, *results):
        """Class to encapsulate a collection of metrics eg. from poll().

        :param results: variable list of Result objects
        :type results: Result
        """
        self._results = list(results)
        self._time = arrow.utcnow()

    @property
    def time(self):
        """Return our timestamp - the Arrow object created in __init__.

        :rtype: :class:`arrow.Arrow`
        """
        return self._time

    @property
    def nresults(self):
        """Return the number of results recorded so far.

        :rtype: int
        """
        return len(self._results)

    @property
    def results(self):
        """Yields a tuple for each Result recorded in the format:

        ( time, result_name, result_meta, [metric, metric, metric] )

        :rtype: generator
        """
        for robj in self._results:
            yield (self._time, robj.name, robj.meta, robj.metrics)

    def __str__(self):
        """Return a human readable str.

        :rtype: str"""
        # arrow < 1.0 exposes ``timestamp`` as a property, arrow >= 1.0 as a
        # method; support both so we never print a bound-method repr.
        stamp = self._time.timestamp
        if callable(stamp):
            stamp = stamp()
        return "ResultSet(results={0}, time={1})".format(len(self._results),
                                                         stamp)

    __repr__ = __str__

    def record(self, *results):
        """Record a variable list of results in this ResultSet. This design is
        based around the Influxdb concept of measurements and fields and
        differentiates between a metric key=value and a metadata key=value.

        Each result has a list of metric (key=value)s and a list of metadata
        (key=value)s. The ResultSet has a list of Results and an Arrow
        time object.

        :param results: variable list of Result objects
        :type results: Result
        """
        self._results.extend(results)
484
485
486
# Building the base through ABCMeta directly applies the metaclass on both
# Python 2 and Python 3; a ``__metaclass__`` class attribute is ignored by
# Python 3, which would leave @abc.abstractmethod unenforced there.
class Render(abc.ABCMeta('RenderBase', (object,), {})):
    """A helper class to render metrics, buffer them and return batches of
    metrics. Used as a superclass for the various metric writers (eg.
    influxdb, graphite, etc).

    The idea is to feed this object a ResultSet on each call to a writer
    plugins push() and then consume batches of metrics from it until a full
    batch cannot be returned.

    To use this, a writer plugin subclasses Render and defines the process
    method. The plugins implementation of process() should format the metrics
    it gets passed in a format suitable for sending to its backend and store
    them in the metrics deque.

    The plugin can then create an instance of its render object and in push()
    simply call instance.process(results) and instance.next_batch(bsize) to
    get the next batch of metrics.

    On shutdown it can also call instance.next_batch(bsize, partial=True) to
    get the remaining metrics. Normally it would want to fetch full batches
    of metrics.

    :param rconfig: config.Conf object passed from a writer plugin instance
    :type rconfig: :class:`config.Conf`
    :raises: ValueError if a configured host metadata value is not an int,
        float, str or bool
    """

    def __init__(self, rconfig):
        """Create the metric buffer and load configured host metadata."""
        # Use a double ended queue with a maximum size
        # NOTE(review): DEFAULT_CONFIG in this module names its queue setting
        # 'max.queue' - confirm 'maxqueue' is the intended key here.
        maxlen = rconfig.get('maxqueue', default=8192)
        self.metrics = collections.deque(maxlen=maxlen)
        # define a metadata object and copy in any configured host metadata
        self.meta = Meta()
        clookup = {
            int: Int,
            float: Float,
            str: String,
            bool: Boolean
        }
        for key, val in rconfig.get('meta', default={}).items():
            vclass = val.__class__
            if vclass not in clookup:
                err = "host metadata: unsupported type: {0}={1}"
                raise ValueError(err.format(key, vclass.__name__))
            self.meta.add(clookup[vclass](key, val))

    @abc.abstractmethod
    def process(self, rset):
        """Record a result set in our backends format. Subclasses for specific
        backends must define this method.

        :param rset: A :class:`ResultSet` object
        :type rset: :class:`ResultSet`
        """
        raise NotImplementedError("process must be defined by a subclass")

    def next_batch(self, bsize, partial=None):
        """Return the next full batch of metrics - if partial is True and
        self.metrics contains less than bsize metrics then return
        the remaining metrics. If no full batch is available and partial is
        not set, nothing is removed and an empty list is returned.

        Returns a tuple of (queue size after removal, the number of metrics
        returned, list of metrics).

        :param bsize: the batch size of metrics to return
        :type bsize: int
        :param partial: return less than bsize metrics if not enough
        :type partial: bool
        :rtype: tuple(int, int, list)
        """
        qlen = len(self.metrics)
        if qlen >= bsize:
            count = bsize
        elif partial:
            count = qlen
        else:
            count = 0
        mlist = [self.metrics.popleft() for _ in range(count)]
        return (len(self.metrics), len(mlist), mlist)
566