Completed
Push — master ( 08acac...d85769 )
by Kenny
01:15
created

Plugin.run()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 3
rs 10
cc 1
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distiction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metrics classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
The Render class is an abstract base class that writer plugins can subclass. The
38
render() function needs to be defined so that it returns a formatted metric. A
39
writer plugin would then instantiate the subclassed Render and use it to record
40
formatted metrics an each push() call. It can then step through the passed
41
metrics in chunks and send them to its backend. Having the metric types defined
42
explicitly allows the writers renderer to format metrics for arbitrary backends.
43
44
45
.. moduleauthor:: Kenny Freeman <[email protected]>
46
47
"""
48
49
import os
50
import time
51
import random
52
import logging
53
import threading
54
import collections
55
from collections import deque
56
from abc import ABCMeta, abstractmethod
57
58
from plumd.util import Interval, load_all_plugins, config_plugin_writers
59
60
__author__ = 'Kenny Freeman'
61
__email__ = '[email protected]'
62
__version__ = '0.7'
63
__license__ = "ISCL"
64
__docformat__ = 'reStructuredText'
65
66
67
DEFAULT_CONFIG_FILE = "/etc/plumd.yaml"    # path to main configuration file
68
69
DEFAULT_CONFIG = {
70
    'log.level':        "warn",               # crit, error, warn, info, debug
71
    'log.format':       "[%(asctime)s] %(levelname)s %(process)d %(message)s",
72
    'config.plugins':   "/etc/plumd.conf.d/",  # path to the plugin confs
73
    'delay.startup':    1,                    # random delay for plugin start
74
    'delay.poll':       1,                    # random delay for first poll()
75
    'poll.interval':    15,                   # interval of reader poll() calls
76
    'max.queue':        512,                  # maximum size of internal queues
77
    'meta':             [],                   # key=value's for all metrics
78
    'meta.hostname':    True,                 # set and use hostname in meta
79
    'shutdown_timeout': 10                    # thread shutdown timeout
80
}
81
82
83
LOG_LEVELS = {                                # map config level to python level
84
    'crit':  logging.CRITICAL,
85
    'error': logging.ERROR,
86
    'warn':  logging.WARN,
87
    'info':  logging.INFO,
88
    'debug': logging.DEBUG
89
}
90
91
92
class DuplicatePlugin(Exception):
93
    """Two or more plugins with the same id were defined."""
94
95
    pass
96
97
98
class PluginNotFoundError(Exception):
99
    """A plugin file, module or class was not found."""
100
101
    pass
102
103
104
class PluginLoadError(Exception):
105
    """Exception(s) were raised while loading a plugin."""
106
107
    pass
108
109
110
class PluginRuntimeError(Exception):
111
    """Exception(s) were raised while running a plugin."""
112
113
    pass
114
115
116
class ConfigError(Exception):
117
    """Invalid configuration encountered."""
118
119
    pass
120
121
122
# _Metric : the base class of all metric primitives is a named tuple.
123
_Metric = collections.namedtuple('metric', ['name', 'value'])
124
125
126
class Int(_Metric):
127
    """An integer metric - ensures the value passed is an int.
128
129
    raises:
130
        ValueError if the passed value doesn't cast properly to int
131
132
    :param name: The name of the metric
133
    :type name: str
134
    :param value: The recorded value of the metric
135
    :type value: int
136
    :raises: ValueError
137
    """
138
139
    def __new__(cls, name, value):
140
        """An integer metric - ensures the value passed is an int.
141
142
        raises:
143
            ValueError if the passed value doesn't cast properly to int
144
145
        :param name: The name of the metric
146
        :type name: str
147
        :param value: The recorded value of the metric
148
        :type value: int
149
        :raises: ValueError
150
        """
151
        value = int(value)
152
        cls = super(Int, cls).__new__(cls, name, value)
153
        return cls
154
155
156
class Float(_Metric):
157
    """A float metric.
158
159
    raises:
160
        ValueError if the passed value doesn't cast properly to float
161
162
    :param name: The name of the metric
163
    :type name: str
164
    :param value: The recorded value of the metric
165
    :type value: float
