Completed
Push — master ( d058c9...ca4b9c )
by Kenny
45s
created

plumd.ResultSet.record()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 14
rs 9.4285
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distiction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metrics classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
The Render class is an abstract base class that writer plugins can subclass. The
38
render() function needs to be defined so that it returns a formatted metric. A
39
writer plugin would then instantiate the subclassed Render and use it to record
40
formatted metrics an each push() call. It can then step through the passed
41
metrics in chunks and send them to its backend. Having the metric types defined
42
explicitly allows the writers renderer to format metrics for arbitrary backends.
43
44
45
.. moduleauthor:: Kenny Freeman <[email protected]>
46
47
"""
48
49
__author__ = 'Kenny Freeman'
50
__email__ = '[email protected]'
51
__version__ = '0.3.3'
52
__license__     = "ISCL"
53
__docformat__   = 'reStructuredText'
54
55
import abc
56
import logging
57
import inspect
58
import collections
59
60
import arrow # pip install arrow
61
62
63
DEFAULT_CONFIG = {
64
    'log.level':        "warn",               # crit, error, warn, info, debug
65
    'config.file':      "/etc/plumd.yaml",    # path to main configuration file
66
    'config.plugins':   "/etc/plumd.conf.d/", # path to the plugin confs
67
    'delay.startup':    1,                    # random delay for plugin start
68
    'delay.poll':       1,                    # random delay for first poll()
69
    'poll.interval':    15,                   # interval of reader poll() calls
70
    'max.queue':        512,                  # maximum size of internal queues
71
    'meta':             {},                   # key=value's for all metrics
72
    'auto.hostname':    True                  # set and use hostname in meta
73
}
74
75
76
LOG_LEVELS = {
77
    'crit':  logging.CRITICAL,
78
    'error': logging.ERROR,
79
    'warn':  logging.WARN,
80
    'info':  logging.INFO,
81
    'debug': logging.DEBUG
82
}
83
84
85
class DuplicatePlugin(Exception):
86
    """Two or more plugins with the same id were defined."""
87
    pass
88
89
90
class PluginNotFoundError(Exception):
91
    """Configured plugin file, module or class not found during load."""
92
    pass
93
94
95
class PluginLoadError(Exception):
96
    """Exception(s) were raised during plugin loading."""
97
    pass
98
99
100
class ConfigError(Exception):
101
    """Invalid configuration."""
102
    pass
103
104
105
#_Metric : the base class of all metric primitives is a named tuple.
106
_Metric = collections.namedtuple('metric', ['name', 'value'])
107
108
109
class Int(_Metric):
110
    """An integer metric - ensures the value passed is an int.
111
112
    raises:
113
        ValueError if the passed value doesn't cast properly to int
114
115
    :param name: The name of the metric
116
    :type name: str
117
    :param value: The recorded value of the metric
118
    :type value: int
119
    :raises: ValueError
120
    """
121
122
    def __new__(self, name, value):
123
        value = int(value)
124
        self = super(Int, self).__new__(self, name, value)
125
        return self
126
127
128
class Float(_Metric):
129
    """A float metric.
130
131
    raises:
132
        ValueError if the passed value doesn't cast properly to float
133
134
    :param name: The name of the metric
135
    :type name: str
136
    :param value: The recorded value of the metric
137
    :type value: float
138
    :raises: ValueError
139
    """
140
141
    def __new__(self, name, value):
142
        value = float(value)
143
        self = super(Float, self).__new__(self, name, value)
144
        return self
145
146
147
class String(_Metric):
148
    """A string metric.
149
150
    raises:
151
        ValueError if the passed value doesn't cast properly to str
152
153
    :param name: The name of the metric
154
    :type name: str
155
    :param value: The recorded value of the metric
156
    :type value: str
157
    :raises: ValueError
158
    """
159
160
    def __new__(self, name, value):
161
        value = str(value)
162
        self = super(String, self).__new__(self, name, value)
163
        return self
164
165
166
class Boolean(_Metric):
167
    """A boolean metric.
168
169
    raises:
170
        ValueError if the passed value doesn't cast properly to bool
171
172
    :param name: The name of the metric
173
    :type name: str
174
    :param value: The recorded value of the metric
175
    :type value: bool
176
    :raises: ValueError
