# -*- coding: utf-8 -*-
"""There are many differences in the various time series databases. Plumd is
designed to support multiple backends using the classes defined here to
represent metrics in a uniform way:

    ResultSet - a collection of one or more Results from a call to poll()
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
    Meta - metadata associated with a Result eg. dev=sda1
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above

The ResultSet class is used to encapsulate all of the measurements taken
during a plugin's poll() call and uses a single timestamp for that poll.

The Result class is used to encapsulate a single measurement - for example
measuring cpu utilization may result in idle, user, system etc values. These
would be recorded in a single Result object. In addition the Result class
allows for arbitrary metadata to be associated with a Result - eg. when
recording disk space there may be used, free and total values as well as
metadata such as dev=sda1 to associate the measurements with a specific block
device.

The Meta class is used to encapsulate arbitrary metadata. The Result class
uses it to associate metadata with a measurement; however, it can also be used
for example to describe host level metadata (eg. hostname=<hostname>). The
Meta class allows writer plugins such as graphite to render the metadata as
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
if there was no distinction between metadata key=value and metric key=value
this would not be possible. Also, the Meta class ensures that metadata keys are
unique.

The _Metric classes are used to allow plumd to render metrics in a specific
format. For example influxdb wants integer values to end with an i when sending
via its line protocol. The various _Metric classes force the plugin to
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
utilization.

The Render class is an abstract base class that writer plugins can subclass. The
process() method needs to be defined so that it formats the metrics it is
passed and buffers them. A writer plugin would then instantiate the subclassed
Render and use it to record formatted metrics on each push() call. It can then
step through the passed metrics in chunks and send them to its backend. Having
the metric types defined explicitly allows the writer's renderer to format
metrics for arbitrary backends.


.. moduleauthor:: Kenny Freeman <[email protected]>

"""

__author__ = 'Kenny Freeman'
__email__ = '[email protected]'
__version__ = '0.3.3'
__license__     = "ISCL"
__docformat__   = 'reStructuredText'

import abc
import time
import logging
import collections


# path to main configuration file (a plain str; a trailing comma here would
# turn the value into a one-element tuple)
DEFAULT_CONFIG_FILE = "/etc/plumd.yaml"

DEFAULT_CONFIG = {
    'log.level':        "warn",               # crit, error, warn, info, debug
    'log.format':       "[%(asctime)s] %(levelname)s %(process)d %(message)s",
    'config.plugins':   "/etc/plumd.conf.d/", # path to the plugin confs
    'delay.startup':    1,                    # random delay for plugin start
    'delay.poll':       1,                    # random delay for first poll()
    'poll.interval':    15,                   # interval of reader poll() calls
    'max.queue':        512,                  # maximum size of internal queues
    'meta':             {},                   # key=value's for all metrics
    'meta.hostname':    True,                 # set and use hostname in meta
    'shutdown_timeout': 10                    # thread shutdown timeout
}


LOG_LEVELS = {                                # map config level to python level
    'crit':  logging.CRITICAL,
    'error': logging.ERROR,
    'warn':  logging.WARNING,
    'info':  logging.INFO,
    'debug': logging.DEBUG
}


class DuplicatePlugin(Exception):
    """Two or more plugins with the same id were defined."""
    pass


class PluginNotFoundError(Exception):
    """A plugin file, module or class was not found."""
    pass


class PluginLoadError(Exception):
    """Exception(s) were raised while loading a plugin."""
    pass


class PluginRuntimeError(Exception):
    """Exception(s) were raised while running a plugin."""
    pass


class ConfigError(Exception):
    """Invalid configuration encountered."""
    pass


# _Metric: the base class of all metric primitives is a named tuple.
_Metric = collections.namedtuple('metric', ['name', 'value'])


class Int(_Metric):
    """An integer metric - ensures the value passed is an int.

    raises:
        ValueError if the passed value doesn't cast properly to int

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError
    """

    def __new__(cls, name, value):
        # __new__ receives the class, not an instance; coerce the value
        # before the immutable tuple is built
        value = int(value)
        return super(Int, cls).__new__(cls, name, value)


class Float(_Metric):
    """A float metric.

    raises:
        ValueError if the passed value doesn't cast properly to float

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError
    """

    def __new__(cls, name, value):
        value = float(value)
        return super(Float, cls).__new__(cls, name, value)


class String(_Metric):
    """A string metric.

    raises:
        ValueError if the passed value doesn't cast properly to str

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: str
    :raises: ValueError
    """

    def __new__(cls, name, value):
        value = str(value)
        return super(String, cls).__new__(cls, name, value)


class Boolean(_Metric):
    """A boolean metric.

    Note: the value is coerced with bool(), which uses Python truthiness, so
    any non-empty string (including "false") is recorded as True.

    raises:
        ValueError if the passed value doesn't cast properly to bool

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: bool
    :raises: ValueError
    """

    def __new__(cls, name, value):
        value = bool(value)
        return super(Boolean, cls).__new__(cls, name, value)


class Gauge(Float):
    """A gauge value - this is always a Float value.

    raises:
        ValueError if the passed value doesn't cast properly to float

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError
    """
    pass


class Counter(Int):
    """A metric that counts things - this is always an Integer value.

    raises:
        ValueError if the passed value doesn't cast properly to int

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError
    """
    pass


class Rate(Float):
    """A metric that describes a rate - this is always a Float.

    raises:
        ValueError if the passed value doesn't cast properly to float

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError
    """
    pass


class Timer(Float):
    """A metric that describes a timer value - this is always a Float.

    raises:
        ValueError if the passed value doesn't cast properly to float

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError
    """
    pass


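The typed primitives above can be tried out in isolation. The following is a minimal, self-contained sketch rather than plumd's own writer code: the classes are redefined as stand-ins so the snippet runs alone, `render_influx_value` is a hypothetical helper, and the only backend detail it relies on is the one stated in the module docstring (influxdb wants integer values to end with an `i`). The `__slots__ = ()` lines are an assumption added here so that subclass instances do not carry a per-instance `__dict__`.

```python
import collections

# Stand-ins mirroring the primitives above, redefined so the sketch runs alone.
_Metric = collections.namedtuple('metric', ['name', 'value'])


class Int(_Metric):
    __slots__ = ()  # assumption: keep instances free of a __dict__

    def __new__(cls, name, value):
        return super(Int, cls).__new__(cls, name, int(value))


class Float(_Metric):
    __slots__ = ()

    def __new__(cls, name, value):
        return super(Float, cls).__new__(cls, name, float(value))


class Counter(Int):
    __slots__ = ()


def render_influx_value(metric):
    """Hypothetical helper: format one metric as name=value."""
    if isinstance(metric, Int):
        # integer-typed metrics (Int, Counter) get the trailing "i"
        return "{0}={1}i".format(metric.name, metric.value)
    return "{0}={1}".format(metric.name, metric.value)


hits = Counter("hits", "42")   # the string is coerced to int
idle = Float("idle", 99)       # the int is coerced to float
rendered = [render_influx_value(m) for m in (hits, idle)]
```

Because the type is carried by the class of each metric, the formatting decision needs no inspection of the raw value, which is the point the module docstring makes about explicit metric types.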
class Meta(object):
    """Encapsulates generic key=value metadata while ensuring that the
    values recorded are one of the four supported base types: Int, Float,
    String and Boolean. Also ensures keys are unique.

    Values are recorded with add(), which raises ValueError if the value is
    not one of the expected types or if its key already exists.
    """
    #__slots__ = [ '_meta', 'types' ]
    types = [Int, Float, String, Boolean]

    def __init__(self):
        self._meta = {}


    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        s = "Meta(items={0})"
        return s.format(sorted(self._meta.keys()))


    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        s = "Meta(items={0})"
        return s.format(sorted(self._meta.keys()))


    @property
    def nkeys(self):
        """Return the number of key=value pairs."""
        return len(self._meta)


    @property
    def keys(self):
        """Return a sorted list of metadata keys.

        :rtype: :class:`list`
        """
        return sorted(self._meta.keys())

    @property
    def items(self):
        """Return a list of (key, metric) tuples sorted by key, where metric
        is the Int, Float, String or Boolean that was recorded for that key.

        :rtype: :class:`list`
        """
        mlist = [(k, self._meta[k]) for k in sorted(self._meta.keys())]
        return mlist


    def add(self, val):
        """Record a value to our metadata.

        :param val: The value to add.
        :type val: Int or Float or String or Boolean
        :raises: ValueError if val is an unexpected type or the
            key already exists.
        """
        # exact class match: subclasses such as Gauge or Counter are rejected
        if val.__class__ not in self.types:
            cname = val.__class__.__name__
            err = "Class not supported: class: {0} : value: {1}"
            raise ValueError(err.format(cname, val))
        # using indexes: 0 => name, 1 => value
        if val[0] not in self._meta:
            self._meta[val[0]] = val
            return
        raise ValueError("key exists: {0}".format(val[0]))


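The module docstring gives `servers.host.<hostname>.disk.used.dev.<device>` as the kind of path a graphite writer can build once metadata is kept separate from metric values. A rough sketch of that idea follows. It is not plumd's graphite writer: `graphite_path` and the sample values (`web01`, `sda1`) are hypothetical, and `String` is a stand-in for the primitive above. The metadata arguments use the same shape that `Meta.items` returns, a list of `(key, metric)` tuples.

```python
import collections

# Stand-in for the String primitive above, so the sketch runs alone.
String = collections.namedtuple('String', ['name', 'value'])


def graphite_path(prefix, host_meta, result_name, metric_name, result_meta):
    """Hypothetical helper: build a dotted path from metadata and names.

    host_meta and result_meta are lists of (key, metric) tuples.
    """
    parts = [prefix]
    # host level metadata comes first, eg. host.<hostname>
    for key, metric in host_meta:
        parts.extend([key, str(metric.value)])
    # then the result and metric names, eg. disk.used
    parts.extend([result_name, metric_name])
    # then the metadata attached to the result, eg. dev.<device>
    for key, metric in result_meta:
        parts.extend([key, str(metric.value)])
    return ".".join(parts)


host_meta = [("host", String("host", "web01"))]
disk_meta = [("dev", String("dev", "sda1"))]
path = graphite_path("servers", host_meta, "disk", "used", disk_meta)
```

Here `path` ends up as `servers.host.web01.disk.used.dev.sda1`. If metadata and metrics shared one key=value namespace, the function could not tell which pairs belong in the path and which are values to send.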
class Result(object):
    """Encapsulates a set of metrics and an optional Meta metadata object. Does
    not check for duplicate metrics in the same Result.

    :param name: The name of the result eg. cpu
    :type name: str
    :param metrics: optional list of metrics
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
        or Rate or Timer
    """
    #__slots__ = [ '_name', '_metrics', '_meta', 'types' ]
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]

    def __init__(self, name, metrics=None):
        """Create a new Result."""
        self._name = name
        self._metrics = list()
        if metrics:
            for metric in metrics:
                mclass = metric.__class__
                if mclass not in self.types:
                    err = "Class not supported: class: {0} : metric: {1}"
                    raise ValueError(err.format(mclass.__name__, metric))
                self._metrics.append(metric)
        self._meta = Meta()


    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        s = "Result(name={0}, metrics={1})"
        return s.format(self._name, self._metrics)


    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        s = "Result(name={0}, metrics={1})"
        return s.format(self._name, self._metrics)


    @property
    def name(self):
        """Return the Result name."""
        return self._name


    @property
    def metrics(self):
        """Return the Result's list of metrics."""
        return self._metrics


    @property
    def meta(self):
        """Return the Result's metadata object."""
        return self._meta


    def add(self, metric):
        """Add a metric to our list of metrics.

        raises:
            ValueError if the metric is not a supported type

        :param metric: One of the metric types (Int, Float, String, etc)
        :type metric: Int or Float or String or Boolean or Gauge or Counter
            or Rate or Timer
        :raises: ValueError
        """
        mclass = metric.__class__
        if mclass not in self.types:
            err = "Class not supported: class: {0} : metric: {1}"
            raise ValueError(err.format(mclass.__name__, metric))
        self._metrics.append(metric)


    def add_list(self, metrics):
        """Add a list of metrics to our list of metrics.

        raises:
            ValueError if any metrics are not a supported type

        :param metrics: list of metrics (Int, Float, String, etc)
        :type metrics: list
        :raises: ValueError
        """
        for metric in metrics:
            mclass = metric.__class__
            if mclass not in self.types:
                err = "Class not supported: class: {0} : metric: {1}"
                raise ValueError(err.format(mclass.__name__, metric))
            self._metrics.append(metric)


class ResultSet(object):
    """A class to encapsulate a series of measurement Results during a plugin's
    poll() call. Each poll() must return a ResultSet object.

    The class has a list of Result objects and a timestamp. The timestamp
    is set from time.time() (seconds since the epoch) when the object is
    created. This is done since each poll() call should normally complete
    within ms or us and the target audience for the recorded metrics are eg.
    grafana graphs and system alerts with resolutions down to 1 second
    typically. The difference in time between the first recorded metric and
    last is generally going to be much less than 1 second.

    :Example:

    >>> import plumd
    >>> metrics = [plumd.Float("idle", 0.0), plumd.Float("user", 100.0)]
    >>> res = plumd.Result("cpu", metrics)
    >>> rset = plumd.ResultSet(results=[res])

    >>> metrics = [plumd.Float("free", 80.0), plumd.Float("used", 20.0)]
    >>> res = plumd.Result("disk", metrics)
    >>> res.meta.add(plumd.String("dev", "sda1"))
    >>> rset = plumd.ResultSet(results=[res])
    """
    #__slots__ = [ '_results', '_time' ] # do this to save memory

    def __init__(self, results=None):
        """Class to encapsulate a collection of metrics eg. from poll()."""
        self._results = list() if results is None else list(results)
        self._time = time.time()


    @property
    def time(self):
        """Return our timestamp.

        :rtype: float
        """
        return self._time


    @property
    def nresults(self):
        """Return the number of results recorded so far.

        :rtype: int
        """
        return len(self._results)


    @property
    def results(self):
        """Yields a tuple for each Result recorded in the format:

        ( time, result_name, result_meta, [metric, metric, metric] )

        :rtype: generator
        """
        for robj in self._results:
            yield (self._time, robj.name, robj.meta, robj.metrics)


    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        s = "ResultSet(results={0}, time={1})"
        return s.format(len(self._results), self._time)


    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        s = "ResultSet(results={0}, time={1})"
        return s.format(len(self._results), self._time)


    def add(self, result):
        """Record a result in this ResultSet.

        :param result: a Result object
        :type result: Result
        :raises: ValueError if result is not a Result
        """
        if not isinstance(result, Result):
            raise ValueError("Invalid result: {0}".format(result))
        self._results.append(result)


    def add_list(self, results):
        """Record a list of results in this ResultSet.

        :param results: a list of Result objects
        :type results: list
        :raises: ValueError if any item is not a Result
        """
        for result in results:
            if not isinstance(result, Result):
                raise ValueError("Invalid result: {0}".format(result))
            self._results.append(result)


class Render(object):
    """A helper class to render metrics, buffer them and return chunks of
    metrics. Used as a superclass for the various metric writers (eg.
    influxdb, graphite, etc).

    The idea is to feed this object a ResultSet on each call to a writer
    plugin's push() and then consume chunks of metrics from it until a full
    chunk cannot be returned.

    To use this, a writer plugin subclasses Render and defines the process
    method. The plugin's implementation of process() should format the metrics
    it gets passed in a format suitable for sending to its backend and store
    them in the metrics deque.

    The plugin can then create an instance of its render object and in push()
    simply call instance.process(results) and instance.get_chunk() to get the
    next chunk of metrics.

    On shutdown it can also call instance.get_chunk(partial=True) to get the
    remaining metrics. Normally it would want to fetch full batches of metrics.

    :param rconfig: config.Conf object passed from a writer plugin instance
    :type rconfig: :class:`config.Conf`
    """
    # Python 2 style metaclass declaration; Python 3 ignores this attribute,
    # so there process() is only guarded by its NotImplementedError.
    __metaclass__ = abc.ABCMeta

    defaults = {
        'max_queue': 8192,      # maxlen of deque used for metrics
        'max_items': 64,        # maximum number of items to send at once
        'max_bytes': 1024,      # maximum number of bytes to send at once
        'meta': {}              # metadata to add to each metric
    }

    def __init__(self, rconfig):
        # ensure default configuration is set
        rconfig.defaults(Render.defaults)
        self.max_bytes = rconfig.get('max_bytes')
        self.max_items = rconfig.get('max_items')
        # Use a double ended queue with a maximum size
        maxlen = rconfig.get('max_queue')
        self.metrics = collections.deque(maxlen=maxlen)
        # define a metadata object and copy in any configured host metadata
        self.meta = Meta()
        # map plain python types to metric primitives; the lookup is by exact
        # class, so a bool maps to Boolean even though bool subclasses int
        clookup = {
            int: Int,
            float: Float,
            str: String,
            bool: Boolean
        }
        for key, val in rconfig.get('meta').items():
            if val.__class__ not in clookup:
                args = ( key, val.__class__.__name__ )
                err = "host metadata: unsupported type: {0}={1}".format(*args)
                raise ValueError(err)
            self.meta.add(clookup[val.__class__](key, val))


    def __iter__(self):
        """A Render iterator returns rendered metrics where the metrics are
        each as close to self.max_bytes in size as possible.

        :rtype: iterator
        """
        return self


    def __next__(self):
        """Return the next metric from the deque, or raise StopIteration() if
        there are no more metrics.

        :rtype: object
        """
        if not self.metrics:
            raise StopIteration()
        return self.metrics.popleft()

    next = __next__  # Python 2 spelling of the iterator protocol


    @abc.abstractmethod
    def process(self, rset):
        """Record a result set in our backend's format. Subclasses for specific
        backends must define this method.

        :param rset: A :class:`ResultSet` object
        :type rset: :class:`ResultSet`
        """
        raise NotImplementedError("process must be defined by a subclass")
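The buffer-and-drain pattern that Render describes can be sketched without the rest of plumd. The snippet below is an illustration under stated assumptions, not a real writer plugin: `RenderSketch` is a cut-down stand-in for Render that takes `max_queue` directly instead of a `config.Conf` object, `PlainTextRender` and its output format are hypothetical, and `process()` is fed a plain list of tuples in the shape that `ResultSet.results` yields. It targets Python 3, where `abc.ABC` enforces the abstract method.

```python
import abc
import collections


class RenderSketch(abc.ABC):
    """Stand-in for Render: a bounded deque that is drained by iteration."""

    def __init__(self, max_queue=8192):
        self.metrics = collections.deque(maxlen=max_queue)

    def __iter__(self):
        return self

    def __next__(self):
        if not self.metrics:
            raise StopIteration()
        return self.metrics.popleft()

    @abc.abstractmethod
    def process(self, results):
        """Format results and append them to self.metrics."""


class PlainTextRender(RenderSketch):
    """Hypothetical format: '<result>.<metric> <value> <time>'."""

    def process(self, results):
        # each item is (time, result_name, result_meta, [metrics])
        for ts, rname, _meta, metrics in results:
            for name, value in metrics:
                line = "{0}.{1} {2} {3}".format(rname, name, value, int(ts))
                self.metrics.append(line)


render = PlainTextRender(max_queue=4)
render.process([(1500000000.0, "cpu", None, [("idle", 0.0), ("user", 100.0)])])
lines = list(render)  # drains the buffer via __iter__/__next__
```

Because the deque is created with `maxlen`, appending to a full buffer discards the oldest entries, so a writer that falls behind loses old metrics instead of growing without bound.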