166
    :raises: ValueError
167
    """
168
169
    def __new__(cls, name, value):
170
        """A float metric.
171
172
        raises:
173
            ValueError if the passed value doesn't cast properly to float
174
175
        :param name: The name of the metric
176
        :type name: str
177
        :param value: The recorded value of the metric
178
        :type value: float
179
        :raises: ValueError
180
        """
181
        value = float(value)
182
        cls = super(Float, cls).__new__(cls, name, value)
183
        return cls
184
185
186
class String(_Metric):
187
    """A string metric.
188
189
    raises:
190
        ValueError if the passed value doesn't cast properly to str
191
192
    :param name: The name of the metric
193
    :type name: str
194
    :param value: The recorded value of the metric
195
    :type value: str
196
    :raises: ValueError
197
    """
198
199
    def __new__(cls, name, value):
200
        """A string metric.
201
202
        raises:
203
            ValueError if the passed value doesn't cast properly to str
204
205
        :param name: The name of the metric
206
        :type name: str
207
        :param value: The recorded value of the metric
208
        :type value: str
209
        :raises: ValueError
210
        """
211
        value = str(value)
212
        cls = super(String, cls).__new__(cls, name, value)
213
        return cls
214
215
216
class Boolean(_Metric):
217
    """A boolean metric.
218
219
    raises:
220
        ValueError if the passed value doesn't cast properly to bool
221
222
    :param name: The name of the metric
223
    :type name: str
224
    :param value: The recorded value of the metric
225
    :type value: bool
226
    :raises: ValueError
227
    """
228
229
    def __new__(cls, name, value):
230
        """A boolean metric.
231
232
        raises:
233
            ValueError if the passed value doesn't cast properly to bool
234
235
        :param name: The name of the metric
236
        :type name: str
237
        :param value: The recorded value of the metric
238
        :type value: bool
239
        :raises: ValueError
240
        """
241
        value = bool(value)
242
        cls = super(Boolean, cls).__new__(cls, name, value)
243
        return cls
244
245
246
class Gauge(Float):
247
    """A gauge value - this is always a Float value.
248
249
250
    raises:
251
        ValueError if the passed value doesn't cast properly to int
252
253
    :param name: The name of the metric
254
    :type name: str
255
    :param value: The recorded value of the metric
256
    :type value: float
257
    :raises: ValueError
258
    """
259
    pass
260
261
262
class Counter(Int):
263
    """A metric that counts things - this is always an Integer value.
264
265
266
    raises:
267
        ValueError if the passed value doesn't cast properly to int
268
269
    :param name: The name of the metric
270
    :type name: str
271
    :param value: The recorded value of the metric
272
    :type value: int
273
    :raises: ValueError
274
    """
275
    pass
276
277
278
class Rate(Float):
279
    """A metric that describes a rate - this is always a Float.
280
281
    raises:
282
        ValueError if the passed value doesn't cast properly to float
283
284
    :param name: The name of the metric
285
    :type name: str
286
    :param value: The recorded value of the metric
287
    :type value: float
288
    :raises: ValueError
289
    """
290
    pass
291
292
293
class Timer(Float):
294
    """A metric that describes a timer value - this is always a Float.
295
296
    raises:
297
        ValueError if the passed value doesn't cast properly to float
298
299
    :param name: The name of the metric
300
    :type name: str
301
    :param value: The recorded value of the metric
302
    :type value: float
303
    :raises: ValueError
304
    """
305
    pass
306
307
308
class Meta(object):
309
    """Encapsulate generic key=value metadata however ensure that the
310
    values recorded are one of the four supported base types of Int, Float,
311
    String and Boolean. Also ensures keys are unique.
312
313
    :param metas: Variable list of values.
314
    :type metas: Int or Float or String or Boolean
315
    :raises: ValueError if metas is not one of the expected types or if any of
316
        the keys already exist.
317
    """
318
319
    types = [Int, Float, String, Boolean]
320
321
    def __init__(self):
322
        """Encapsulate generic key=value metadata however ensure that the
323
        values recorded are one of the four supported base types of Int, Float,
324
        String and Boolean. Also ensures keys are unique.
