Completed
Branch prom (d3fc9e)
by Kenny
01:21
created

WriterThread.run()   B

Complexity

Conditions 6

Size

Total Lines 33

Duplication

Lines 33
Ratio 100 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
c 4
b 0
f 0
dl 33
loc 33
rs 7.5384
cc 6
1
# -*- coding: utf-8 -*-
2
"""There are many differences in the various time series databases. Plumd is
3
designed to support multiple backends using the classes defined here to
4
represent metrics in a uniform way:
5
6
    ResultSet - a collection of one or more Results from a call to poll()
7
    Result - a measurement of a single thing eg. disk free=0.0,used=100.0
8
    Meta - metadata associated with a Result eg. dev=sda1
9
    Int/Float/String/Boolean/Gauge/Rate/Counter/Timer - primitives for above
10
11
The ResultSet class is used to encapsulate all of the measurements taken
12
during a plugins poll() call and uses a single timestamp for that poll.
13
14
The Result class is used to encapsulate a single measurement - for example
15
measuring cpu utilization may result in idle, user, system etc values. These
16
would be recorded in a single Result object. In addition the Result class
17
allows for arbitrary metadata to be associated with a Result - eg. when
18
recording disk space there may be used, free, total however also metadata such
19
as dev=sda1 to associate the measurements with a specific block device.
20
21
The Meta class is used to encapsulate arbitrary metadata. The Result class
22
uses it to associate metadata with a measurement however it can also be used
23
for example to describe host level metadata (eg. hostname=<hostname>). The
24
Meta class allows writer plugins such as graphite to render the metadata as
25
part of the metric path (eg. servers.host.<hostname>.disk.used.dev.<device>) -
26
if there was no distiction between metadata key=value and metric key=value this
27
would not be possible. Also, the Meta class ensures that metadata keys are
28
unique.
29
30
The _Metrics classes are used to allow plumd to render metrics in a specific
31
format. For example influxdb wants integer values to end with an i when sending
32
via their line protocol. The various _Metric classes force the plugin to
33
explicitly say what each metric is (eg. Int, Float, String, Boolean, Counter,
34
Gauge, Timer, etc). Metrics are defined as namedtuples for reduced memory
35
utilization.
36
37
38
.. moduleauthor:: Kenny Freeman <[email protected]>
39
40
"""
41
42
import time
43
import random
44
import logging
45
import threading
46
import collections
47
from collections import deque
48
from abc import ABCMeta, abstractmethod
49
50
from plumd.util import Interval
51
52
__author__ = 'Kenny Freeman'
53
__email__ = '[email protected]'
54
__version__ = '0.7'
55
__license__ = "ISCL"
56
__docformat__ = 'reStructuredText'
57
58
59
DEFAULT_SERVER = 'null'
60
61
DEFAULT_CONFIG_FILE = "/etc/plumd.yaml"    # path to main configuration file
62
63
DEFAULT_CONFIG = {
64
    'log.level':        "warn",               # crit, error, warn, info, debug
65
    'log.format':       "[%(asctime)s] %(levelname)s %(process)d %(message)s",
66
    'config.plugins':   "/etc/plumd.conf.d/",  # path to the plugin confs
67
    'delay.startup':    1,                    # random delay for plugin start
68
    'delay.poll':       1,                    # random delay for first poll()
69
    'poll.interval':    15,                   # interval of reader poll() calls
70
    'max.queue':        512,                  # maximum size of internal queues
71
    'meta':             [],                   # key=value's for all metrics
72
    'meta.hostname':    True,                 # set and use hostname in meta
73
    'shutdown_timeout': 10                    # thread shutdown timeout
74
}
75
76
77
LOG_LEVELS = {                                # map config level to python level
78
    'crit':  logging.CRITICAL,
79
    'error': logging.ERROR,
80
    'warn':  logging.WARN,
81
    'info':  logging.INFO,
82
    'debug': logging.DEBUG
83
}
84
85
# possible values: threaded, basic
86
#   - basic for prometheus/server mode
87
#   - threaded for standard mode (readers run in threads, loop on sleep())
88
ENV_READER_CLASS = "PLUMD_READER_CLASS"
89
ENV_WRITER_CLASS = "PLUMD_WRITER_CLASS"
90
91
92
class DuplicatePlugin(Exception):
93
    """Two or more plugins with the same id were defined."""
