Completed
Branch master (2f3d56)
by Kenny
01:12
created

Render.__init__()   B

Complexity

Conditions 4

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 1
Metric Value
cc 4
c 2
b 1
f 1
dl 0
loc 23
rs 8.7972
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distinction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metric classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
The Render class is an abstract base class that writer plugins can subclass. The
38
render() function needs to be defined so that it returns a formatted metric. A
39
writer plugin would then instantiate the subclassed Render and use it to record
40
formatted metrics on each push() call. It can then step through the passed
41
metrics in chunks and send them to its backend. Having the metric types defined
42
explicitly allows the writer's renderer to format metrics for arbitrary backends.
43
44
45
.. moduleauthor:: Kenny Freeman <[email protected]>
46
47
"""
48
49
# Package metadata (author, contact, release, license, docstring markup).
__author__ = "Kenny Freeman"
__email__ = "[email protected]"
__version__ = "0.3.3"
__license__ = "ISCL"
__docformat__ = "reStructuredText"
54
55
import abc
56
import time
57
import logging
58
import collections
59
60
61
# Path to the main configuration file. This must be a plain string: a
# trailing comma after the literal would silently turn it into a 1-tuple.
DEFAULT_CONFIG_FILE = "/etc/plumd.yaml"
62
63
# Built-in configuration values, keyed by option name.
DEFAULT_CONFIG = {
    # logging: level is one of crit, error, warn, info, debug
    'log.level': "warn",
    'log.format': "[%(asctime)s] %(levelname)s %(process)d %(message)s",
    # path to the plugin confs
    'config.plugins': "/etc/plumd.conf.d/",
    # random delay for plugin start
    'delay.startup': 1,
    # random delay for first poll()
    'delay.poll': 1,
    # interval of reader poll() calls
    'poll.interval': 15,
    # maximum size of internal queues
    'max.queue': 512,
    # key=value's for all metrics
    'meta': {},
    # set and use hostname in meta
    'meta.hostname': True,
    # thread shutdown timeout
    'shutdown_timeout': 10,
}
75
76
77
# Map the configuration file's log level names to the logging module's
# numeric levels. logging.WARNING is the documented name of the level that
# the legacy logging.WARN alias points at; the numeric value is identical.
LOG_LEVELS = {
    'crit': logging.CRITICAL,
    'error': logging.ERROR,
    'warn': logging.WARNING,
    'info': logging.INFO,
    'debug': logging.DEBUG,
}
84
85
86
class DuplicatePlugin(Exception):
    """Raised when two or more plugins are defined with the same id."""
89
90
91
class PluginNotFoundError(Exception):
    """Raised when a plugin file, module or class could not be found."""
94
95
96
class PluginLoadError(Exception):
    """Raised when loading a plugin triggered one or more exceptions."""
99
100
101
class PluginRuntimeError(Exception):
    """Raised when running a plugin triggered one or more exceptions."""
104
105
106
class ConfigError(Exception):
    """Raised when an invalid configuration is encountered."""
109
110
111
# _Metric: every metric primitive derives from this (name, value) named tuple.
_Metric = collections.namedtuple('metric', 'name value')
113
114
115
class Int(_Metric):
    """An integer metric - the value is converted with ``int()`` on creation.

    Note that ``int()`` truncates floats (eg. 3.9 is stored as 3).

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError if value is a string that does not parse as an int
    :raises: TypeError if value is of a type int() cannot convert (eg. None)
    """

    # An empty __slots__ stops each instance from getting its own __dict__,
    # so a metric stays as small as the underlying tuple.
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, coercing value to int."""
        return super(Int, cls).__new__(cls, name, int(value))