325
326
        :param metas: Variable list of values.
327
        :type metas: Int or Float or String or Boolean
328
        :raises: ValueError if metas is not one of the expected types or if any
329
            of the keys already exist.
330
        """
331
        self._meta = collections.OrderedDict()
332
333
    def __str__(self):
334
        """Return a human readable str.
335
336
        :rtype: str
337
        """
338
        sval = "Meta(items={0})"
339
        return sval.format(sorted(self._meta.keys()))
340
341
    def __repr__(self):
342
        """Return a human readable str.
343
344
        :rtype: str
345
        """
346
        sval = "Meta(items={0})"
347
        return sval.format(sorted(self._meta.keys()))
348
349
    @property
350
    def nkeys(self):
351
        """Return the number of key=value pairs."""
352
        return len(list(self._meta.keys()))
353
354
    @property
355
    def keys(self):
356
        """Return a list of metadata keys.
357
358
        :rtype: :class:`list`
359
        """
360
        return self._meta.keys()
361
362
    @property
363
    def items(self):
364
        """Return a (by default) sorted list of (key, value) tuples.
365
366
        :rtype: :class:`list`
367
        """
368
        return self._meta.items()
369
370
    def add(self, val):
371
        """Record a value to our metadata.
372
373
        :param val: The value to add.
374
        :type val: Int or Float or String or Boolean
375
        :raises: ValueError if val is an unexpected type or the
376
            keys already exist.
377
        """
378
        if val.__class__ not in self.types:
379
            cname = val.__class__.__name__
380
            err = "Class not supported: class: {0} : value: {1}"
381
            raise ValueError(err.format(cname, val))
382
        # using indexes: 0 => name, 1 => value
383
        if val[0] not in self._meta:
384
            self._meta[val[0]] = val
385
            return
386
        raise ValueError("key exists: {0}".format(val[0]))
387
388
389
class Result(object):
390
    """Encapsulate a set of metrics and an optional Meta metadata object.
391
392
    :param name: The name of the result eg. cpu
393
    :type name: str
394
    :param metrics: optional list of metrics
395
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
396
        or Rate or Timer
397
    """
398
399
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]
400
401
    def __init__(self, name, metrics=None):
402
        """Encapsulate a set of metrics and an optional Meta metadata object.
403
404
        :param name: The name of the result eg. cpu
405
        :type name: str
406
        :param metrics: optional list of metrics
407
        :type metrics: Int or Float or String or Boolean or Gauge or Counter
408
            or Rate or Timer
409
        """
410
        self._name = name
411
        self._metrics = list()
412
        if metrics:
413
            for metric in metrics:
414
                mclass = metric.__class__
415
                if mclass not in self.types:
416
                    err = "Class not supported: class: {0} : metric: {1}"
417
                    raise ValueError(err.format(mclass.__name__, metric))
418
                self._metrics.append(metric)
419
        self._meta = Meta()
420
421
    def __str__(self):
422
        """Return a human readable str.
423
424
        :rtype: str
425
        """
426
        sval = "Result(name={0}, metrics={1})"
427
        return sval.format(self._name, self._metrics)
428
429
    def __repr__(self):
430
        """Return a human readable str.
431
432
        :rtype: str
433
        """
434
        sval = "Result(name={0}, metrics={1})"
435
        return sval.format(self._name, self._metrics)
436
437
    @property
438
    def name(self):
439
        """Return the Result name."""
440
        return self._name
441
442
    @property
443
    def metrics(self):
444
        """Return the Results list of metrics."""
445
        return self._metrics
446
447
    @property
448
    def meta(self):
449
        """Return the results metadata object."""
450
        return self._meta
451
452
    def add(self, metric):
453
        """Add a metric to our list of metrics.
454
455
        raises:
456
            ValueError if the metric is not a supported type
457
458
        :param metric: One of the metric types (Int, Float, String, etc)
459
        :type metric: Int or Float or String or Boolean or Counter or Gauge
460
        :raises: ValueError