177
    """
178
179
    def __new__(self, name, value):
180
        value = bool(value)
181
        self = super(Boolean, self).__new__(self, name, value)
182
        return self
183
184
185
class Gauge(Float):
186
    """A gauge value - this is always a Float value.
187
188
189
    raises:
190
        ValueError if the passed value doesn't cast properly to int
191
192
    :param name: The name of the metric
193
    :type name: str
194
    :param value: The recorded value of the metric
195
    :type value: float
196
    :raises: ValueError
197
    """
198
    pass
199
200
201
class Counter(Int):
202
    """A metric that counts things - this is always an Integer value.
203
204
205
    raises:
206
        ValueError if the passed value doesn't cast properly to int
207
208
    :param name: The name of the metric
209
    :type name: str
210
    :param value: The recorded value of the metric
211
    :type value: int
212
    :raises: ValueError
213
    """
214
    pass
215
216
217
class Rate(Float):
218
    """A metric that describes a rate - this is always a Float.
219
220
    raises:
221
        ValueError if the passed value doesn't cast properly to float
222
223
    :param name: The name of the metric
224
    :type name: str
225
    :param value: The recorded value of the metric
226
    :type value: float
227
    :raises: ValueError
228
    """
229
    pass
230
231
232
class Timer(Float):
233
    """A metric that describes a timer value - this is always a Float.
234
235
    raises:
236
        ValueError if the passed value doesn't cast properly to float
237
238
    :param name: The name of the metric
239
    :type name: str
240
    :param value: The recorded value of the metric
241
    :type value: float
242
    :raises: ValueError
243
    """
244
    pass
245
246
247
class Meta(object):
248
    """Encapsulates generic key=value metadata however ensures that the
249
    values recorded are one of the four supported base types of Int, Float,
250
    String and Boolean. Also ensures keys are unique.
251
252
    :param metas: Variable list of values.
253
    :type metas: Int or Float or String or Boolean
254
    :raises: ValueError if metas is not one of the expected types or if any of
255
        the keys already exist.
256
    """
257
    __slots__ = [ '_meta', 'types' ]
258
    types = [Int, Float, String, Boolean]
259
260
    def __init__(self):
261
        self._meta = {}
262
263
264
    def __str__(self):
265
        """Return a human readable str.
266
        :rtype: str"""
267
        s = "Meta(items={0})"
268
        return s.format(sorted(self._meta.keys()))
269
270
271
    def __repr__(self):
272
        """Return a human readable str.
273
        :rtype: str"""
274
        s = "Meta(items={0})"
275
        return s.format(sorted(self._meta.keys()))
276
277
278
    @property
279
    def nkeys(self):
280
        """Return the number of key=value pairs."""
281
        return len(self._meta.keys())
282
283
284
    @property
285
    def keys(self):
286
        """Return a (by default) sorted list of metadata keys.
287
288
        :rtype: :class:`list`"""
289
        return sorted(self._meta.keys())
290
291
    @property
292
    def items(self):
293
        """Return a (by default) sorted list of (key, value) tuples.
294
295
        :rtype: :class:`list`"""
296
        mlist = [(k, self._meta[k]) for k in sorted(self._meta.keys())]
297
        return mlist
298
299
300
    def add(self, val):
301
        """Record a value to our metadata.
302
303
        :param val: The value to add.
304
        :type val: Int or Float or String or Boolean
305
        :raises: ValueError if val is an unexpected type or the
306
            keys already exist.
307
        """
308
        if val.__class__ not in self.types:
309
            cname = val.__class__.__name__
310
            err = "Class not supported: class: {0} : value: {1}"
311
            raise ValueError(err.format(cname, val))
312
        # using indexes: 0 => name, 1 => value
313
        if val[0] not in self._meta:
314
            self._meta[val[0]] = val
315
            return
316
        raise ValueError("key exists: {0}".format(val[0]))
317
318
319
    def add_list(self, vals):
320
        """Record value(s) to our metadata.
321
322
        :param vals: The value(s) to add - variable list of values.
323
        :type vals: Int or Float or String or Boolean
324
        :raises: ValueError if any values are unexpected types or any of the
325
            keys already exist.
