Completed
Branch master (7e8cc2)
by Kenny
03:21 queued 19s
created

PluginReader.write()   B

Complexity

Conditions 4

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 1
Metric Value
cc 4
dl 0
loc 27
rs 8.5806
c 1
b 1
f 1
1
# -*- coding: utf-8 -*-
2
"""Defines a class to read from a queue and write to a writer plugin.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import sys
13
PY3 = sys.version_info > (3,)
14
15
import time
16
if PY3:
17
    import queue as Queue
18
else:
19
    import Queue
20
import random
21
import threading
22
import traceback
23
24
import plumd
25
import plumd.util
26
27
28
class PluginReader(object):
    """Polls a Reader plugin on an interval and fans results out to writers.

    A background thread (see :meth:`start` / :meth:`read`) repeatedly calls
    ``pobj.poll()`` and puts each non-empty result set onto the ``queue`` of
    every object in ``self.writers``. The polling interval is read from the
    plugin's own configuration (``poll.interval``).

    :param log: A logger
    :type log: logging.get_logger()
    :param pobj: An instance of a Reader plugin.
    :type pobj: Reader
    :param pname: The configured name of the plugin.
    :type pname: str
    :raises: ConfigError, PluginLoadError
    """

    def __init__(self, log, pobj, pname):
        """Store the plugin, its name and logger; no thread is started here.

        :param log: A logger
        :type log: structlog.get_logger()
        :param pobj: An instance of a Reader plugin.
        :type pobj: Reader
        :param pname: The configured name of the plugin.
        :type pname: str
        :raises: ConfigError, PluginLoadError
        """
        self.log = log
        # poll() interval - taken from the plugin's configuration
        self.interval = pobj.config.get('poll.interval')
        self._name = pname
        self.pobj = pobj
        # writer objects whose queues receive every non-empty result set
        self.writers = []
        # thread that polls for metrics and writes to writers
        self.thread = None
        # set to request that the read loop exits
        self.stop_evt = threading.Event()

    @property
    def name(self):
        """Simply return our configured name.

        :rtype: str
        """
        return self._name

    def onstart(self):
        """Call the plugin's onstart function, logging any exception."""
        # plugins can raise pretty much any exception
        try:
            self.pobj.onstart()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: disabling reader {0}: exception in onstart: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))
            # NOTE(review): the message says "disabling", but nothing here
            # stops the reader - read() still enters its poll loop. This is
            # left as-is because exiting the thread early would race with
            # the liveness check in start(); confirm the intended behaviour.

    def poll(self):
        """Call the plugin's poll() and always return a result set.

        A ``None`` return from the plugin is treated as "nothing to report
        this cycle" and yields an empty result set without logging. Any
        other non-ResultSet value is logged as an error and replaced by an
        empty result set. If the plugin raises, the error is logged, the
        stop event is set (so the read loop exits) and an empty result set
        is returned.

        :rtype: plumd.ResultSet
        """
        # plugins can raise pretty much any exception, wrap in catch all try
        try:
            rset = self.pobj.poll()
        except Exception as e:
            # log a traceback and ensure the reader thread exits
            tb = traceback.format_exc()
            msg = "read: disabling reader {0} due to exception in poll: {1} : {2}"
            self.log.error(msg.format(self._name, e, tb))
            self.stop_evt.set()
            return plumd.ResultSet()

        # readers may legitimately have nothing to report on a given cycle
        if rset is None:
            return plumd.ResultSet()

        if not isinstance(rset, plumd.ResultSet):
            msg = "read: invalid result set from reader {0}: {1}"
            self.log.error(msg.format(self._name, rset))
            rset = plumd.ResultSet()
        return rset

    def write(self, rset):
        """Put a result set onto the queue of every registered writer.

        Result sets with no results are ignored. Each put may block for up
        to one second; if a writer's queue is still full after that, the
        result set is dropped for that writer and a warning is logged.

        :param rset: a result set from a plugin poll() call.
        :type rset: plumd.ResultSet
        """
        # only send result sets that have metrics
        if rset.nresults < 1:
            return

        for wobj in self.writers:
            try:
                # allow queue puts to block for a short time
                # this cannot block for long as it will also block shutdown
                wobj.queue.put(rset, block=True, timeout=1)
            except Queue.Full:
                msg = "read: {0}: dropping metrics: writer plugin queue full: {1}"
                self.log.warn(msg.format(self._name, wobj.name))

    def read(self):
        """Thread body: run onstart(), then poll and write until stopped.

        Before the first poll the thread waits a random number of whole
        seconds below the plugin's ``delay.poll`` setting so that readers do
        not all poll at the same moment. The wait is on the stop event, so a
        shutdown request ends it immediately.
        """
        self.onstart()

        # distribute metrics - don't everyone flood all at once.
        # randrange() raises ValueError for a bound < 1, so skip the delay
        # entirely when none (or zero) is configured.
        max_delay = int(self.pobj.config.get('delay.poll') or 0)
        if max_delay > 0:
            self.stop_evt.wait(random.randrange(max_delay))

        # loop on calling the plugin objects poll and sending to writers
        while not self.stop_evt.is_set():
            # loop every self.interval seconds
            with plumd.util.Interval(self.interval, self.stop_evt) as lint:
                if self.stop_evt.is_set():
                    break
                rset = self.poll()
                # skip falsy results, e.g. a reader that polls very often
                # and only periodically returns results
                if rset:
                    self.write(rset)
                # warn if loop time exceeded
                if lint.remaining < 0:
                    msg = "read: {0} reader loop time exceeded"
                    self.log.warn(msg.format(self._name))

    def start(self):
        """Start the thread that runs read().

        :raises: plumd.PluginRuntimeError if the thread is not alive
            immediately after being started.
        """
        self.thread = threading.Thread(target=self.read, group=None)
        self.thread.start()
        self.log.debug("Reader {0} thread started".format(self._name))
        if not self.thread.is_alive():
            err = "Reader {0} thread failed to start".format(self._name)
            raise plumd.PluginRuntimeError(err)

    def stop(self):
        """Signal the read thread to exit; does not join it or call onstop()."""
        # set the stop event and the self.read thread falls through.
        self.stop_evt.set()

    def onstop(self):
        """Call the plugin's onstop callable, logging any exception."""
        # this can raise pretty much any exception
        try:
            self.pobj.onstop()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: exception during onstop for reader {0}: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))

    def join(self, timeout):
        """Join the read thread, logging an error if it is still running.

        :param timeout: seconds to wait for the thread to finish.
        :type timeout: int or float
        """
        if self.thread:
            self.thread.join(timeout)
            # Thread.isAlive() was removed in Python 3.9; is_alive() works
            # on both Python 2 and 3.
            if self.thread.is_alive():
                msg = "read: plugin {0} shutdown timeout - blocking exit"
                self.log.error(msg.format(self._name))

    def __del__(self):
        """Logs __del__ calls."""
        # __del__ can run on a partially constructed instance; guard the
        # attribute lookups so finalisation never raises.
        log = getattr(self, 'log', None)
        if log is not None:
            log.debug("read: del: {0}".format(getattr(self, '_name', None)))
206