461
        """
462
        mclass = metric.__class__
463
        if mclass not in self.types:
464
            err = "Class not supported: class: {0} : metric: {1}"
465
            raise ValueError(err.format(mclass.__name__, metric))
466
        self._metrics.append(metric)
467
468
    def add_list(self, metrics):
469
        """Add a list of metrics to our list of metrics.
470
471
        raises:
472
            ValueError if any metrics are not a supported type
473
474
        :param metrics: list of metrics (Int, Float, String, etc)
475
        :type metrics: list
476
        :raises: ValueError
477
        """
478
        for metric in metrics:
479
            mclass = metric.__class__
480
            if mclass not in self.types:
481
                err = "Class not supported: class: {0} : metric: {1}"
482
                raise ValueError(err.format(mclass.__name__, metric))
483
            self._metrics.append(metric)
484
485
486
class ResultSet(object):
487
    """A class to encapsulate a series of measurement Results during a plugins
488
    poll() call. Each poll() must return a ResultSet object.
489
490
    The class has a list of Result objects and a timestamp. The timestamp
491
    is set to the current UTC time when the object is created. This is done
492
    since each poll() call should normally complete within ms or us and the
493
    target audience for the recorded metrics are eg. grafana graphs and
494
    system alerts with resolutions down to 1 second typically. The difference
495
    in time between the first recorded metric and last is generally going to
496
    be much less than 1 second.
497
498
    :Example:
499
500
    >>>import plumd
501
    >>>metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
502
    >>>res = Result("cpu", metrics)
503
    >>>rset = ResultSet(results=[res])
504
505
    >>>metrics = [ plumd.Float("free", 80.0), plumd.Float("used", 20.0) ]
506
    >>>res = Result("disk", metrics)
507
    >>>res.meta.add(plumd.String("dev", "sda1"))
508
    >>>rset = ResultSet(results=[res])
509
    """
510
511
    def __init__(self, results=None):
512
        """Class to encapsulate a collection of metrics eg. from poll()."""
513
        self._results = list() if results is None else list(results)
514
        self._time = time.time()
515
516
    @property
517
    def time(self):
518
        """Return our timestamp.
519
520
        :rtype: float
521
        """
522
        return self._time
523
524
    @property
525
    def nresults(self):
526
        """Return the number of results recorded so far.
527
528
        :rtype: int
529
        """
530
        return len(self._results)
531
532
    @property
533
    def results(self):
534
        """Yield a tuple for each Result recorded in the format:
535
536
        ( time, result_name, result_meta, [metric, metric, metric] )
537
538
        :rtype: generator
539
        """
540
        for robj in self._results:
541
            yield (self._time, robj.name, robj.meta, robj.metrics)
542
543
    def __str__(self):
544
        """Return a human readable str.
545
546
        :rtype: str
547
        """
548
        sval = "ResultSet(results={0}, time={1})"
549
        return sval.format(len(self._results), self._time)
550
551
    def __repr__(self):
552
        """Return a human readable str.
553
554
        :rtype: str
555
        """
556
        sval = "ResultSet(results={0}, time={1})"
557
        return sval.format(len(self._results), self._time)
558
559
    def add(self, result):
560
        """Record a result in this ResultSet.
561
562
        :param result: a Result object
563
        :type results: Result
564
        """
565
        if not isinstance(result, Result):
566
            raise ValueError("Invalid result: {0}".format(result))
567
        self._results.append(result)
568
569
    def add_list(self, results):
570
        """Record a list of results in this ResultSet.
571
572
        :param results: a list of Result objects
573
        :type results: list
574
        """
575
        for result in results:
576
            if not isinstance(result, Result):
577
                raise ValueError("Invalid result: {0}".format(result))
578
            self._results.append(result)
579
580
581
class Render(object):
582
    """A helper class to render metrics, buffer them and return chunks of
583
    metrics. Used as a superclass for the various metric writers (eg.
584
    influxdb, graphite, etc).
585
586
    The idea is to feed this object a ResultSet on each call to a writer
587
    plugins push() and then consume chunks of metrics from it until a full
588
    chunk cannot be returned.
589
590
    To use this, a writer plugin subclasses Render and defines the process
591
    method.The plugins implementation of process() should format the
592
    metrics it gets passed in a format suitable for sending to its backend