132
133
134
class Float(_Metric):
    """A float metric - the value is converted with ``float()`` on creation.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if value is a string that does not parse as a float
    :raises: TypeError if value is of a type float() cannot convert (eg. None)
    """

    # An empty __slots__ stops each instance from getting its own __dict__,
    # so a metric stays as small as the underlying tuple.
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, coercing value to float."""
        return super(Float, cls).__new__(cls, name, float(value))
151
152
153
class String(_Metric):
    """A string metric - the value is converted with ``str()`` on creation.

    Any exception raised by ``str(value)`` propagates to the caller.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: str
    """

    # An empty __slots__ stops each instance from getting its own __dict__,
    # so a metric stays as small as the underlying tuple.
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, coercing value to str."""
        return super(String, cls).__new__(cls, name, str(value))
170
171
172
class Boolean(_Metric):
    """A boolean metric - the value is converted with ``bool()`` on creation.

    The conversion uses normal Python truthiness, so any non-empty string
    (including "false" or "0") is recorded as True. Callers that read
    textual flags must parse them before creating the metric.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: bool
    """

    # An empty __slots__ stops each instance from getting its own __dict__,
    # so a metric stays as small as the underlying tuple.
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric, coercing value to bool."""
        return super(Boolean, cls).__new__(cls, name, bool(value))
189
190
191
class Gauge(Float):
    """A gauge value - this is always a Float value.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    # Declare no instance attributes; this only removes the per-instance
    # __dict__ when every base class also declares __slots__.
    __slots__ = ()
205
206
207
class Counter(Int):
    """A metric that counts things - this is always an Integer value.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError if the passed value doesn't cast properly to int
    """

    # Declare no instance attributes; this only removes the per-instance
    # __dict__ when every base class also declares __slots__.
    __slots__ = ()
221
222
223
class Rate(Float):
    """A metric that describes a rate - this is always a Float.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    # Declare no instance attributes; this only removes the per-instance
    # __dict__ when every base class also declares __slots__.
    __slots__ = ()
236
237
238
class Timer(Float):
    """A metric that describes a timer value - this is always a Float.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    # Declare no instance attributes; this only removes the per-instance
    # __dict__ when every base class also declares __slots__.
    __slots__ = ()
251
252
253
class Meta(object):
    """Encapsulates generic key=value metadata.

    Entries are added one at a time with :meth:`add`. Only values whose
    class is exactly one of ``types`` (Int, Float, String, Boolean) are
    accepted, and each key (the metric name) may be recorded only once.
    The constructor takes no arguments and starts with no entries.
    """

    # classes accepted by add(); the check is an exact class match
    types = [Int, Float, String, Boolean]

    def __init__(self):
        """Create an empty metadata collection."""
        # ordered so that items are returned in the order they were added
        self._meta = collections.OrderedDict()

    def __repr__(self):
        """Return a human readable str listing the sorted keys.

        :rtype: str
        """
        return "Meta(items={0})".format(sorted(self._meta))

    # str() and repr() produce the same text
    __str__ = __repr__

    @property
    def nkeys(self):
        """Return the number of key=value pairs.

        :rtype: int
        """
        return len(self._meta)

    @property
    def keys(self):
        """Return a sorted list of metadata keys.

        :rtype: :class:`list`
        """
        return sorted(self._meta)

    @property
    def items(self):
        """Return the (key, metric) pairs in the order they were added.

        The pairs are not sorted. The value of each pair is the metric
        object that was passed to :meth:`add`. This is the result of the
        underlying OrderedDict's ``items()`` call.
        """
        return self._meta.items()

    def add(self, val):
        """Record a value to our metadata.

        :param val: The value to add.
        :type val: Int or Float or String or Boolean
        :raises: ValueError if val is an unexpected type or its key
            already exists.
        """
        if val.__class__ not in self.types:
            err = "Class not supported: class: {0} : value: {1}"
            raise ValueError(err.format(val.__class__.__name__, val))
        # metrics are (name, value) tuples; index 0 is the name
        key = val[0]
        if key in self._meta:
            raise ValueError("key exists: {0}".format(key))
        self._meta[key] = val
