Completed
Push — master ( c60d6f...54bb83 )
by Kenny
01:41
created

Reader.poll()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285
cc 1
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distinction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metric classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
The Render class is an abstract base class that writer plugins can subclass. The
38
render() function needs to be defined so that it returns a formatted metric. A
39
writer plugin would then instantiate the subclassed Render and use it to record
40
formatted metrics on each push() call. It can then step through the passed
41
metrics in chunks and send them to its backend. Having the metric types defined
42
explicitly allows the writers renderer to format metrics for arbitrary backends.
43
44
45
.. moduleauthor:: Kenny Freeman <[email protected]>
46
47
"""
48
49
import os
50
import time
51
import random
52
import logging
53
import threading
54
import collections
55
from collections import deque
56
from abc import ABCMeta, abstractmethod
57
58
from plumd.util import Interval, load_all_plugins, config_plugin_writers
59
60
# package metadata
__author__ = "Kenny Freeman"
__email__ = "[email protected]"
__version__ = "0.15"
__license__ = "ISCL"
__docformat__ = "reStructuredText"


# path to the main configuration file
DEFAULT_CONFIG_FILE = "/etc/plumd.yaml"
68
69
# Built-in configuration values, keyed by option name.
DEFAULT_CONFIG = {
    # log level: one of crit, error, warn, info, debug
    'log.level': "warn",
    # format string handed to the logging module
    'log.format': "[%(asctime)s] %(levelname)s %(process)d %(message)s",
    # path to the plugin confs
    'config.plugins': "/etc/plumd.conf.d/",
    # random delay for plugin start
    'delay.startup': 1,
    # random delay for first poll()
    'delay.poll': 1,
    # interval of reader poll() calls
    'poll.interval': 15,
    # maximum size of internal queues
    'max.queue': 512,
    # key=value's for all metrics
    'meta': [],
    # set and use hostname in meta
    'meta.hostname': True,
    # thread shutdown timeout
    'shutdown_timeout': 10,
}
81
82
83
# Translate the level names accepted in the configuration into the
# numeric levels used by the logging module.
LOG_LEVELS = {
    'crit': logging.CRITICAL,
    'error': logging.ERROR,
    'warn': logging.WARNING,
    'info': logging.INFO,
    'debug': logging.DEBUG,
}
90
91
92
class DuplicatePlugin(Exception):
    """Raised when two or more plugins were defined with the same id."""
96
97
98
class PluginNotFoundError(Exception):
    """Raised when a plugin file, module or class could not be found."""
102
103
104
class PluginLoadError(Exception):
    """Raised when loading a plugin caused one or more exceptions."""
108
109
110
class PluginRuntimeError(Exception):
    """Raised when running a plugin caused one or more exceptions."""
114
115
116
class ConfigError(Exception):
    """Raised when an invalid configuration is encountered."""
120
121
122
# _Metric : the base class of all metric primitives is a named tuple.
123
_Metric = collections.namedtuple('metric', ['name', 'value'])


class Int(_Metric):
    """An integer metric - the value is coerced with int().

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError if value (eg. a str) cannot be parsed as an int,
        TypeError if int() does not accept the type of value (eg. None)
    """

    # An empty __slots__ stops each instance from getting its own __dict__;
    # without it the memory savings of the namedtuple base are lost.
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric after coercing value with int().

        :param name: The name of the metric
        :type name: str
        :param value: The recorded value of the metric
        :type value: int
        :raises: ValueError, TypeError
        """
        return super(Int, cls).__new__(cls, name, int(value))


class Float(_Metric):
    """A float metric - the value is coerced with float().

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if value (eg. a str) cannot be parsed as a float,
        TypeError if float() does not accept the type of value (eg. None)
    """

    # keep instances free of a per-instance __dict__ (see Int)
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric after coercing value with float().

        :param name: The name of the metric
        :type name: str
        :param value: The recorded value of the metric
        :type value: float
        :raises: ValueError, TypeError
        """
        return super(Float, cls).__new__(cls, name, float(value))


class String(_Metric):
    """A string metric - the value is coerced with str().

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: str
    """

    # keep instances free of a per-instance __dict__ (see Int)
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric after coercing value with str().

        :param name: The name of the metric
        :type name: str
        :param value: The recorded value of the metric
        :type value: str
        """
        return super(String, cls).__new__(cls, name, str(value))


