Completed
Push — master ( 8d751b...7e5eed )
by Kenny
58s
created

plumd.ResultSet.add_list()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 8
rs 9.4285
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distiction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metrics classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
The Render class is an abstract base class that writer plugins can subclass. The
38
render() function needs to be defined so that it returns a formatted metric. A
39
writer plugin would then instantiate the subclassed Render and use it to record
40
formatted metrics an each push() call. It can then step through the passed
41
metrics in chunks and send them to its backend. Having the metric types defined
42
explicitly allows the writers renderer to format metrics for arbitrary backends.
43
44
45
.. moduleauthor:: Kenny Freeman <[email protected]>
46
47
"""
48
49
__author__ = 'Kenny Freeman'
50
__email__ = '[email protected]'
51
__version__ = '0.3.3'
52
__license__     = "ISCL"
53
__docformat__   = 'reStructuredText'
54
55
import abc
56
import logging
57
import collections
58
59
import arrow # pip install arrow
60
61
62
DEFAULT_CONFIG = {
63
    'log.level':        "warn",               # crit, error, warn, info, debug
64
    'config.file':      "/etc/plumd.yaml",    # path to main configuration file
65
    'config.plugins':   "/etc/plumd.conf.d/", # path to the plugin confs
66
    'delay.startup':    1,                    # random delay for plugin start
67
    'delay.poll':       1,                    # random delay for first poll()
68
    'poll.interval':    15,                   # interval of reader poll() calls
69
    'max.queue':        512,                  # maximum size of internal queues
70
    'meta':             {},                   # key=value's for all metrics
71
    'meta.hostname':    True                  # set and use hostname in meta
72
}
73
74
75
LOG_LEVELS = {                                # map config level to python level
76
    'crit':  logging.CRITICAL,
77
    'error': logging.ERROR,
78
    'warn':  logging.WARN,
79
    'info':  logging.INFO,
80
    'debug': logging.DEBUG
81
}
82
83
84
class DuplicatePlugin(Exception):
85
    """Two or more plugins with the same id were defined."""
86
    pass
87
88
89
class PluginNotFoundError(Exception):
90
    """A plugin file, module or class was not found."""
91
    pass
92
93
94
class PluginLoadError(Exception):
95
    """Exception(s) were raised while loading a plugin."""
96
    pass
97
98
99
class ConfigError(Exception):
100
    """Invalid configuration encountered."""
101
    pass
102
103
104
#_Metric : the base class of all metric primitives is a named tuple.
105
_Metric = collections.namedtuple('metric', ['name', 'value'])
106
107
108
class Int(_Metric):
109
    """An integer metric - ensures the value passed is an int.
110
111
    raises:
112
        ValueError if the passed value doesn't cast properly to int
113
114
    :param name: The name of the metric
115
    :type name: str
116
    :param value: The recorded value of the metric
117
    :type value: int
118
    :raises: ValueError
119
    """
120
121
    def __new__(self, name, value):
122
        value = int(value)
123
        self = super(Int, self).__new__(self, name, value)
124
        return self
125
126
127
class Float(_Metric):
128
    """A float metric.
129
130
    raises:
131
        ValueError if the passed value doesn't cast properly to float
132
133
    :param name: The name of the metric
134
    :type name: str
135
    :param value: The recorded value of the metric
136
    :type value: float
137
    :raises: ValueError
138
    """
139
140
    def __new__(self, name, value):
141
        value = float(value)
142
        self = super(Float, self).__new__(self, name, value)
143
        return self
144
145
146
class String(_Metric):
147
    """A string metric.
148
149
    raises:
150
        ValueError if the passed value doesn't cast properly to str
151
152
    :param name: The name of the metric
153
    :type name: str
154
    :param value: The recorded value of the metric
155
    :type value: str
156
    :raises: ValueError
157
    """
158
159
    def __new__(self, name, value):
160
        value = str(value)
161
        self = super(String, self).__new__(self, name, value)
162
        return self
163
164
165
class Boolean(_Metric):
166
    """A boolean metric.
167
168
    raises:
169
        ValueError if the passed value doesn't cast properly to bool
170
171
    :param name: The name of the metric
172
    :type name: str
173
    :param value: The recorded value of the metric
174
    :type value: bool