94
95
    pass
96
97
98
class PluginNotFoundError(Exception):
99
    """A plugin file, module or class was not found."""
100
101
    pass
102
103
104
class PluginLoadError(Exception):
105
    """Exception(s) were raised while loading a plugin."""
106
107
    pass
108
109
110
class PluginRuntimeError(Exception):
111
    """Exception(s) were raised while running a plugin."""
112
113
    pass
114
115
116
class ConfigError(Exception):
117
    """Invalid configuration encountered."""
118
119
    pass
120
121
122
# _Metric : the base class of all metric primitives is a named tuple.
123
_Metric = collections.namedtuple('metric', ['name', 'value'])
124
125
126
class Int(_Metric):
127
    """An integer metric - ensures the value passed is an int.
128
129
    raises:
130
        ValueError if the passed value doesn't cast properly to int
131
132
    :param name: The name of the metric
133
    :type name: str
134
    :param value: The recorded value of the metric
135
    :type value: int
136
    :raises: ValueError
137
    """
138
139
    def __new__(cls, name, value):
140
        value = int(value)
141
        cls = super(Int, cls).__new__(cls, name, value)
142
        return cls
143
144
145
class Float(_Metric):
146
    """A float metric.
147
148
    raises:
149
        ValueError if the passed value doesn't cast properly to float
150
151
    :param name: The name of the metric
152
    :type name: str
153
    :param value: The recorded value of the metric
154
    :type value: float
155
    :raises: ValueError
156
    """
157
158
    def __new__(cls, name, value):
159
        value = float(value)
160
        cls = super(Float, cls).__new__(cls, name, value)
161
        return cls
162
163
164
class String(_Metric):
165
    """A string metric.
166
167
    raises:
168
        ValueError if the passed value doesn't cast properly to str
169
170
    :param name: The name of the metric
171
    :type name: str
172
    :param value: The recorded value of the metric
173
    :type value: str
174
    :raises: ValueError
175
    """
176
177
    def __new__(cls, name, value):
178
        value = str(value)
179
        cls = super(String, cls).__new__(cls, name, value)
180
        return cls
181
182
183
class Boolean(_Metric):
184
    """A boolean metric.
185
186
    raises:
187
        ValueError if the passed value doesn't cast properly to bool
188
189
    :param name: The name of the metric
190
    :type name: str
191
    :param value: The recorded value of the metric
192
    :type value: bool
193
    :raises: ValueError
194
    """
195
196
    def __new__(cls, name, value):
197
        value = bool(value)
198
        cls = super(Boolean, cls).__new__(cls, name, value)
199
        return cls
200
201
202
class Gauge(Float):
203
    """A gauge value - this is always a Float value.
204
205
206
    raises:
207
        ValueError if the passed value doesn't cast properly to int
208
209
    :param name: The name of the metric
210
    :type name: str
211
    :param value: The recorded value of the metric
212
    :type value: float
213
    :raises: ValueError
214
    """
215
    pass
216
217
218
class Counter(Int):
219
    """A metric that counts things - this is always an Integer value.
220
221
222
    raises:
223
        ValueError if the passed value doesn't cast properly to int
224
225
    :param name: The name of the metric
226
    :type name: str
227
    :param value: The recorded value of the metric
228
    :type value: int
229
    :raises: ValueError
230
    """
231
    pass
232
233
234
class Rate(Float):
235
    """A metric that describes a rate - this is always a Float.
236
237
    raises:
238
        ValueError if the passed value doesn't cast properly to float
239
240
    :param name: The name of the metric
241
    :type name: str
242
    :param value: The recorded value of the metric
243
    :type value: float
244
    :raises: ValueError
245
    """
246
    pass
247
248
249
class Timer(Float):
250
    """A metric that describes a timer value - this is always a Float.
251
252
    raises:
253
        ValueError if the passed value doesn't cast properly to float
254
255
    :param name: The name of the metric
256
    :type name: str
257
    :param value: The recorded value of the metric
258
    :type value: float
259
    :raises: ValueError
260
    """