593
    and store them in the metrics dequeue.
594
595
    The plugin can then create an instance of it's render object and in
596
    push() simply call instance.process(results) and instance.get_chunk()
597
    to get the next chunk of metrics.
598
599
    On shutdown it can also call instance.get_chunk(partial=True) to get
600
    the remaining metrics. Normally it would want to fetch full batches
601
    of metrics.
602
603
    :param rconfig: config.Conf object passed from a writer plugin instance
604
    :type rconfig: :class:`config.Conf`
605
    """
606
607
    __metaclass__ = ABCMeta
608
609
    defaults = {
610
        'max_queue': 8192,      # maxlen of deque used for metrics
611
        'max_items': 64,        # maximum number of items to send at once
612
        'max_bytes': 1400,      # maximum number of bytes to send at once
613
        'meta': {}              # metadata to add to each metric
614
    }
615
616
    def __init__(self, rconfig):
617
        """A helper class to render metrics, buffer them and return chunks of
618
        metrics. Used as a superclass for the various metric writers (eg.
619
        influxdb, graphite, etc).
620
621
        The idea is to feed this object a ResultSet on each call to a writer
622
        plugins push() and then consume chunks of metrics from it until a full
623
        chunk cannot be returned.
624
625
        To use this, a writer plugin subclasses Render and defines the process
626
        method.The plugins implementation of process() should format the
627
        metrics it gets passed in a format suitable for sending to its backend
628
        and store them in the metrics dequeue.
629
630
        The plugin can then create an instance of it's render object and in
631
        push() simply call instance.process(results) and instance.get_chunk()
632
        to get the next chunk of metrics.
633
634
        On shutdown it can also call instance.get_chunk(partial=True) to get
635
        the remaining metrics. Normally it would want to fetch full batches
636
        of metrics.
637
638
        :param rconfig: config.Conf object passed from a writer plugin instance
639
        :type rconfig: :class:`config.Conf`
640
        """
641
        # ensure default configuration is set
642
        rconfig.defaults(Render.defaults)
643
        self.max_bytes = rconfig.get('max_bytes')
644
        self.max_items = rconfig.get('max_items')
645
        # Use a double ended queue with a maximum size
646
        maxlen = rconfig.get('max_queue')
647
        self.metrics = collections.deque(maxlen=maxlen)
648
        # define a metadata object and copy in any configured host metadata
649
        self.meta = Meta()
650
        clookup = {
651
            int: Int,
652
            float: Float,
653
            str: String,
654
            bool: Boolean
655
        }
656
        for entry in rconfig.get('meta'):
657
            for key, val in entry.items():
658
                if val.__class__ not in clookup:
659
                    args = (key, val.__class__.__name__)
660
                    err = "host metadata: unsupported type: {0}={1}".format(
661
                        *args)
662
                    raise ValueError(err)
663
                self.meta.add(clookup[val.__class__](key, val))
664
665
    def __iter__(self):
666
        """A Render iterator returns rendered metrics where the metrics are
667
        each as close to self.max_bytes in size as possible.
668
669
        :rtype: iterator
670
        """
671
        return self
672
673
    def __next__(self):
674
        """Return the next metric from the deque, or raise StopIteration() if
675
        there are no more metrics.
676
677
        :rtype: object
678
        """
679
        if len(self.metrics) < 1:
680
            raise StopIteration()
681
        return self.metrics.popleft()
682
683
    def next(self):
684
        """Return the next metric from the deque, or raise StopIteration() if
685
        there are no more metrics.
686
687
        :rtype: object
688
        """
689
        if len(self.metrics) < 1:
690
            raise StopIteration()
691
        return self.metrics.popleft()
692
693
    @abstractmethod
694
    def process(self, rset):
695
        """Record a result set in our backends format. Subclasses for specific
696
        backends must define this method.
697
698
        :param rset: A :class:`ResultSet` object
699
        :type rset: :class:`ResultSet`
700
        """
701
        raise NotImplementedError("process must be defined by a subclass")
702
703
704
class PluginLoader(object):
705
    """Load and control the reader and writer objects defined by
706
    the provided config.
707
708
    raises:
709
        ConfigError if a plugin is configured with an invalid path
710
        ConfigError if a plugin configuration is missing 'name'
711
        DuplicatePlugin if a plugin configuration has a duplicate 'name'
712
        PluginLoadError if there was an error loading the plugin
713
714
    :param config: a simple configuration helper object.
715
    :type config: plumd.config.conf
716
    :param log: a logger object
717
    :type log: logger
718
    :raises: ConfigError, DuplicatePlugin, PluginLoadError
719
    """
720
721
    def __init__(self, log, config):
722
        """Load and control the reader and writer objects defined by
723
        the provided config.
724
725
        raises:
726
            ConfigError if a plugin is configured with an invalid path
727
            ConfigError if a plugin configuration is missing 'name'
728
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
729
            PluginLoadError if there was an error loading the plugin
730
731
        :param config: a simple configuration helper object.
732
        :type config: plumd.config.conf
733
        :param log: a logger object from structlog.get_logger()
734
        :type log: logger
735
        :raises: ConfigError, DuplicatePlugin, PluginLoadError
736
        """
737
        self.log = log
738
        msg = "initializing from: {0} conf: \n{1}"
739
        self.log.debug(msg.format(config.path, config))
740
        self.config = config
741
        self.readers = {}  # reader plugin io objects
742
        self.writers = {}  # writer plugin io objects
743
        self.load()
744
745
    def __str__(self):
746
        """Return a human readable str.
747
748
        :rtype: str
749
        """
750
        sval = "readers={0}, writers={1}".format(list(self.readers.keys()),
751
                                                 list(self.writers.keys()))
752
        return sval
753
754
    def __repr__(self):
755
        """Return a human readable str.
756
757
        :rtype: str
758
        """
759
        sval = "readers={0}, writers={1}".format(list(self.readers.keys()),
760
                                                 list(self.writers.keys()))
761
        return sval
762
763
    @property
764
    def nplugins(self):
765
        """Return the number of loaded plugins.
766
767
        :rtype: int
768
        """
769
        return len(list(self.readers.keys())) + len(list(self.writers.keys()))
770
771
    @property
772
    def nreaders(self):
773
        """Return the number of loaded reader plugins.
774
775
        :rtype: int
776
        """
777
        return len(list(self.readers.keys()))
778
779
    @property
780
    def nwriters(self):
781
        """Return the number of loaded writer plugins.
782
783
        :rtype: int
784
        """
785
        return len(list(self.writers.keys()))
786
787
    def load(self):
788
        """Load the configured plugin objects into self.readers and
789
        self.writers. Set the writers for each reader plugin.
790
791
        raises:
792
            ConfigError if ptype has an incorrect configuration path set
793
            ConfigError if a plugin configuration is missing 'name'
794
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
795
            PluginLoadError if there was an error loading a plugin
796
797
        :raises: ConfigError, DuplicatePlugin
798
        """
799
        # get plugin configuration directory
800
        pdir = self.config.get('config.plugins')
801
        if not os.path.isdir(pdir):
802
            msg = "invalid plugin directory configured: {0}"
803
            raise ConfigError(msg.format(pdir))
804
805
        # load all plugins described by the plugin configurations
806
        msg = "loading plugins from: {0}"
807
        self.log.info(msg.format(self.config.get('config.plugins')))
808
        pobjs = load_all_plugins(self.log, self.config)
809
810
        # load_all_plugins returns a tuple of reader, writer objects
811
        self.readers, self.writers = pobjs
812
813
        # now the readers need to know what writers to write to
814
        config_plugin_writers(self)
815
816
    def start(self):
817
        """Start each reader/writer."""
818
        self.log.debug("starting")
819
820
        # start the writers then the readers
821
        for pdict in [self.writers, self.readers]:
822
            for pobj in pdict.values():
823
                self.log.debug("starting: {0}".format(pobj.name))
824
                pobj.start()
825
826
        # all done
827
        self.log.debug("started")
828
829
    def stop(self):
830
        """Stop each reader/writer."""
831
        self.log.debug("stopping plugins")
832
833
        # stop the readers first - they call each plugins onstop()
834
        self.log.debug("stopping readers")
835
        for pobj in list(self.readers.values()):
836
            self.log.debug("stopping reader: {0}".format(pobj.name))
837
            pobj.stop()
838
839
        # next stop the writers - they call each plugins onstop()
840
        self.log.debug("stopping writers")
841
        for pobj in list(self.writers.values()):
842
            self.log.debug("stopping writer: {0}".format(pobj.name))
843
            pobj.stop()
844
845
        # join threads now
846
        for pdict in [self.readers, self.writers]:
847
            for pname, pobj in list(pdict.items()):
848
                self.log.debug("waiting for plugin to stop: {0}".format(pname))
849
                pobj.join(timeout=self.config.get('shutdown_timeout'))
850
851
        # all done
852
        self.log.debug("stopped")
853
854
    def __del__(self):
855
        """Log __del__ calls."""
856
        self.log.debug("PluginLoader: __del__")
857
858
859
class Plugin(object):
860
    """The base class for all plugins - each plugin must sub class.