175
    :raises: ValueError
176
    """
177
178
    def __new__(self, name, value):
179
        value = bool(value)
180
        self = super(Boolean, self).__new__(self, name, value)
181
        return self
182
183
184
class Gauge(Float):
185
    """A gauge value - this is always a Float value.
186
187
188
    raises:
189
        ValueError if the passed value doesn't cast properly to int
190
191
    :param name: The name of the metric
192
    :type name: str
193
    :param value: The recorded value of the metric
194
    :type value: float
195
    :raises: ValueError
196
    """
197
    pass
198
199
200
class Counter(Int):
201
    """A metric that counts things - this is always an Integer value.
202
203
204
    raises:
205
        ValueError if the passed value doesn't cast properly to int
206
207
    :param name: The name of the metric
208
    :type name: str
209
    :param value: The recorded value of the metric
210
    :type value: int
211
    :raises: ValueError
212
    """
213
    pass
214
215
216
class Rate(Float):
217
    """A metric that describes a rate - this is always a Float.
218
219
    raises:
220
        ValueError if the passed value doesn't cast properly to float
221
222
    :param name: The name of the metric
223
    :type name: str
224
    :param value: The recorded value of the metric
225
    :type value: float
226
    :raises: ValueError
227
    """
228
    pass
229
230
231
class Timer(Float):
232
    """A metric that describes a timer value - this is always a Float.
233
234
    raises:
235
        ValueError if the passed value doesn't cast properly to float
236
237
    :param name: The name of the metric
238
    :type name: str
239
    :param value: The recorded value of the metric
240
    :type value: float
241
    :raises: ValueError
242
    """
243
    pass
244
245
246
class Meta(object):
247
    """Encapsulates generic key=value metadata however ensures that the
248
    values recorded are one of the four supported base types of Int, Float,
249
    String and Boolean. Also ensures keys are unique.
250
251
    :param metas: Variable list of values.
252
    :type metas: Int or Float or String or Boolean
253
    :raises: ValueError if metas is not one of the expected types or if any of
254
        the keys already exist.
255
    """
256
    __slots__ = [ '_meta', 'types' ]
257
    types = [Int, Float, String, Boolean]
258
259
    def __init__(self):
260
        self._meta = {}
261
262
263
    def __str__(self):
264
        """Return a human readable str.
265
        :rtype: str"""
266
        s = "Meta(items={0})"
267
        return s.format(sorted(self._meta.keys()))
268
269
270
    def __repr__(self):
271
        """Return a human readable str.
272
        :rtype: str"""
273
        s = "Meta(items={0})"
274
        return s.format(sorted(self._meta.keys()))
275
276
277
    @property
278
    def nkeys(self):
279
        """Return the number of key=value pairs."""
280
        return len(self._meta.keys())
281
282
283
    @property
284
    def keys(self):
285
        """Return a (by default) sorted list of metadata keys.
286
287
        :rtype: :class:`list`"""
288
        return sorted(self._meta.keys())
289
290
    @property
291
    def items(self):
292
        """Return a (by default) sorted list of (key, value) tuples.
293
294
        :rtype: :class:`list`"""
295
        mlist = [(k, self._meta[k]) for k in sorted(self._meta.keys())]
296
        return mlist
297
298
299
    def add(self, val):
300
        """Record a value to our metadata.
301
302
        :param val: The value to add.
303
        :type val: Int or Float or String or Boolean
304
        :raises: ValueError if val is an unexpected type or the
305
            keys already exist.
306
        """
307
        if val.__class__ not in self.types:
308
            cname = val.__class__.__name__
309
            err = "Class not supported: class: {0} : value: {1}"
310
            raise ValueError(err.format(cname, val))
311
        # using indexes: 0 => name, 1 => value
312
        if val[0] not in self._meta:
313
            self._meta[val[0]] = val
314
            return
315
        raise ValueError("key exists: {0}".format(val[0]))
316
317
318
class Result(object):
319
    """Encapsulates a set of metrics and an optional Meta metadata object. Does
320
    not check for duplicate metrics in the same Result.
321
322
    :param name: The name of the result eg. cpu
323
    :type name: str
324
    :param metrics: optional list of metrics
325
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
326
        or Rate or Timer