322
323
324
class Result(object):
    """Encapsulates a set of metrics and a Meta metadata object. Does
    not check for duplicate metrics in the same Result.

    :param name: The name of the result eg. cpu
    :type name: str
    :param metrics: optional list of metrics
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
        or Rate or Timer
    :raises: ValueError if any metric is not a supported type
    """

    # classes accepted as metrics; the check is an exact class match
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]

    def __init__(self, name, metrics=None):
        """Create a new Result, optionally seeded with metrics."""
        self._name = name
        self._metrics = []
        self._meta = Meta()
        if metrics:
            for metric in metrics:
                self._append(metric)

    def _append(self, metric):
        """Validate a single metric and append it to our list."""
        mclass = metric.__class__
        if mclass not in self.types:
            err = "Class not supported: class: {0} : metric: {1}"
            raise ValueError(err.format(mclass.__name__, metric))
        self._metrics.append(metric)

    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "Result(name={0}, metrics={1})".format(self._name,
                                                      self._metrics)

    # str() and repr() produce the same text
    __str__ = __repr__

    @property
    def name(self):
        """Return the Result name."""
        return self._name

    @property
    def metrics(self):
        """Return the Results list of metrics (the internal list itself)."""
        return self._metrics

    @property
    def meta(self):
        """Return the results metadata object."""
        return self._meta

    def add(self, metric):
        """Add a metric to our list of metrics.

        :param metric: One of the metric types (Int, Float, String, etc)
        :type metric: Int or Float or String or Boolean or Gauge or Counter
            or Rate or Timer
        :raises: ValueError if the metric is not a supported type
        """
        self._append(metric)

    def add_list(self, metrics):
        """Add a list of metrics to our list of metrics.

        Metrics are added one at a time, so those preceding an invalid
        entry remain recorded when ValueError is raised.

        :param metrics: list of metrics (Int, Float, String, etc)
        :type metrics: list
        :raises: ValueError if any metrics are not a supported type
        """
        for metric in metrics:
            self._append(metric)
416
417
418
class ResultSet(object):
    """A class to encapsulate a series of measurement Results during a plugins
    poll() call. Each poll() must return a ResultSet object.

    The class has a list of Result objects and a timestamp. The timestamp
    is taken from time.time() when the object is created. This is done
    since each poll() call should normally complete within ms or us and the
    target audience for the recorded metrics are eg. grafana graphs and
    system alerts with resolutions down to 1 second typically. The difference
    in time between the first recorded metric and last is generally going to
    be much less than 1 second.

    Note: results passed to the constructor are stored as given; only
    add() and add_list() check that each entry is a Result.

    :Example:

    >>> import plumd
    >>> metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
    >>> res = plumd.Result("cpu", metrics)
    >>> rset = plumd.ResultSet(results=[res])

    >>> metrics = [ plumd.Float("free", 80.0), plumd.Float("used", 20.0) ]
    >>> res = plumd.Result("disk", metrics)
    >>> res.meta.add(plumd.String("dev", "sda1"))
    >>> rset = plumd.ResultSet(results=[res])

    :param results: optional iterable of Result objects
    :type results: list
    """

    def __init__(self, results=None):
        """Class to encapsulate a collection of metrics eg. from poll()."""
        # copy the caller's iterable so later changes to it do not affect us
        self._results = [] if results is None else list(results)
        self._time = time.time()

    @property
    def time(self):
        """Return our timestamp.

        :rtype: float
        """
        return self._time

    @property
    def nresults(self):
        """Return the number of results recorded so far.

        :rtype: int
        """
        return len(self._results)

    @property
    def results(self):
        """Yields a tuple for each Result recorded in the format:

        ( time, result_name, result_meta, [metric, metric, metric] )

        :rtype: generator
        """
        stamp = self._time
        for robj in self._results:
            yield (stamp, robj.name, robj.meta, robj.metrics)

    def __repr__(self):
        """Return a human readable str (result count and timestamp).

        :rtype: str
        """
        return "ResultSet(results={0}, time={1})".format(len(self._results),
                                                         self._time)

    # str() and repr() produce the same text
    __str__ = __repr__

    def _append(self, result):
        """Validate a single Result and append it to our list."""
        if not isinstance(result, Result):
            raise ValueError("Invalid result: {0}".format(result))
        self._results.append(result)

    def add(self, result):
        """Record a result in this ResultSet.

        :param result: a Result object
        :type result: Result
        :raises: ValueError if result is not a Result instance
        """
        self._append(result)

    def add_list(self, results):
        """Record a list of results in this ResultSet.

        Results are added one at a time, so those preceding an invalid
        entry remain recorded when ValueError is raised.

        :param results: a list of Result objects
        :type results: list
        :raises: ValueError if any entry is not a Result instance
        """
        for result in results:
            self._append(result)