861
862
    :param config: an instance of plumd.conf configuration helper
863
    :type config: conf
864
    """
865
866
    __metaclass__ = ABCMeta
867
    defaults = {}
868
869
    def __init__(self, log, config):
870
        """The base class for all plugins - each plugin must sub class.
871
872
        :param config: an instance of plumd.conf configuration helper
873
        :type config: conf
874
        """
875
        self.config = config
876
        self.log = log
877
        # plugins can set their own defaults byt defining self.defaults
878
        self.config.defaults(self.defaults)
879
        # all plugins must be given a unique name
880
        self.name = config.get('name', 'unknown')
881
        # thread for reading or writing
882
        self.thread = None
883
        self.stop_evt = threading.Event()
884
885
    def __str__(self):
886
        """Return a human readable str.
887
888
        :rtype: str
889
        """
890
        sval = "Plugin(class={0})".format(self.__class__.__name__)
891
        return sval
892
893
    def __repr__(self):
894
        """Return a human readable str.
895
896
        :rtype: str
897
        """
898
        sval = "Plugin(class={0})".format(self.__class__.__name__)
899
        return sval
900
901
    def start(self):
902
        """Start run() in a new thread if not already running."""
903
        if self.thread is None or not self.thread.is_alive():
904
            self.thread = threading.Thread(target=self.run, group=None)
905
            self.thread.start()
906
            self.log.debug("Plugin {0} started".format(self.name))
907
        else:
908
            self.log.debug("Plugin {0} already running".format(self.name))
909
910
    def run(self):
911
        """Plugin sub classes must overide this."""
912
        raise NotImplementedError("Invalid Plugin")
913
914
    def stop(self):
915
        """Set our stop_evt Event."""
916
        # set the stop event and the self.read thread falls through.
917
        self.stop_evt.set()
918
919
    def join(self, timeout):
920
        """Join our thread."""
921
        if self.thread:
922
            self.thread.join(timeout)
923
            if self.thread.is_alive():
924
                msg = "Plugin {0} shutdown timeout - thread blocking exit"
925
                self.log.error(msg.format(self.name))
926
927
    def onstart(self):
928
        """Call before the first call to push()."""
929
        self.log.debug("Plugin: onstart: {0}".format(self.config.get('name')))
930
931
    def onstop(self):
932
        """Call just before the main process exits."""
933
        self.log.debug("Plugin: onstop: {0}".format(self.config.get('name')))
934
935
    def __del__(self):
936
        """Log __del__ calls."""
937
        self.log.debug("Plugin: {0} __del__".format(self.name))
938
939
940
class Reader(Plugin):
941
    """An abstract base class that all writer pluguns subclass."""
942
943
    __metaclass__ = ABCMeta
944
945
    defaults = {
946
        'poll.interval': 30,    # seconds between polls
947
        'delay.poll': 5         # random delay before first poll
948
    }
949
950
    def __init__(self, log, config):
951
        """Reader plugin abstract class.
952
953
        :param log: A logger
954
        :type log: logging.RootLogger
955
        :param config: a plumd.config.Conf configuration helper instance.
956
        :type config: plumd.config.Conf