327
    """
328
    __slots__ = [ '_name', '_metrics', '_meta', 'types' ]
329
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]
330
331
    def __init__(self, name, metrics=None):
332
        """Create a new Result."""
333
        self._name = name
334
        self._metrics = list()
335
        if metrics:
336
            for metric in metrics:
337
                mclass = metric.__class__
338
                if mclass not in self.types:
339
                    err = "Class not supported: class: {0} : metric: {1}"
340
                    raise ValueError(err.format(mclass.__name__, metric))
341
                self._metrics.append(metric)
342
        self._meta = Meta()
343
344
345
    def __str__(self):
346
        """Return a human readable str.
347
        :rtype: str"""
348
        s = "Result(name={0}, metrics={1})"
349
        return s.format(len(self._name), self._metrics)
350
351
352
    def __repr__(self):
353
        """Return a human readable str.
354
        :rtype: str"""
355
        s = "Result(name={0}, metrics={1})"
356
        return s.format(len(self._name), self._metrics)
357
358
359
    @property
360
    def name(self):
361
        """Return the Result name."""
362
        return self._name
363
364
365
    @property
366
    def metrics(self):
367
        """Return the Results list of metrics."""
368
        return self._metrics
369
370
371
    @property
372
    def meta(self):
373
        """Return the results metadata object."""
374
        return self._meta
375
376
377
    def add(self, metric):
378
        """Add a metric to our list of metrics.
379
380
        raises:
381
            ValueError if the metric is not a supported type
382
383
        :param metric: One of the metric types (Int, Float, String, etc)
384
        :type metric: Int or Float or String or Boolean or Counter or Gauge
385
        :raises: ValueError
386
        """
387
        mclass = metric.__class__
388
        if mclass not in self.types:
389
            err = "Class not supported: class: {0} : metric: {1}"
390
            raise ValueError(err.format(mclass.__name__, metric))
391
        self._metrics.append(metric)
392
393
394
    def add_list(self, metrics):
395
        """Add a list of metrics to our list of metrics.
396
397
        raises:
398
            ValueError if any metrics are not a supported type
399
400
        :param metrics: list of metrics (Int, Float, String, etc)
401
        :type metrics: list
402
        :raises: ValueError
403
        """
404
        for metric in metrics:
405
            mclass = metric.__class__
406
            if mclass not in self.types:
407
                err = "Class not supported: class: {0} : metric: {1}"
408
                raise ValueError(err.format(mclass.__name__, metric))
409
            self._metrics.append(metric)
410
411
412
class ResultSet(object):
413
    """A class to encapsulate a series of measurement Results during a plugins
414
    poll() call. Each poll() must return a ResultSet object.
415
416
    The class has a list of Result objects and a timestamp. The timestamp
417
    is set to the current UTC time when the object is created. This is done
418
    since each poll() call should normally complete within ms or us and the
419
    target audience for the recorded metrics are eg. grafana graphs and
420
    system alerts with resolutions down to 1 second typically. The difference
421
    in time between the first recorded metric and last is generally going to
422
    be much less than 1 second.
423
424
    :Example:
425
426
    >>>import plumd
427
    >>>metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
428
    >>>res = Result("cpu", metrics)
429
    >>>rset = ResultSet(results=[res])
430
431
    >>>metrics = [ plumd.Float("free", 80.0), plumd.Float("used", 20.0) ]
432
    >>>res = Result("disk", metrics)
433
    >>>res.meta.add(plumd.String("dev", "sda1"))
434
    >>>rset = ResultSet(results=[res])
435
    """
436
    __slots__ = [ '_results', '_time' ] # do this to save memory
437
438
    def __init__(self, results=None):
439
        """Class to encapsulate a collection of metrics eg. from poll()."""
440
        self._results = list() if results is None else list(results)
441
        self._time = arrow.utcnow()
442
443
444
    @property
445
    def time(self):
446
        """Return our timestamp.
447
        :rtype: float
448
        """
449
        return self._time
450
451
452
    @property
453
    def nresults(self):
454
        """Return the number of results recorded so far.
455
        :rtype: int
456
        """
457
        return len(self._results)
458
459
460
    @property
461
    def results(self):
462
        """Yields a tuple for each Result recorded in the format:
463
464
        ( time, result_name, result_meta, [metric, metric, metric] )