261
    pass
262
263
264
class Meta(object):
265
    """Encapsulate generic key=value metadata however ensure that the
266
    values recorded are one of the four supported base types of Int, Float,
267
    String and Boolean. Also ensures keys are unique.
268
269
    :param metas: Variable list of values.
270
    :type metas: Int or Float or String or Boolean
271
    :raises: ValueError if metas is not one of the expected types or if any of
272
        the keys already exist.
273
    """
274
275
    types = [Int, Float, String, Boolean]
276
277
    def __init__(self):
278
        """Encapsulate generic key=value metadata however ensure that the
279
        values recorded are one of the four supported base types of Int, Float,
280
        String and Boolean. Also ensures keys are unique.
281
282
        :param metas: Variable list of values.
283
        :type metas: Int or Float or String or Boolean
284
        :raises: ValueError if metas is not one of the expected types or if any
285
            of the keys already exist.
286
        """
287
        self._meta = collections.OrderedDict()
288
289
    def __str__(self):
290
        """Return a human readable str.
291
292
        :rtype: str
293
        """
294
        sval = "Meta(items={0})"
295
        return sval.format(sorted(self._meta.keys()))
296
297
    def __repr__(self):
298
        """Return a human readable str.
299
300
        :rtype: str
301
        """
302
        sval = "Meta(items={0})"
303
        return sval.format(sorted(self._meta.keys()))
304
305
    @property
306
    def nkeys(self):
307
        """Return the number of key=value pairs."""
308
        return len(list(self._meta.keys()))
309
310
    @property
311
    def keys(self):
312
        """Return a list of metadata keys.
313
314
        :rtype: :class:`list`
315
        """
316
        return self._meta.keys()
317
318
    @property
319
    def items(self):
320
        """Return a (by default) sorted list of (key, value) tuples.
321
322
        :rtype: :class:`list`
323
        """
324
        return self._meta.items()
325
326
    def add(self, val):
327
        """Record a value to our metadata.
328
329
        :param val: The value to add.
330
        :type val: Int or Float or String or Boolean
331
        :raises: ValueError if val is an unexpected type or the
332
            keys already exist.
333
        """
334
        if val.__class__ not in self.types:
335
            cname = val.__class__.__name__
336
            err = "Class not supported: class: {0} : value: {1}"
337
            raise ValueError(err.format(cname, val))
338
        # using indexes: 0 => name, 1 => value
339
        if val[0] not in self._meta:
340
            self._meta[val[0]] = val
341
            return
342
        raise ValueError("key exists: {0}".format(val[0]))
343
344
345
class Result(object):
346
    """Encapsulate a set of metrics and an optional Meta metadata object.
347
348
    :param name: The name of the result eg. cpu
349
    :type name: str
350
    :param metrics: optional list of metrics
351
    :type metrics: Int or Float or String or Boolean or Gauge or Counter
352
        or Rate or Timer
353
    """
354
355
    types = [Int, Float, String, Boolean, Gauge, Counter, Rate, Timer]
356
357
    def __init__(self, name, metrics=None):
358
        """Encapsulate a set of metrics and an optional Meta metadata object.
359
360
        :param name: The name of the result eg. cpu
361
        :type name: str
362
        :param metrics: optional list of metrics
363
        :type metrics: Int or Float or String or Boolean or Gauge or Counter
364
            or Rate or Timer
365
        """
366
        self._name = name
367
        self._metrics = list()
368
        if metrics:
369
            for metric in metrics:
370
                mclass = metric.__class__
371
                if mclass not in self.types:
372
                    err = "Class not supported: class: {0} : metric: {1}"
373
                    raise ValueError(err.format(mclass.__name__, metric))
374
                self._metrics.append(metric)
375
        self._meta = Meta()
376
377
    def __str__(self):
378
        """Return a human readable str.
379
380
        :rtype: str
381
        """
382
        sval = "Result(name={0}, metrics={1})"
383
        return sval.format(self._name, self._metrics)
384
385
    def __repr__(self):