957
        """
958
        super(Reader, self).__init__(log, config)
959
        self.config.defaults(Reader.defaults)
960
        # list of queues to write to
961
        self.queues = []
962
963
    def run(self):
964
        """Call the plugins onstart() function and then iterate over the
965
        plugins poll() function, pushing the result to each writer queue.
966
967
        When complete it calls the plugins onstop() function.
968
        """
969
        # starting up, call onstart
970
        self.onstart()
971
972
        # distribute metrics - don't everyone flood all at once
973
        time.sleep(random.randrange(int(self.config.get('delay.poll'))))
974
975
        # save a few cycles
976
        tloop = self.config.get('poll.interval')
977
        evt = self.stop_evt
978
        queues = self.queues
979
980
        # loop on calling the plugin objects poll and sending to writers
981
        while not evt.is_set():
982
983
            # loop every self.interval seconds
984
            with Interval(tloop, evt):
985
986
                # poll for metric result set
987
                rset = self.poll()
988
989
                # readers can return None values eg. a reader that polls
990
                # very often and periodically returns results
991
                if rset is None:
992
                    continue
993
994
                # write the value to each of our writer queues directly
995
                for queue_evt, queue in queues:
996
                    queue.append(rset)
997
                    queue_evt.set()
998
999
        # all done, call onstop
1000
        self.onstop()
1001
1002
    @abstractmethod
1003
    def poll(self):
1004
        """Reader plugins must define this method, it measures and returns a
1005
        metrics string.
1006
1007
        format <name:str> <value:float|int> <timestamp:time.time()>\n[...]
1008
        set to change to a python object.
1009
1010
        :rtype: str
1011
        """
1012
        # this will never get called
1013
        raise NotImplementedError("poll() not defined")
1014
1015
1016
class Writer(Plugin):
1017
    """An abstract base class that all writer pluguns subclass."""
1018
1019
    __metaclass__ = ABCMeta
1020
1021
    defaults = {
1022
        'maxqueue': 8192
1023
    }
1024
1025
    def __init__(self, log, config):
1026
        """Reader plugin abstract class.
1027
1028
        :param log: A logger
1029
        :type log: logging.RootLogger
1030
        :param config: a plumd.config.Conf configuration helper instance.
1031
        :type config: plumd.config.Conf
1032
        """
1033
        super(Writer, self).__init__(log, config)
1034
        self.config.defaults(Writer.defaults)
1035
        # list of queues to write to
1036
        self.queue = deque()
1037
        # Event signaling items on queue
1038
        self.queue_evt = threading.Event()
1039
1040
    def run(self):
1041
        """Call the plugins onstart() function and then consume
1042
        ResultSets from self.queue, calling self.push() for each.
1043
1044
        When complete it calls the plugins onstop() function.
1045
        """
1046
        # starting up, call onstart
1047
        self.onstart()
1048
1049
        # save a few cycles
1050
        queue_evt = self.queue_evt
1051
        queue = self.queue
1052
1053
        # loop on popping from queue and calling self.push()
1054
        while not self.stop_evt.is_set():
1055
            # wait for a ResultSet
1056
            queue_evt.wait()
1057
            # clear the event - race condition however
1058
            # next reader poll() will set the queue_evt again
1059
            queue_evt.clear()
1060
            try:
1061
                # write it
1062
                self.push(queue.popleft())
1063
            except IndexError:
1064
                self.log.debug("Writer: {0}: queue empty".format(self.name))
1065
1066
        # all done, call onstop
1067
        self.onstop()
1068
1069
    def stop(self):
1070
        """Set our stop_evt and queue_evt Events."""
1071
        # set the stop event and the self.read thread falls through.
1072
        self.stop_evt.set()
1073
        self.queue_evt.set()
1074
1075
    @abstractmethod
1076
    def push(self, rset):
1077
        """Writer plugins must define this method, it accepts a ResultSet.
1078
1079
        :param rset: ResultSet from a readers poll() call.
1080
        :type rset: plumd.ResultSet
1081
        :type metrics: str
1082
        """
1083
        # this will never get called
1084
        raise NotImplementedError("push() not defined")
1085