Completed
Push — master ( ee7b96...42a2db )
by Kenny
01:20
created

PluginReader.write()   B

Complexity

Conditions 4

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 1
Metric Value
cc 4
dl 0
loc 27
rs 8.5806
c 1
b 1
f 1
1
# -*- coding: utf-8 -*-
2
"""Defines a class to read from a queue and write to a writer plugin.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import time
13
import Queue
14
import random
15
import threading
16
import traceback
17
18
import plumd
19
import plumd.util
20
21
22
class PluginReader(object):
    """Polls a Reader plugin on an interval and fans results out to writers.

    A background thread (see :meth:`start`) runs :meth:`read`, which calls
    ``pobj.poll()`` once per interval and puts each result set onto the
    ``queue`` of every object in :attr:`writers`. The polling interval is
    not a constructor argument; it is read from the plugin configuration
    key ``poll.interval``.

    :param log: A logger exposing ``debug``, ``warning`` and ``error``.
    :type log: structlog.get_logger()
    :param pobj: An instance of a Reader plugin.
    :type pobj: Reader
    :param pname: The configured name of the plugin.
    :type pname: str
    :raises: whatever ``pobj.config.get('poll.interval')`` raises
        (presumably a plumd configuration error - confirm against the
        config class).
    """

    def __init__(self, log, pobj, pname):
        """Store the collaborators and read the poll interval.

        :param log: A logger exposing ``debug``, ``warning`` and ``error``.
        :type log: structlog.get_logger()
        :param pobj: An instance of a Reader plugin.
        :type pobj: Reader
        :param pname: The configured name of the plugin.
        :type pname: str
        :raises: whatever ``pobj.config.get('poll.interval')`` raises.
        """
        # plain attributes first so that __del__ can always log, even when
        # the configuration lookup below raises and leaves us half-built
        self.log = log
        self._name = pname
        self.pobj = pobj
        # what writers should we write metrics to?
        self.writers = []
        # thread that polls for metrics and writes to writers
        self.thread = None
        self.stop_evt = threading.Event()
        # poll() interval - get from plugins configuration
        self.interval = pobj.config.get('poll.interval')

    @property
    def name(self):
        """Simply return our configured name.

        :rtype: str
        """
        return self._name

    def onstart(self):
        """Call the plugins onstart function.

        Plugins can raise anything here. A failure is logged and the stop
        event is set so that :meth:`read` never starts polling a plugin
        that failed to initialise (the same policy :meth:`poll` applies).
        """
        try:
            self.pobj.onstart()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: disabling reader {0}: exception in onstart: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))
            # actually disable the reader, as the log message promises
            self.stop_evt.set()

    def poll(self):
        """Call the plugins poll() function and return its result set.

        Always returns a :class:`plumd.ResultSet`:

        * the plugin's own result when it is a ``plumd.ResultSet``;
        * a new empty ``plumd.ResultSet()`` when the plugin returned
          anything else (including ``None``) - this is logged as an error;
        * a new empty ``plumd.ResultSet()`` when the plugin raised - the
          traceback is logged and the stop event is set so the reader
          thread exits.

        :rtype: plumd.ResultSet
        """
        # plugins can raise pretty much any exception, wrap in catch all try
        try:
            rset = self.pobj.poll()
        except Exception as e:
            # log a traceback and ensure the reader thread exits
            tb = traceback.format_exc()
            msg = ("read: disabling reader {0} due to exception in poll: "
                   "{1} : {2}")
            self.log.error(msg.format(self._name, e, tb))
            self.stop_evt.set()
            return plumd.ResultSet()

        if not isinstance(rset, plumd.ResultSet):
            msg = "read: invalid result set from reader {0}: {1}"
            self.log.error(msg.format(self._name, rset))
            rset = plumd.ResultSet()
        return rset

    def write(self, rset):
        """Put a result set onto the input queue of every writer.

        Result sets with ``nresults < 1`` are ignored. Each ``put`` blocks
        for at most one second; when a writer's queue is still full after
        that, the result set is dropped for that writer only and a warning
        is logged.

        :param rset: a result set from a plugin poll() call.
        :type rset: plumd.ResultSet
        """
        # only send result sets that have metrics
        if rset.nresults < 1:
            return

        for wobj in self.writers:
            try:
                # allow queue puts to block for a short time
                # this cannot block for long as it will also block shutdown
                wobj.queue.put(rset, block=True, timeout=1)
            except Queue.Full:
                msg = ("read: {0}: dropping metrics: "
                       "writer plugin queue full: {1}")
                self.log.warning(msg.format(self._name, wobj.name))

    def read(self):
        """Thread body: onstart(), staggered start, poll loop, onstop().

        After calling :meth:`onstart` the reader waits a random number of
        whole seconds below the ``delay.poll`` configuration value so that
        readers do not all poll at the same moment, then polls once per
        ``self.interval`` until the stop event is set. :meth:`onstop` is
        always called on the way out, even if the loop raises.
        """
        self.onstart()
        try:
            # distribute metrics - don't everyone flood all at once.
            # randrange(0) raises ValueError, so skip the stagger for a
            # missing/zero delay; waiting on the stop event (instead of
            # time.sleep) lets stop() interrupt the start-up delay.
            delay = int(self.pobj.config.get('delay.poll') or 0)
            if delay > 0:
                self.stop_evt.wait(random.randrange(delay))

            # loop on calling the plugin objects poll and sending to writers
            while not self.stop_evt.is_set():
                # loop every self.interval seconds
                with plumd.util.Interval(self.interval,
                                         self.stop_evt) as lint:
                    if self.stop_evt.is_set():
                        break
                    rset = self.poll()
                    # skip falsy result sets; truthiness is defined by
                    # plumd.ResultSet (presumably empty sets are falsy -
                    # write() drops sets without results either way)
                    if rset:
                        self.write(rset)
                    # warn if loop time exceeded
                    if lint.remaining < 0:
                        msg = "read: {0} reader loop time exceeded"
                        self.log.warning(msg.format(self._name))
        finally:
            self.onstop()

    def start(self):
        """Start the thread that runs :meth:`read`."""
        self.thread = threading.Thread(target=self.read, group=None)
        self.thread.start()

    def stop(self):
        """Ask the reader thread to stop.

        Only sets the stop event; the thread running :meth:`read` falls
        out of its loop and calls :meth:`onstop` itself. Use :meth:`join`
        to wait for it.
        """
        self.stop_evt.set()

    def onstop(self):
        """Call the plugins onstop callable, logging any exception."""
        # this can raise pretty much any exception
        try:
            self.pobj.onstop()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: exception during onstop for reader {0}: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))

    def join(self, timeout):
        """Join the reader thread, logging if it is still running.

        :param timeout: seconds to wait, passed to ``Thread.join``.
        :type timeout: float
        """
        if self.thread:
            self.thread.join(timeout)
            # is_alive() is the portable spelling; isAlive() was removed
            # in Python 3.9
            if self.thread.is_alive():
                msg = "read: plugin {0} shutdown timeout - blocking exit"
                self.log.error(msg.format(self._name))

    def __del__(self):
        """Logs __del__ calls."""
        # guard against a partially constructed instance
        log = getattr(self, "log", None)
        if log is not None:
            log.debug("read: del: {0}".format(getattr(self, "_name", None)))
197