386
        """Return a human readable str.
387
388
        :rtype: str
389
        """
390
        sval = "Result(name={0}, metrics={1})"
391
        return sval.format(self._name, self._metrics)
392
393
    @property
394
    def name(self):
395
        """Return the Result name."""
396
        return self._name
397
398
    @property
399
    def metrics(self):
400
        """Return the Results list of metrics."""
401
        return self._metrics
402
403
    @property
404
    def meta(self):
405
        """Return the results metadata object."""
406
        return self._meta
407
408
    def add(self, metric):
409
        """Add a metric to our list of metrics.
410
411
        raises:
412
            ValueError if the metric is not a supported type
413
414
        :param metric: One of the metric types (Int, Float, String, etc)
415
        :type metric: Int or Float or String or Boolean or Counter or Gauge
416
        :raises: ValueError
417
        """
418
        mclass = metric.__class__
419
        if mclass not in self.types:
420
            err = "Class not supported: class: {0} : metric: {1}"
421
            raise ValueError(err.format(mclass.__name__, metric))
422
        self._metrics.append(metric)
423
424
    def add_list(self, metrics):
425
        """Add a list of metrics to our list of metrics.
426
427
        raises:
428
            ValueError if any metrics are not a supported type
429
430
        :param metrics: list of metrics (Int, Float, String, etc)
431
        :type metrics: list
432
        :raises: ValueError
433
        """
434
        for metric in metrics:
435
            mclass = metric.__class__
436
            if mclass not in self.types:
437
                err = "Class not supported: class: {0} : metric: {1}"
438
                raise ValueError(err.format(mclass.__name__, metric))
439
            self._metrics.append(metric)
440
441
442
class ResultSet(object):
443
    """A class to encapsulate a series of measurement Results.
444
445
    This is the return value from each Reader plugin poll() call.
446
447
    The class has a list of Result objects and a timestamp. The timestamp
448
    is set to the current UTC time when the object is created. This is done
449
    since each poll() call should normally complete within ms or us and the
450
    target audience for the recorded metrics are eg. grafana graphs and
451
    system alerts with resolutions down to 1 second typically. The difference
452
    in time between the first recorded metric and last is generally going to
453
    be much less than 1 second.
454
455
    :Example:
456
457
    >>>import plumd
458
    >>>metrics = [ plumd.Float("idle", 0.0), plumd.Float("user", 100.0) ]
459
    >>>res = Result("cpu", metrics)
460
    >>>rset = ResultSet(results=[res])
461
462
    >>>metrics = [ plumd.Float("free", 80.0), plumd.Float("used", 20.0) ]
463
    >>>res = Result("disk", metrics)
464
    >>>res.meta.add(plumd.String("dev", "sda1"))
465
    >>>rset = ResultSet(results=[res])
466
    """
467
468
    def __init__(self, results=None):
469
        """Class to encapsulate a collection of metrics eg. from poll()."""
470
        self._results = list() if results is None else list(results)
471
        self._time = time.time()
472
473
    @property
474
    def time(self):
475
        """Return our timestamp.
476
477
        :rtype: float
478
        """
479
        return self._time
480
481
    @property
482
    def nresults(self):
483
        """Return the number of results recorded so far.
484
485
        :rtype: int
486
        """
487
        return len(self._results)
488
489
    @property
490
    def results(self):
491
        """Yield a tuple for each Result recorded.
492
493
        The tuple format is:
494
495
            ( time, result_name, result_meta, [metric, metric, metric] )
496
497
        :rtype: generator
498
        """
499
        for robj in self._results:
500
            yield (self._time, robj.name, robj.meta, robj.metrics)
501
502
    def __str__(self):
503
        """Return a human readable str.
504
505
        :rtype: str
506
        """
507
        sval = "ResultSet(results={0}, time={1})"
508
        return sval.format(len(self._results), self._time)
509
510
    def __repr__(self):
511
        """Return a human readable str.
512
513
        :rtype: str
514
        """
515
        sval = "ResultSet(results={0}, time={1})"
516
        return sval.format(len(self._results), self._time)
517
518
    def add(self, result):