class Boolean(_Metric):
    """A boolean metric - the value is coerced with bool().

    Note that bool() follows the normal truthiness rules, so any non-empty
    string (including "false") is recorded as True.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: bool
    """

    # keep instances free of a per-instance __dict__ (see Int)
    __slots__ = ()

    def __new__(cls, name, value):
        """Create the metric after coercing value with bool().

        :param name: The name of the metric
        :type name: str
        :param value: The recorded value of the metric
        :type value: bool
        """
        return super(Boolean, cls).__new__(cls, name, bool(value))


class Gauge(Float):
    """A gauge value - this is always a Float value.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()


class Counter(Int):
    """A metric that counts things - this is always an Integer value.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: int
    :raises: ValueError if the passed value doesn't cast properly to int
    """

    __slots__ = ()


class Rate(Float):
    """A metric that describes a rate - this is always a Float.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()


class Timer(Float):
    """A metric that describes a timer value - this is always a Float.

    :param name: The name of the metric
    :type name: str
    :param value: The recorded value of the metric
    :type value: float
    :raises: ValueError if the passed value doesn't cast properly to float
    """

    __slots__ = ()
306
307
308
class Meta(object):
    """Ordered collection of uniquely named metadata values.

    Every value must be an instance of exactly one of the four base
    primitives Int, Float, String or Boolean (the check compares the class
    itself, so subclasses such as Gauge are rejected). Values are kept in
    the order they were added and each name may only be recorded once.
    """

    types = [Int, Float, String, Boolean]

    def __init__(self):
        """Create an empty metadata collection; use add() to record values."""
        self._meta = collections.OrderedDict()

    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        pairs = []
        for key, metric in self._meta.items():
            pairs.append("{0}={1}".format(key, metric))
        return "Meta(items={0})".format(",".join(pairs))

    def __repr__(self):
        """Return the same human readable str as __str__.

        :rtype: str
        """
        pairs = []
        for key, metric in self._meta.items():
            pairs.append("{0}={1}".format(key, metric))
        return "Meta(items={0})".format(",".join(pairs))

    @property
    def nkeys(self):
        """Return the number of key=value pairs."""
        return len(self._meta)

    @property
    def keys(self):
        """Return the metadata keys in the order they were added."""
        return self._meta.keys()

    @property
    def items(self):
        """Return the (key, metric) pairs in the order they were added."""
        return self._meta.items()

    def add(self, val):
        """Record a value to our metadata.

        :param val: The value to add.
        :type val: Int or Float or String or Boolean
        :raises: ValueError if val is an unexpected type or the
            key already exists.
        """
        kind = val.__class__
        if kind not in self.types:
            raise ValueError(
                "Class not supported: class: {0} : value: {1}".format(
                    kind.__name__, val))
        # metrics are (name, value) tuples: index 0 is the name
        key = val[0]
        if key in self._meta:
            raise ValueError("key exists: {0}".format(key))
        self._meta[key] = val
389
390
391
class Result(object):
    """A named group of metrics together with a Meta metadata object.

    :param name: The name of the result eg. cpu
    :type name: str
    :param metrics: optional list of metrics
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
        or Rate or Timer
    :raises: ValueError if a metric is not one of the supported types
    """

    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]

    def __init__(self, name, metrics=None):
        """Record the name, validate any initial metrics and create the
        (initially empty) metadata object.

        :param name: The name of the result eg. cpu
        :type name: str
        :param metrics: optional list of metrics
        :type metrics: Int or Float or String or Boolean or Gauge or Counter
            or Rate or Timer
        :raises: ValueError if a metric is not one of the supported types
        """
        self._name = name
        self._metrics = []
        for item in metrics or ():
            self._metrics.append(self._checked(item))
        self._meta = Meta()

    def _checked(self, metric):
        """Return metric unchanged, or raise ValueError if its class is not
        listed in self.types."""
        kind = metric.__class__
        if kind in self.types:
            return metric
        raise ValueError(
            "Class not supported: class: {0} : metric: {1}".format(
                kind.__name__, metric))

    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "Result(name={0}, metrics={1})".format(
            self._name, self._metrics)

    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "Result(name={0}, metrics={1})".format(
            self._name, self._metrics)

    @property
    def name(self):
        """Return the Result name."""
        return self._name

    @property
    def metrics(self):
        """Return the Results list of metrics."""
        return self._metrics

    @property
    def meta(self):
        """Return the results metadata object."""
        return self._meta

    def add(self, metric):
        """Add a metric to our list of metrics.

        :param metric: One of the metric types (Int, Float, String, etc)
        :type metric: Int or Float or String or Boolean or Gauge or Counter
            or Rate or Timer
        :raises: ValueError if the metric is not a supported type
        """
        self._metrics.append(self._checked(metric))

    def add_list(self, metrics):
        """Add a list of metrics to our list of metrics.

        Metrics are validated and appended one at a time, so metrics that
        precede an unsupported one have already been recorded when the
        ValueError is raised.

        :param metrics: list of metrics (Int, Float, String, etc)
        :type metrics: list
        :raises: ValueError if any metrics are not a supported type
        """
        for item in metrics:
            self._metrics.append(self._checked(item))