513
514
515
# The base class is created by calling ABCMeta directly so that the abstract
# base class machinery is active on both Python 2 and Python 3 (a class-level
# __metaclass__ attribute is silently ignored on Python 3).
class Render(abc.ABCMeta('_RenderBase', (object,), {})):
    """A helper class to render metrics and buffer them. Used as a
    superclass for the various metric writers (eg. influxdb, graphite, etc).

    The idea is to feed this object a ResultSet on each call to a writer
    plugins push() and then consume the rendered metrics from it.

    To use this, a writer plugin subclasses Render and defines the process
    method. The plugins implementation of process() should format the
    metrics it gets passed in a format suitable for sending to its backend
    and store them in the ``metrics`` deque. Iterating over the instance
    pops rendered metrics off the left of that deque until it is empty.

    NOTE(review): earlier documentation referred to a get_chunk() method
    (including get_chunk(partial=True) on shutdown); no such method is
    defined on this class - confirm whether subclasses provide it.

    :param rconfig: config.Conf object passed from a writer plugin instance
    :type rconfig: :class:`config.Conf`
    :raises: ValueError if configured host metadata has an unsupported type
        or a duplicate key
    """

    defaults = {
        'max_queue': 8192,      # maxlen of deque used for metrics
        'max_items': 64,        # maximum number of items to send at once
        'max_bytes': 1400,      # maximum number of bytes to send at once
        'meta': {}              # metadata to add to each metric
    }

    def __init__(self, rconfig):
        """Read limits and host metadata from the supplied configuration."""
        # ensure default configuration is set
        rconfig.defaults(Render.defaults)
        self.max_bytes = rconfig.get('max_bytes')
        self.max_items = rconfig.get('max_items')
        # Use a double ended queue with a maximum size
        self.metrics = collections.deque(maxlen=rconfig.get('max_queue'))
        # define a metadata object and copy in any configured host metadata
        self.meta = Meta()
        # built here rather than at class level so the lookup always uses
        # the metric classes as they exist when an instance is created
        clookup = {
            int: Int,
            float: Float,
            str: String,
            bool: Boolean
        }
        entries = rconfig.get('meta')
        # the default is a single mapping ({}), however a list of mappings
        # is also accepted; normalise to a list of mappings
        if hasattr(entries, 'items'):
            entries = [entries]
        for entry in entries or ():
            for key, val in entry.items():
                vclass = val.__class__
                if vclass not in clookup:
                    args = (key, vclass.__name__)
                    err = "host metadata: unsupported type: {0}={1}"
                    raise ValueError(err.format(*args))
                self.meta.add(clookup[vclass](key, val))

    def __iter__(self):
        """Return self; the instance is its own iterator over the buffered
        metrics.

        :rtype: iterator
        """
        return self

    def __next__(self):
        """Return the next metric from the deque, or raise StopIteration() if
        there are no more metrics.

        :rtype: object
        """
        if not self.metrics:
            raise StopIteration()
        return self.metrics.popleft()

    # Python 2 spelling of the iterator protocol
    next = __next__

    @abc.abstractmethod
    def process(self, rset):
        """Record a result set in our backends format. Subclasses for specific
        backends must define this method.

        :param rset: A :class:`ResultSet` object
        :type rset: :class:`ResultSet`
        """
        raise NotImplementedError("process must be defined by a subclass")
613