519
        """Record a result in this ResultSet.
520
521
        :param result: a Result object
522
        :type results: Result
523
        """
524
        if not isinstance(result, Result):
525
            raise ValueError("Invalid result: {0}".format(result))
526
        self._results.append(result)
527
528
    def add_list(self, results):
529
        """Record a list of results in this ResultSet.
530
531
        :param results: a list of Result objects
532
        :type results: list
533
        """
534
        for result in results:
535
            if not isinstance(result, Result):
536
                raise ValueError("Invalid result: {0}".format(result))
537
            self._results.append(result)
538
539
540
class Plugin(object):
541
    """The base class for all Plugins."""
542
543
    __metaclass__ = ABCMeta
544
    defaults = {}
545
546
    def __init__(self, log, config):
547
        """Create a plugin.
548
549
        :param config: an instance of plumd.conf configuration helper
550
        :type config: conf
551
        """
552
        self.log = log
553
        self.config = config
554
        # plugins can set their own defaults byt defining self.defaults
555
        self.config.defaults(self.defaults)
556
        # all plugins must be given a unique name
557
        self.name = config.get('name', exception=True)
558
559
    def __str__(self):
560
        """Return a human readable str.
561
562
        :rtype: str
563
        """
564
        sval = "Plugin(class={0})".format(self.__class__.__name__)
565
        return sval
566
567
    def __repr__(self):
568
        """Return a human readable str.
569
570
        :rtype: str
571
        """
572
        sval = "Plugin(class={0})".format(self.__class__.__name__)
573
        return sval
574
575
    def onstart(self):
576
        """Call before the first call to push()."""
577
        self.log.debug("Plugin: onstart: {0}".format(self.config.get('name')))
578
579
    def onstop(self):
580
        """Call just before the main process exits."""
581
        self.log.debug("Plugin: onstop: {0}".format(self.config.get('name')))
582
583
    def __del__(self):
584
        """Log __del__ calls."""
585
        self.log.debug("Plugin: {0} __del__".format(self.name))
586
587
588
class Reader(Plugin):
589
    """An abstract base class that all reader pluguns must subclass."""
590
591
    __metaclass__ = ABCMeta
592
593
    defaults = {
594
        'poll.interval': 30,    # seconds between polls
595
        'delay.poll': 5         # random delay before first poll
596
    }
597
598
    def __init__(self, log, config):
599
        """Reader plugin base class.
600
601
        :param log: A logger
602
        :type log: logging.RootLogger
603
        :param config: a plumd.config.Conf configuration helper instance.
604
        :type config: plumd.config.Conf
605
        """
606
        super(Reader, self).__init__(log, config)
607
        self.config.defaults(Reader.defaults)
608
        self.interval = config.get("interval", exception=True)
609
610
    @abstractmethod
611
    def poll(self):
612
        """All Reader plugins must define this method, it returns a ResultSet.
613
614
        :rtype: plumd.ResultSet
615
        """
616
        # this will never get called
617
        raise NotImplementedError("poll() not defined")
618
619
620
class Render(Plugin):
621
    """Placeholder class to identify the various Render plugins."""
622
623
    pass
624
625
626
class Writer(Plugin):
627
    """An abstract base class that all writer pluguns must subclass."""
628
629
    __metaclass__ = ABCMeta
630
631
    defaults = {
632
        'maxqueue': 32
633
    }
634
635
    def __init__(self, log, config):
636
        """Reader plugin abstract class.
637
638
        :param log: A logger
639
        :type log: logging.RootLogger
640
        :param config: a plumd.config.Conf configuration helper instance.
641
        :type config: plumd.config.Conf
642
        """
643
        super(Writer, self).__init__(log, config)
644
        self.config.defaults(Writer.defaults)
645
646
    @abstractmethod
647
    def write(self, metrics):
648
        """Write the rendered metrics to our backend.
649
650
        What metrics is depends on the Render implementation.
651
652
        :param metrics: A rendered metric from our Render instance
653
        :type metrics: object
654
        """
655
        raise NotImplementedError("write must be defined by a subclass")
656
657
    @abstractmethod
658
    def flush(self, metrics):