486
487
488
class ResultSet(object):
    """Collect the measurement Results taken during a plugins poll() call.
    Each poll() must return a ResultSet object.

    A ResultSet holds a list of Result objects and a single timestamp that
    is taken with time.time() when the ResultSet is created. One timestamp
    for the whole set is used since each poll() call should normally
    complete within ms or us, while the consumers of the metrics (eg.
    grafana graphs and system alerts) typically work with resolutions down
    to 1 second.

    :Example:

    >>> import plumd
    >>> metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
    >>> res = plumd.Result("cpu", metrics)
    >>> rset = plumd.ResultSet(results=[res])

    >>> metrics = [ plumd.Float("free", 80.0), plumd.Float("used", 20.0) ]
    >>> res = plumd.Result("disk", metrics)
    >>> res.meta.add(plumd.String("dev", "sda1"))
    >>> rset = plumd.ResultSet(results=[res])
    """

    def __init__(self, results=None):
        """Copy any initial results and take the timestamp for this set.

        :param results: optional iterable of Result objects (not validated
            here, unlike add() and add_list())
        :type results: list
        """
        if results is None:
            self._results = []
        else:
            self._results = list(results)
        self._time = time.time()

    @property
    def time(self):
        """Return our timestamp.

        :rtype: float
        """
        return self._time

    @property
    def nresults(self):
        """Return the number of results recorded so far.

        :rtype: int
        """
        return len(self._results)

    @property
    def results(self):
        """Yield a tuple for each Result recorded in the format:

        ( time, result_name, result_meta, [metric, metric, metric] )

        :rtype: generator
        """
        for entry in self._results:
            yield (self._time, entry.name, entry.meta, entry.metrics)

    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "ResultSet(results={0}, time={1})".format(
            len(self._results), self._time)

    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "ResultSet(results={0}, time={1})".format(
            len(self._results), self._time)

    def add(self, result):
        """Record a result in this ResultSet.

        :param result: a Result object
        :type result: Result
        :raises: ValueError if result is not a Result instance
        """
        if isinstance(result, Result):
            self._results.append(result)
            return
        raise ValueError("Invalid result: {0}".format(result))

    def add_list(self, results):
        """Record a list of results in this ResultSet.

        Results are checked and appended one at a time, so results that
        precede an invalid one have already been recorded when the
        ValueError is raised.

        :param results: a list of Result objects
        :type results: list
        :raises: ValueError if any entry is not a Result instance
        """
        for entry in results:
            if not isinstance(entry, Result):
                raise ValueError("Invalid result: {0}".format(entry))
            self._results.append(entry)
