Completed
Push — master ( d058c9...ca4b9c )
by Kenny
45s
created

plumd.plugins.PluginReader.write()   B

Complexity

Conditions 5

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 27
rs 8.0894
1
# -*- coding: utf-8 -*-
2
"""Defines a class to read from a queue and write to a writer plugin.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import time
13
import Queue
14
import random
15
import threading
16
import traceback
17
18
import structlog # pip install structlog
19
20
import plumd
21
import plumd.util
22
23
24
class PluginReader(object):
25
    """Reads from a plugin instance by calling pobj.poll() and writes
26
    to a list of PluginWriters by calling <writerobj>.queue.put() - loops
27
    on a configured interval.
28
29
    :param log: A logger returned from structlog.get_logger()
30
    :type log: structlog.get_logger()
31
    :param pobj: An instance of a Reader plugin.
32
    :type pobj: Reader
33
    :param pname: The configured name of the plugin.
34
    :type pname: str
35
    :param interval: The conigured polling interval for thep plugin.
36
    :type interval: int
37
    :raises: ConfigError, PluginLoadError
38
    """
39
40
41
    def __init__(self, log, pobj, pname):
42
        """Reads from a plugin instance by calling pobj.poll() and writes
43
        to a list of PluginWriters by calling <writerobj>.queue.put() - loops
44
        on a configured interval.
45
46
        :param log: A logger returned from structlog.get_logger()
47
        :type log: structlog.get_logger()
48
        :param pobj: An instance of a Reader plugin.
49
        :type pobj: Reader
50
        :param pname: The configured name of the plugin.
51
        :type pname: str
52
        :raises: ConfigError, PluginLoadError
53
        """
54
        self.log = log
55
        # poll() interval - get from plugins configuration
56
        self.interval = pobj.config.get('poll.interval')
57
        self._name = pname
58
        self.pobj = pobj
59
        # what writers should we write metrics to?
60
        self.writers = []
61
        # thread that polls for metrics and writes to writers
62
        self.thread = None
63
        self.stop_evt = threading.Event()
64
65
66
    @property
67
    def name(self):
68
        """Simply return our configured name.
69
        :rtype: str
70
        """
71
        return self._name
72
73
74
    def onstart(self):
75
        """Call the plugins onstart function."""
76
        self.log.debug("read", onstart="calling")
77
        try:
78
            self.pobj.onstart()
79
        except excepton as e:
80
            tb = traceback.format_exc()
81
            msg = "disabling plugin due to exception during onstart()"
82
            self.log.error("start", onstart="exception", exception=e,
83
                           msg=msg, trace=tb)
84
            return
85
        self.log.debug("start", onstart="complete")
86
87
88
    def poll(self):
89
        """Calls the plugins poll() function and either returns a valid
90
        :class:`plumd.ResultSet` or None.
91
92
        :rtype: plumd.ResultSet
93
        """
94
        rset = None
95
        # plugins can raise pretty much any exception, wrap in catch all try
96
        try:
97
            rset = self.pobj.poll()
98
            # ensure the result set is valid, if not set it to an empty one
99
            if rset is None or not isinstance(rset, plumd.ResultSet):
100
                self.log.error("read", poll="invalid", results=rset)
101
                rset = plumd.ResultSet()
102
        except Exception as e:
103
            # print a traceback and ensure the reader thread exits
104
            tb = traceback.format_exc()
105
            msg="disabling read plugin due to exception during poll()"
106
            self.log.error("read", poll="exception", msg=msg,
107
                           exception=e, trace=tb)
108
            self.stop_evt.set()
109
            rset = plumd.ResultSet()
110
        return rset
111
112
113
    def write(self, rset, lint):
114
        """Iterates through each writer plugin and writes the
115
        :class:`plumd.ResultSet` to the writers input queue. Uses a
116
        :class:`plumd.utils.Interval` to determine if it will block when
117
        pushing to a writers queue. If there is time remaining in the current
118
        poll interval it will block. Also, if a writers queue is full it will
119
        drop the metric for that writer. Future versions may use persistance
120
        to queue result sets to disk if the backend metrics services are down.
121
122
        :param rset: a result set from a plugin poll() call.
123
        :type rset: plumd.ResultSet
124
        :param lint: an interval helper.
125
        :type lint: plumd.util.Interval
126
        """
127
        # only send result sets that have metrics
128
        if rset.nresults < 1:
129
            return
130
131
        # enqueue onto each writers queue, wait for any remaining time
132
        for wobj in self.writers:
133
            try:
134
                to = lint.remaining
135
                block = True if to > 0 else False
136
                wobj.queue.put(rset, block=block, timeout=to)
137
            except Queue.Full as e:
138
                self.log.warn("read", action="drop", reason="queue full",
139
                              writer=wobj.name)
140
141
142
    def read(self):
143
        """Calls the plugins onstart() function and then iterates over the
144
        plugins poll() function, pushing the result to each writer queue."""
145
        self.onstart()
146
147
        ## distribute metrics - don't everyone flood all at once
148
        self.log.debug("read", poll="start", interval=self.interval)
149
        time.sleep(random.randrange(int(self.pobj.config.get('delay.poll'))))
150
151
        # loop on calling the plugin objects poll and sending to writers
152
        while not self.stop_evt.is_set():
153
            # loop every self.interval seconds
154
            with plumd.util.Interval(self.interval, self.stop_evt) as lint:
155
                rset = self.poll()
156
                self.write(rset, lint)
157
                # warn if loop time exceeded
158
                if lint.remaining < 0:
159
                    self.log.warn("loop exceeded")
160
        self.log.debug("read", poll="stop")
161
        self.onstop()
162
163
164
    def start(self):
165
        """Calls pobj.onstart() and starts a thread to poll() metrics."""
166
        self.log.debug("start", read="starting")
167
        self.thread = threading.Thread(target=self.read, group=None)
168
        self.thread.start()
169
        self.log.debug("start", read="started")
170
171
172
    def stop(self):
173
        """Stops the poll() thread and calls pobj.onstop()."""
174
        self.log.debug("stopping")
175
        # set the stop event and the self.read thread falls through.
176
        self.stop_evt.set()
177
178
179
    def onstop(self):
180
        """Calls the plugins onstop callable."""
181
        self.log.debug("onstop", onstop="calling")
182
        # this can raise pretty much any exception
183
        try:
184
            self.pobj.onstop()
185
        except Exception as e:
186
            tb = traceback.format_exc()
187
            self.log.error("onstop", onstop="exception", excepton=e, trace=tb)
188
        self.log.debug("onstop", onstop="complete")
189
190
191
    def join(self):
192
        """Joins our thread."""
193
        self.log.debug("join", action="joining")
194
        if self.thread:
195
            self.thread.join()
196
        else:
197
            self.log.debug("join", msg="not running")
198
        self.log.debug("join", action="complete")
199
200
201
    def __del__(self):
202
        """Logs __del__ calls."""
203
        self.log.debug("del")
204