659
        """Flush the rendered metrics to our backend.
660
661
        What metrics is depends on the Render implementation.
662
663
        This is used only during shutdown to send any remaining metrics
664
        from a Render plugin. Normally metrics are sent in chunks (depending
665
        on the Writer plugin) however during shutdown any partially chunked
666
        metrics need to be sent to the backend.
667
668
        Also, this function should return within a reasonable time (ie. it
669
        needs to have a timeout) as it is called during shutdown.
670
671
        :param metrics: A rendered metric from our Render instance
672
        :type metrics: object
673
        """
674
        raise NotImplementedError("write must be defined by a subclass")
675
676
677
class PluginThread(object):
678
    """Base class for Reader/Writer Thread objects."""
679
680
    __metaclass__ = ABCMeta
681
    defaults = {
682
        'thread.daemon': True
683
    }
684
685
    def __init__(self, log, config, plugin):
686
        """Base class for Reader/Writer Thread objects.
687
688
        :param log: A logger
689
        :type log: logging.RootLogger
690
        :param config: Thread configuration
691
        :type config: plumd.config.Conf
692
        :param plugin: A Reader/Writer plugin instance
693
        :type plugin: plumd.Plugin
694
        """
695
        self.log = log
696
        # plugins can set their own defaults byt defining self.defaults
697
        config.defaults(self.defaults)
698
        # thread for reading or writing
699
        self.thread = None
700
        self.stop_evt = threading.Event()
701
        self.plugin = plugin
702
        self.daemon = config.get("thread.daemon", exception=True)
703
704
    @abstractmethod
705
    def run(self):
706
        """Plugin sub classes must overide this."""
707
        raise NotImplementedError("Invalid ThreadedPlugin")
708
709
    def start(self):
710
        """Start run() in a new thread if not already running."""
711
        if self.thread is None or not self.thread.is_alive():
712
            self.thread = threading.Thread(target=self.run, group=None)
713
            if self.daemon:
714
                self.thread.daemon = True
715
            self.thread.start()
716
            self.log.info("Plugin {0} started".format(self.plugin.name))
717
        else:
718
            self.log.warn("Plugin {0} already running".format(self.plugin.name))
719
720
    def stop(self):
721
        """Set our stop_evt Event to force our thread to (eventually) return."""
722
        self.stop_evt.set()
723
724
    def join(self, timeout):
725
        """Stop and wait up to timeout seconds for our thread to return.
726
727
        :param timeout: Number of seconds to wait for our thread to return.
728
        :type timeout: int
729
        """
730
        self.stop()
731
        if self.thread:
732
            self.thread.join(timeout)
733
            if self.thread.is_alive():
734
                msg = "Plugin {0} shutdown timeout - thread blocking exit"
735
                self.log.error(msg.format(self.plugin.name))
736
737
738
class ReaderThread(PluginThread):
739
    """Threading required to push metrics from Reader=>Writer."""
740
741
    __metaclass__ = ABCMeta
742
743
    defaults = {
744
        # random delay between 0 and delay.poll s for first poll() call
745
        # this is to prevent stampeeding herd after restarting many instances
746
        "delay.poll": 5
747
    }
748
749
    def __init__(self, log, config, plugin):
750
        """Create a ReaderThread, do not start it.
751
752
        :param log: A logger
753
        :type log: logging.RootLogger
754
        :param config: configuration for this reader thread
755
        :type config: plumd.config.Conf
756
        :param plugin: a plumd.Reader plugin instance.
757
        :type plugin: plumd.Reader
758
        """
759
        super(ReaderThread, self).__init__(log, config, plugin)
760
        config.defaults(ReaderThread.defaults)
761
        self.start_delay = config.get("delay.poll", exception=True)
762
        self.renders = deque()
763
764
    def register(self, render):
765
        """Add the Render object.
766
767
        :param render: A Render to send metrics to.
768
        :type render: plumd.Render
769
        """
770
        self.renders.append(render)
771
772
    def deregister(self, render):
773
        """Remove the Render object.
774
775
        :param render: A Render to send metrics to.
776
        :type render: plumd.Render
777
        """
778
        try:
779
            self.renders.remove(render)
780
        except ValueError as exc:
781
            err = "attempt to remove non-existant render: {0}"
782
            self.log.error(err.format(exc))
783
784 View Code Duplication
    def run(self):
785
        """Start reading from the reader Plugin after calling its onstart.
786
787
        Call stop() to get this thread to exit - before returning it will
788
        call the readers onstop().
789
        """
790
        if not self.queue or not self.render:
791
            err = "output queue/render not set for reader: {0}"
792
            raise PluginRuntimeError(err.format(self.plugin.name))
793
794
        # starting up, call onstart
795
        self.plugin.onstart()
796
797
        # distribute metrics - don't everyone flood all at once
798
        time.sleep(random.randrange(int(self.start_delay)))
799
800
        # save a few cycles
801
        tloop = self.plugin.interval
802
        evt = self.stop_evt
803
        poll = self.plugin.poll
804
805
        # loop on calling the plugin objects poll and sending to writers
806
        while not evt.is_set():
807
808
            # loop every self.interval seconds
809
            with Interval(tloop, evt):
810
811
                # poll for metric result set
812
                rset = poll()
813
814
                # readers can return None values eg. a reader that polls
815
                # very often and periodically returns results
816
                if rset is None:
817
                    continue
818
819
                # write the value to each of our output queues directly
820
                for render in self.renders:
821
                    render.process(rset.results)
822
823
        # all done, call onstop
824
        self.plugin.onstop()
825
826
827
class WriterThread(PluginThread):
828
    """Threading required to isolate Readers from Writer backpressure."""
829
830
    __metaclass__ = ABCMeta
831
832
    defaults = {
833
        'maxqueue': 2048
834
    }
835
836
    def __init__(self, log, config, plugin):
837
        """Reader plugin abstract class.
838
839
        :param log: A logger
840
        :type log: logging.RootLogger
841
        :param config: a plumd.config.Conf configuration helper instance.
842
        :type config: plumd.config.Conf
843
        :param plugin: a plumd.Writer plugin instance.
844
        :type plugin: plumd.Writer
845
        """
846
        super(WriterThread, self).__init__(log, config, plugin)
847
        config.defaults(WriterThread.defaults)
848
        self.stop_evt = threading.Event()
849
        self.render = None
850
        self.queue = None
851
852
    def register(self, render):
853
        """Set the Render object.
854
855
        :param render: A Render to send metrics to.
856
        :type render: plumd.Render
857
        """
858
        if self.render:
859
            self.render.deregister(self.plugin.name)
860
        self.render = render
861
        self.queue = self.render.register(self.plugin.name)
862
863
    def deregister(self, render):
864
        """Un-Set the Render object.
865
866
        :param render: A Render instance.
867
        :type render: plumd.Render
868
        """
869
        if self.render:
870
            self.render.deregister(self.plugin.name)
871
            self.render = None
872
            self.queue = None
873
874
    def stop(self):
875
        """Set this threads stop event to force run to return."""
876
        self.stop_evt.set()
877
        # wakeup writer
878
        for i in range(0, 10):
879
            self.queue.put_nowait(None)
880
881 View Code Duplication
    def run(self):
882
        """Start writing metrics to self.writer from self.queue.
883
884
        First calls self.writer.onstart().
885
886
        Call stop() to get this thread to exit - before returning it will
887
        call the writers onstop().
888
        """
889
890
        if not self.queue or not self.render:
891
            err = "input queue/render not set for writer: {0}"
892
            raise PluginRuntimeError(err.format(self.plugin.name))
893
894
        # starting up, call onstart
895
        self.plugin.onstart()
896
897
        # save a few cycles
898
        stop_evt = self.stop_evt
899
        write = self.plugin.write
900
901
        # loop on popping from queue and calling self.push()
902
        while not stop_evt.is_set():
903
            entry = self.queue.get()
904
            if entry:
905
                write(entry)
906
907
        # flush any partially rendered metrics on shutdown
908
        partial = self.render.flush()
909
        if partial:
910
            self.plugin.flush(partial)
911
912
        # all done, call onstop
913
        self.plugin.onstop()