581
582
583
class Render(object):
    """Base class for the metric renderers used by writer plugins (eg.
    influxdb, graphite, etc).

    A writer plugin subclasses Render and defines process(), which should
    format the metrics of the ResultSet it is passed in a format suitable
    for its backend and store them in the self.metrics deque. The plugin
    then feeds each ResultSet it receives in push() to process() and
    consumes the rendered metrics again.

    This base class itself only sets up the bounded deque, the configured
    limits and the host metadata, and allows iterating over the buffered
    metrics (each iteration step removes one metric from the left of the
    deque).

    :param rconfig: config.Conf object passed from a writer plugin instance
    :type rconfig: :class:`config.Conf`
    :raises: ValueError if configured metadata has an unsupported type
    """

    # NOTE(review): the __metaclass__ attribute is only honoured by
    # Python 2; Python 3 ignores it, so @abstractmethod is not enforced
    # there and process() falls back to raising NotImplementedError.
    __metaclass__ = ABCMeta

    defaults = {
        # maxlen of deque used for metrics
        'max_queue': 8192,
        # maximum number of items to send at once
        'max_items': 64,
        # maximum number of bytes to send at once
        'max_bytes': 1400,
        # metadata to add to each metric
        'meta': {},
    }

    def __init__(self, rconfig):
        """Apply the default configuration, create the metrics deque and
        copy the configured metadata into self.meta.

        :param rconfig: config.Conf object passed from a writer plugin instance
        :type rconfig: :class:`config.Conf`
        :raises: ValueError if a configured metadata value is not exactly an
            int, float, str or bool
        """
        # ensure default configuration is set
        rconfig.defaults(Render.defaults)
        self.max_bytes = rconfig.get('max_bytes')
        self.max_items = rconfig.get('max_items')
        # bounded double ended queue holding the rendered metrics
        self.metrics = collections.deque(maxlen=rconfig.get('max_queue'))
        # metadata object that receives any configured host metadata
        self.meta = Meta()
        # python type of a configured value => metric primitive wrapping it
        wrappers = {int: Int, float: Float, str: String, bool: Boolean}
        for entry in rconfig.get('meta'):
            for key, val in entry.items():
                wrap = wrappers.get(val.__class__)
                if wrap is None:
                    raise ValueError(
                        "host metadata: unsupported type: {0}={1}".format(
                            key, val.__class__.__name__))
                self.meta.add(wrap(key, val))

    def __iter__(self):
        """Return self; iterating consumes the buffered metrics.

        :rtype: iterator
        """
        return self

    def __next__(self):
        """Return the next metric from the deque, or raise StopIteration() if
        there are no more metrics.

        :rtype: object
        """
        if len(self.metrics) == 0:
            raise StopIteration()
        return self.metrics.popleft()

    def next(self):
        """Python 2 spelling of __next__: return the next metric from the
        deque, or raise StopIteration() if there are no more metrics.

        :rtype: object
        """
        if len(self.metrics) == 0:
            raise StopIteration()
        return self.metrics.popleft()

    @abstractmethod
    def process(self, rset):
        """Record a result set in our backends format. Subclasses for specific
        backends must define this method.

        :param rset: A :class:`ResultSet` object
        :type rset: :class:`ResultSet`
        :raises: NotImplementedError unless overridden by a subclass
        """
        raise NotImplementedError("process must be defined by a subclass")
704
705
706
class PluginLoader(object):
    """Load and control the reader and writer objects defined by
    the provided config.

    :param log: a logger object
    :type log: logger
    :param config: a simple configuration helper object.
    :type config: plumd.config.conf
    :raises: ConfigError if the configured plugin directory is invalid or
        a plugin configuration is missing 'name', DuplicatePlugin if a
        plugin configuration has a duplicate 'name', PluginLoadError if
        there was an error loading the plugin
    """

    def __init__(self, log, config):
        """Remember the logger and configuration, then load all plugins.

        :param log: a logger object from structlog.get_logger()
        :type log: logger
        :param config: a simple configuration helper object.
        :type config: plumd.config.conf
        :raises: ConfigError, DuplicatePlugin, PluginLoadError
        """
        self.log = log
        self.log.debug(
            "initializing from: {0} conf: \n{1}".format(config.path, config))
        self.config = config
        # plugin io objects; both are replaced by load()
        self.readers = {}
        self.writers = {}
        self.load()

    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "readers={0}, writers={1}".format(
            list(self.readers), list(self.writers))

    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        return "readers={0}, writers={1}".format(
            list(self.readers), list(self.writers))

    @property
    def nplugins(self):
        """Return the number of loaded plugins.

        :rtype: int
        """
        return len(self.readers) + len(self.writers)

    @property
    def nreaders(self):
        """Return the number of loaded reader plugins.

        :rtype: int
        """
        return len(self.readers)

    @property
    def nwriters(self):
        """Return the number of loaded writer plugins.

        :rtype: int
        """
        return len(self.writers)

    def load(self):
        """Load the configured plugin objects into self.readers and
        self.writers. Set the writers for each reader plugin.

        :raises: ConfigError if the 'config.plugins' path is not a
            directory; the docstrings in this file also list ConfigError
            (missing 'name'), DuplicatePlugin and PluginLoadError as
            raised while loading plugins
        """
        # the plugin configuration directory has to exist
        plugin_dir = self.config.get('config.plugins')
        if not os.path.isdir(plugin_dir):
            raise ConfigError(
                "invalid plugin directory configured: {0}".format(plugin_dir))

        self.log.info("loading plugins from: {0}".format(
            self.config.get('config.plugins')))

        # load_all_plugins returns a tuple of reader, writer objects
        self.readers, self.writers = load_all_plugins(self.log, self.config)

        # now the readers need to know what writers to write to
        config_plugin_writers(self)

    def start(self):
        """Start each writer, then each reader."""
        self.log.debug("starting")

        for group in (self.writers, self.readers):
            for plugin in group.values():
                self.log.debug("starting: {0}".format(plugin.name))
                plugin.start()

        self.log.debug("started")

    def stop(self):
        """Stop each reader, then each writer, then join them all."""
        self.log.debug("stopping plugins")

        # stop the readers first - they call each plugins onstop()
        self.log.debug("stopping readers")
        for reader in list(self.readers.values()):
            self.log.debug("stopping reader: {0}".format(reader.name))
            reader.stop()

        # next stop the writers - they call each plugins onstop()
        self.log.debug("stopping writers")
        for writer in list(self.writers.values()):
            self.log.debug("stopping writer: {0}".format(writer.name))
            writer.stop()

        # wait for each plugin, giving every join the configured timeout
        for group in (self.readers, self.writers):
            for label, plugin in list(group.items()):
                self.log.debug(
                    "waiting for plugin to stop: {0}".format(label))
                plugin.join(timeout=self.config.get('shutdown_timeout'))

        self.log.debug("stopped")

    def __del__(self):
        """Log __del__ calls."""
        self.log.debug("PluginLoader: __del__")