465
466
        :rtype: generator
467
        """
468
        for robj in self._results:
469
            yield (self._time, robj.name, robj.meta, robj.metrics)
470
471
472
    def __str__(self):
473
        """Return a human readable str.
474
        :rtype: str"""
475
        s = "ResultSet(results={0}, time={1})"
476
        return s.format(len(self._results), self._time.timestamp)
477
478
479
    def __repr__(self):
480
        """Return a human readable str.
481
        :rtype: str"""
482
        s = "ResultSet(results={0}, time={1})"
483
        return s.format(len(self._results), self._time.timestamp)
484
485
486
    def add(self, result):
487
        """Record a result in this ResultSet.
488
489
        :param result: a Result object
490
        :type results: Result
491
        """
492
        self._results.append(result)
493
494
495
    def add_list(self, results):
496
        """Record a list of results in this ResultSet.
497
498
        :param results: a list of Result objects
499
        :type results: list
500
        """
501
        for result in results:
502
            self._results.append(result)
503
504
505
class Render(object):
506
    """A helper class to render metrics, buffer them and return chunks of
507
    metrics. Used as a superclass for the various metric writers (eg.
508
    influxdb, graphite, etc).
509
510
    The idea is to feed this object a ResultSet on each call to a writer
511
    plugins push() and then consume chunks of metrics from it until a full
512
    chunk cannot be returned.
513
514
    To use this, a writer plugin subclasses Render and defines the process
515
    method.The plugins implementation of process() should format the metrics it
516
    gets passed in a format suitable for sending to its backend and store them
517
    in the metrics dequeue.
518
519
    The plugin can then create an instance of it's render object and in push()
520
    simply call instance.process(results) and instance.get_chunk() to get the
521
    next chunk of metrics.
522
523
    On shutdown it can also call instance.get_chunk(partial=True) to get the
524
    remaining metrics. Normally it would want to fetch full batches of metrics.
525
526
    :param rconfig: config.Conf object passed from a writer plugin instance
527
    :type rconfig: :class:`config.Conf`
528
    """
529
    __metaclass__ = abc.ABCMeta
530
531
    def __init__(self, rconfig):
532
        # Use a double ended queue with a maximum size
533
        maxlen = rconfig.get('maxqueue', default=8192)
534
        self.metrics = collections.deque(maxlen=maxlen)
535
        # define a metadata object and copy in any configured host metadata
536
        self.meta = Meta()
537
        clookup = {
538
            int: Int,
539
            float: Float,
540
            str: String,
541
            bool: Boolean
542
        }
543
        for key, val in rconfig.get('meta', default={}).items():
544
            if val.__class__ not in clookup:
545
                args = ( key, val.__class__.__name__ )
546
                err = "host metadata: unsupported type: {0}={1}".format(*args)
547
                raise ValueError(err)
548
            self.meta.add(clookup[val.__class__](key, val))
549
550
551
    @abc.abstractmethod
552
    def process(self, rset):
553
        """Record a result set in our backends format. Subclasses for specific
554
        backends must define this method.
555
556
        :param rset: A :class:`ResultSet` object
557
        :type rset: :class:`ResultSet`
558
        """
559
        raise NotImplementedError("process must be defined by a subclass")
560
561
562
    def next_batch(self, bsize, partial=None):
563
        """Return the next full batch of metrics - if partial is True and
564
        self.metrics contains less than nmetrics metrics then return
565
        the remaining metrics.
566
567
        Returns a tuple of (queue size, the number of metrics returned,
568
        list of metrics).
569
570
        todo: also support number of bytes to return.
571
572
        :param bsize: the batch size of metrics to return
573
        :type bsize: int
574
        :param partial: return less than chunk_size metrics if not enough
575
        :type partial: bool
576
        :rtype: tuple(int, int, list)
577
        """
578
        qlen = len(self.metrics)
579
        mlist = []
580
        if qlen >= bsize:
581
            mlist = [ self.metrics.popleft() for i in range(0, bsize) ]
582
            qlen = len(self.metrics)
583
        elif partial:
584
            mlist = [ self.metrics.popleft() for i in range(0, qlen) ]
585
            qlen = len(self.metrics)
586
        return ( qlen, len(mlist), mlist )
587