326
        """
327
        for val in vals:
328
            if val.__class__ not in self.types:
329
                cname = val.__class__.__name__
330
                err = "Class not supported: class: {0} : value: {1}"
331
                raise ValueError(err.format(cname, val))
332
            # using indexes: 0 => name, 1 => value
333
            if val[0] not in self._meta:
334
                self._meta[val[0]] = val
335
                continue
336
            raise ValueError("key exists: {0}".format(val[0]))
337
338
339
class Result(object):
340
    """Encapsulates a set of metrics and an optional Meta metadata object. Does
341
    not check for duplicate metrics in the same Result.
342
343
    :param name: The name of the result eg. cpu
344
    :type name: str
345
    :param metrics: Variable list of optional metrics
346
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
347
        or Rate or Timer
348
    """
349
    __slots__ = [ '_name', '_metrics', '_meta', 'types' ]
350
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]
351
352
    def __init__(self, name, metrics):
353
        """Create a new Result."""
354
        self._name = name
355
        for metric in metrics:
356
            mclass = metric.__class__
357
            if mclass not in self.types:
358
                err = "Class not supported: class: {0} : metric: {1}"
359
                raise ValueError(err.format(mclass.__name__, metric))
360
        self._metrics = list(metrics)
361
        self._meta = Meta()
362
363
364
    def __str__(self):
365
        """Return a human readable str.
366
        :rtype: str"""
367
        s = "Result(name={0}, metrics={1})"
368
        return s.format(len(self._name), self._metrics)
369
370
371
    def __repr__(self):
372
        """Return a human readable str.
373
        :rtype: str"""
374
        s = "Result(name={0}, metrics={1})"
375
        return s.format(len(self._name), self._metrics)
376
377
378
    @property
379
    def name(self):
380
        """Return the Result name."""
381
        return self._name
382
383
384
    @property
385
    def metrics(self):
386
        """Return the Results list of metrics."""
387
        return self._metrics
388
389
390
    @property
391
    def meta(self):
392
        """Return the results metadata object."""
393
        return self._meta
394
395
396
    def record_metrics(self, *vals):
397
        """Record a variable list of metrics to the Results list of
398
        metrics.
399
400
        :param vals: the metric(s) to add (one of the eight base types)
401
        :type vals: Int or Float or String or Boolean or Gauge or Counter or
402
            Rate or Timer
403
        """
404
        for metric in vals:
405
            if metric.__class__ not in self.types:
406
                cname = metric.__class__.__name__
407
                err = "Class not supported: {0}".format(cname)
408
                raise ValueError(err)
409
            self._metrics.append(val)
410
411
412
class ResultSet(object):
413
    """A class to encapsulate a series of measurement Results during a plugins
414
    poll() call. Each poll() must return a ResultSet object.
415
416
    The class has a list of Result objects and a timestamp. The timestamp
417
    is set to the current UTC time when the object is created. This is done
418
    since each poll() call should normally complete within ms or us and the
419
    target audience for the recorded metrics are eg. grafana graphs and
420
    system alerts with resolutions down to 1 second typically. The difference
421
    in time between the first recorded metric and last is generally going to
422
    be much less than 1 second.
423
424
    :Example:
425
426
    >>>import plumd
427
    >>>metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
428
    >>>res = Result("cpu", metrics)
429
    >>>rset = ResultSet(res)
430
431
    >>>metrics = [ plumd.Float("free", 0.0), plumd.Float("used", 100.0) ]
432
    >>>res = Result("disk", metrics)
433
    >>>res.meta.add(plumd.String("dev", "sda1"))
434
    >>>rset = ResultSet(res)
435
    """
436
    __slots__ = [ '_results', '_time' ] # do this to save memory
437
438
    def __init__(self, *results):
439
        """Class to encapsulate a collection of metrics eg. from poll()."""
440
        self._results = list(results)
441
        self._time = arrow.utcnow()
442
443
444
    @property
445
    def time(self):
446
        """Return our timestamp.
447
        :rtype: float
448
        """
449
        return self._time
450
451
452
    @property
453
    def nresults(self):
454
        """Return the number of results recorded so far.
455
        :rtype: int
456
        """
457
        return len(self._results)
458
459
460
    @property
461
    def results(self):
462
        """Yields a tuple for each Result recorded in the format:
463
464
        ( time, result_name, result_meta, [metric, metric, metric] )
465
466
        :rtype: generator