859
860
861
class Plugin(object):
    """The base class for all plugins - each plugin must sub class.

    A plugin owns one worker thread that executes run() and a
    threading.Event (stop_evt) that stop() sets to ask run() to finish.

    :param log: A logger
    :type log: logging.RootLogger
    :param config: an instance of plumd.conf configuration helper
    :type config: conf
    """

    __metaclass__ = ABCMeta
    # default configuration values; __init__ hands this dict to
    # config.defaults() so sub classes can override it with their own
    defaults = {}

    def __init__(self, log, config):
        """The base class for all plugins - each plugin must sub class.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: an instance of plumd.conf configuration helper
        :type config: conf
        """
        self.config = config
        self.log = log
        # plugins can set their own defaults by defining self.defaults
        self.config.defaults(self.defaults)
        # all plugins must be given a unique name
        self.name = config.get('name', 'unknown')
        # thread for reading or writing
        self.thread = None
        # set by stop() to tell run() to finish
        self.stop_evt = threading.Event()

    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        sval = "Plugin(class={0})".format(self.__class__.__name__)
        return sval

    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        sval = "Plugin(class={0})".format(self.__class__.__name__)
        return sval

    def start(self):
        """Start run() in a new thread if not already running."""
        if self.thread is None or not self.thread.is_alive():
            self.thread = threading.Thread(target=self.run, group=None)
            self.thread.start()
            self.log.debug("Plugin {0} started".format(self.name))
        else:
            self.log.debug("Plugin {0} already running".format(self.name))

    def run(self):
        """Plugin sub classes must override this.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Invalid Plugin")

    def stop(self):
        """Set our stop_evt Event."""
        # set the stop event so the thread executing run() falls through.
        self.stop_evt.set()

    def join(self, timeout):
        """Join our thread, logging an error if it is still alive after.

        :param timeout: passed to threading.Thread.join()
        :type timeout: float
        """
        if self.thread:
            self.thread.join(timeout)
            if self.thread.is_alive():
                msg = "Plugin {0} shutdown timeout - thread blocking exit"
                self.log.error(msg.format(self.name))

    def onstart(self):
        """Hook called by Reader.run()/Writer.run() before their main loop.

        The base implementation only logs; sub classes may override it.
        """
        self.log.debug("Plugin: onstart: {0}".format(self.config.get('name')))

    def onstop(self):
        """Hook called by Reader.run()/Writer.run() after their main loop.

        The base implementation only logs; sub classes may override it.
        """
        self.log.debug("Plugin: onstop: {0}".format(self.config.get('name')))

    def __del__(self):
        """Log __del__ calls.

        __del__ also runs on objects whose __init__ did not complete, in
        which case log and/or name may never have been assigned - look them
        up defensively so finalization itself cannot raise AttributeError.
        """
        log = getattr(self, 'log', None)
        if log is None:
            return
        name = getattr(self, 'name', 'unknown')
        log.debug("Plugin: {0} __del__".format(name))
940
941
942
class Reader(Plugin):
    """An abstract base class that all reader plugins subclass.

    run() repeatedly calls poll() and appends each non-None result to every
    writer queue registered in self.queues.
    """

    __metaclass__ = ABCMeta

    defaults = {
        'poll.interval': 30,    # seconds between polls
        'delay.poll': 5         # random delay before first poll
    }

    def __init__(self, log, config):
        """Reader plugin abstract class.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Reader, self).__init__(log, config)
        self.config.defaults(Reader.defaults)
        # list of (event, queue) pairs to write to - run() appends each
        # result to the queue and then sets the matching event
        self.queues = []

    def run(self):
        """Call the plugins onstart() function and then iterate over the
        plugins poll() function, pushing the result to each writer queue.

        When complete it calls the plugins onstop() function.
        """
        # starting up, call onstart
        self.onstart()

        evt = self.stop_evt

        # distribute metrics - don't everyone flood all at once.
        # random.randrange() raises ValueError for a non positive bound so
        # a 'delay.poll' of 0 (or less) simply means no start up delay.
        # Waiting on the stop event instead of sleeping lets a stop()
        # issued during the delay take effect immediately.
        max_delay = int(self.config.get('delay.poll'))
        if max_delay > 0:
            evt.wait(random.randrange(max_delay))

        # save a few cycles
        tloop = self.config.get('poll.interval')
        queues = self.queues

        # loop on calling the plugin objects poll and sending to writers
        while not evt.is_set():

            # loop every self.interval seconds
            with Interval(tloop, evt):

                # poll for metric result set
                rset = self.poll()

                # readers can return None values eg. a reader that polls
                # very often and periodically returns results
                if rset is None:
                    continue

                # write the value to each of our writer queues directly
                for queue_evt, queue in queues:
                    queue.append(rset)
                    queue_evt.set()

        # all done, call onstop
        self.onstop()

    @abstractmethod
    def poll(self):
        """Reader plugins must define this method, it takes measurements
        and returns them.

        run() forwards any non-None return value unchanged to every writer
        queue; returning None means there is nothing to send for this poll.

        :rtype: ResultSet or None
        """
        # this will never get called
        raise NotImplementedError("poll() not defined")