467
        """
468
        for robj in self._results:
469
            yield (self._time, robj.name, robj.meta, robj.metrics)
470
471
472
    def __str__(self):
473
        """Return a human readable str.
474
        :rtype: str"""
475
        s = "ResultSet(results={0}, time={1})"
476
        return s.format(len(self._results), self._time.timestamp)
477
478
479
    def __repr__(self):
480
        """Return a human readable str.
481
        :rtype: str"""
482
        s = "ResultSet(results={0}, time={1})"
483
        return s.format(len(self._results), self._time.timestamp)
484
485
486
    def record(self, *results):
487
        """Record a variable list of results in this ResultSet. This design is
488
        based around the Influxdb concept of measurements and fields and
489
        diffrentiates between a metric key=value and a metadata key=value.
490
491
        Each result has a list of metric (key=value)s and a list of metadata
492
        (key=value)s. The ResultSet has a list of Results and an Arrow
493
        time object.
494
495
        :param results: variable list of Result objects
496
        :type results: Result
497
        """
498
        for robj in results:
499
            self._results.append(robj)
500
501
502
class Render(object):
503
    """A helper class to render metrics, buffer them and return chunks of
504
    metrics. Used as a superclass for the various metric writers (eg.
505
    influxdb, graphite, etc).
506
507
    The idea is to feed this object a ResultSet on each call to a writer
508
    plugins push() and then consume chunks of metrics from it until a full
509
    chunk cannot be returned.
510
511
    To use this, a writer plugin subclasses Render and defines the process
512
    method.The plugins implementation of process() should format the metrics it
513
    gets passed in a format suitable for sending to its backend and store them
514
    in the metrics dequeue.
515
516
    The plugin can then create an instance of it's render object and in push()
517
    simply call instance.process(results) and instance.get_chunk() to get the
518
    next chunk of metrics.
519
520
    On shutdown it can also call instance.get_chunk(partial=True) to get the
521
    remaining metrics. Normally it would want to fetch full batches of metrics.
522
523
    :param rconfig: config.Conf object passed from a writer plugin instance
524
    :type rconfig: :class:`config.Conf`
525
    """
526
    __metaclass__ = abc.ABCMeta
527
528
    def __init__(self, rconfig):
529
        # Use a double ended queue with a maximum size
530
        maxlen = rconfig.get('maxqueue', default=8192)
531
        self.metrics = collections.deque(maxlen=maxlen)
532
        # define a metadata object and copy in any configured host metadata
533
        self.meta = Meta()
534
        clookup = {
535
            int: Int,
536
            float: Float,
537
            str: String,
538
            bool: Boolean
539
        }
540
        for key, val in rconfig.get('meta', default={}).items():
541
            if val.__class__ not in clookup:
542
                args = ( key, val.__class__.__name__ )
543
                err = "host metadata: unsupported type: {0}={1}".format(*args)
544
                raise ValueError(err)
545
            self.meta.add(clookup[val.__class__](key, val))
546
547
548
    @abc.abstractmethod
549
    def process(self, rset):
550
        """Record a result set in our backends format. Subclasses for specific
551
        backends must define this method.
552
553
        :param rset: A :class:`ResultSet` object
554
        :type rset: :class:`ResultSet`
555
        """
556
        raise NotImplementedError("process must be defined by a subclass")
557
558
559
    def next_batch(self, bsize, partial=None):
560
        """Return the next full batch of metrics - if partial is True and
561
        self.metrics contains less than nmetrics metrics then return
562
        the remaining metrics.
563
564
        Returns a tuple of (queue size, the number of metrics returned,
565
        list of metrics).
566
567
        :param bsize: the batch size of metrics to return
568
        :type bsize: int
569
        :param partial: return less than chunk_size metrics if not enough
570
        :type partial: bool
571
        :rtype: tuple(int, int, list)
572
        """
573
        qlen = len(self.metrics)
574
        mlist = []
575
        if qlen >= bsize:
576
            mlist = [ self.metrics.popleft() for i in range(0, bsize) ]
577
            qlen = len(self.metrics)
578
        elif partial:
579
            mlist = [ self.metrics.popleft() for i in range(0, qlen) ]
580
            qlen = len(self.metrics)
581
        return ( qlen, len(mlist), mlist )
582