1016
1017
1018
class Writer(Plugin):
    """An abstract base class that all writer plugins subclass.

    run() consumes ResultSets from self.queue and hands each one to push().
    """

    __metaclass__ = ABCMeta

    # NOTE(review): 'maxqueue' is not applied to self.queue anywhere in this
    # class - confirm whether it is enforced elsewhere.
    defaults = {
        'maxqueue': 8192
    }

    def __init__(self, log, config):
        """Writer plugin abstract class.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Writer, self).__init__(log, config)
        self.config.defaults(Writer.defaults)
        # queue of ResultSets waiting to be passed to push()
        self.queue = deque()
        # Event signaling items on queue
        self.queue_evt = threading.Event()

    def run(self):
        """Call the plugins onstart() function and then consume
        ResultSets from self.queue, calling self.push() for each.

        Every wake up drains the queue rather than popping a single entry,
        so ResultSets queued by several readers before the writer wakes do
        not build up a backlog.

        When complete it calls the plugins onstop() function.
        """
        # starting up, call onstart
        self.onstart()

        # save a few cycles
        queue_evt = self.queue_evt
        queue = self.queue
        stop_evt = self.stop_evt

        # loop on popping from queue and calling self.push()
        while not stop_evt.is_set():
            # wait for a ResultSet
            queue_evt.wait()
            # clear before draining - anything appended after this point
            # sets the event again and is picked up on the next pass
            queue_evt.clear()

            pushed = False
            while True:
                # only the pop is guarded so an IndexError raised inside a
                # plugins push() is not mistaken for an empty queue
                try:
                    rset = queue.popleft()
                except IndexError:
                    if not pushed:
                        msg = "Writer: {0}: queue empty"
                        self.log.debug(msg.format(self.name))
                    break
                # write it
                self.push(rset)
                pushed = True
                # once asked to stop do not keep draining - exit promptly
                if stop_evt.is_set():
                    break

        # all done, call onstop
        self.onstop()

    def stop(self):
        """Set our stop_evt and queue_evt Events."""
        # set the stop event and also the queue event so a run() thread
        # blocked waiting for a ResultSet wakes up and falls through.
        self.stop_evt.set()
        self.queue_evt.set()

    @abstractmethod
    def push(self, rset):
        """Writer plugins must define this method, it accepts a ResultSet.

        :param rset: ResultSet from a readers poll() call.
        :type rset: plumd.ResultSet
        """
        # this will never get called
        raise NotImplementedError("push() not defined")
1087