Completed
Push — master ( 3279ec...8d12dc )
by Kenny
01:09
created

PluginReader.poll()   B

Complexity

Conditions 4

Size

Total Lines 23

Duplication

Lines 23
Ratio 100 %

Importance

Changes 1
Bugs 1 Features 1
Metric Value
cc 4
dl 23
loc 23
rs 8.7972
c 1
b 1
f 1
1
# -*- coding: utf-8 -*-
2
"""Defines a class that polls a reader plugin and writes its results to writer plugin queues.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import time
13
import Queue
14
import random
15
import threading
16
import traceback
17
18
import plumd
19
import plumd.util
20
21
22
class PluginReader(object):
    """Polls a Reader plugin on an interval and forwards results to writers.

    A background thread (see :meth:`start`) repeatedly calls ``pobj.poll()``
    and puts every non-empty result set onto ``<writerobj>.queue`` for each
    object in ``self.writers``. The polling interval is not a constructor
    argument: it is read from the plugin's own configuration
    (``pobj.config.get('poll.interval')``).

    :param log: A logger
    :type log: structlog.get_logger()
    :param pobj: An instance of a Reader plugin.
    :type pobj: Reader
    :param pname: The configured name of the plugin.
    :type pname: str
    """

    def __init__(self, log, pobj, pname):
        """Store the plugin, its name and logger, and prepare thread state.

        :param log: A logger
        :type log: structlog.get_logger()
        :param pobj: An instance of a Reader plugin.
        :type pobj: Reader
        :param pname: The configured name of the plugin.
        :type pname: str
        :raises: ConfigError, PluginLoadError (as documented by the original
            author; nothing in this class raises them directly)
        """
        # plain attributes first so that __del__ and the logging helpers can
        # rely on them even if the configuration lookup below raises
        self.log = log
        self._name = pname
        self.pobj = pobj
        # what writers should we write metrics to?
        self.writers = []
        # thread that polls for metrics and writes to writers
        self.thread = None
        self.stop_evt = threading.Event()
        # poll() interval - get from plugins configuration
        self.interval = pobj.config.get('poll.interval')

    @property
    def name(self):
        """Simply return our configured name.

        :rtype: str
        """
        return self._name

    def onstart(self):
        """Call the plugins onstart function.

        Any exception raised by the plugin is logged with a traceback and the
        reader is disabled by setting the stop event, so the polling loop in
        :meth:`read` is skipped (same handling as a failure in :meth:`poll`).
        """
        try:
            self.pobj.onstart()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: disabling reader {0}: exception in onstart: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))
            # actually disable the reader, as the log message states
            self.stop_evt.set()

    def poll(self):
        """Calls the plugins poll() function and always returns a
        :class:`plumd.ResultSet`.

        If the plugin returns anything that is not a ResultSet an error is
        logged and an empty ResultSet is returned. If the plugin raises, the
        error is logged, the stop event is set (disabling this reader) and an
        empty ResultSet is returned.

        :rtype: plumd.ResultSet
        """
        rset = None
        # plugins can raise pretty much any exception, wrap in catch all try
        try:
            rset = self.pobj.poll()
            # ensure the result set is valid, if not set it to an empty one
            # (isinstance() is already False for None)
            if not isinstance(rset, plumd.ResultSet):
                msg = "read: invalid result set from reader {0}: {1}"
                self.log.error(msg.format(self._name, rset))
                rset = plumd.ResultSet()
        except Exception as e:
            # print a traceback and ensure the reader thread exits
            tb = traceback.format_exc()
            msg = "read: disabling reader {0} due to exception in poll: {1} : {2}"
            self.log.error(msg.format(self._name, e, tb))
            self.stop_evt.set()
            rset = plumd.ResultSet()
        return rset

    def write(self, rset, lint):
        """Puts the :class:`plumd.ResultSet` onto each writers input queue.

        Result sets with no results are ignored. Each put blocks for at most
        one second; if a writers queue is still full after that, the result
        set is dropped for that writer and a warning is logged.

        :param rset: a result set from a plugin poll() call.
        :type rset: plumd.ResultSet
        :param lint: an interval helper; accepted for interface compatibility
            but currently unused - the put timeout is fixed.
        :type lint: plumd.util.Interval
        """
        # only send result sets that have metrics
        if rset.nresults < 1:
            return

        for wobj in self.writers:
            try:
                # allow queue puts to block for a short time
                # this cannot block for long as it will also block shutdown
                wobj.queue.put(rset, block=True, timeout=1)
            except Queue.Full:
                msg = "read: {0}: dropping metrics: writer plugin queue full: {1}"
                self.log.warn(msg.format(self._name, wobj.name))

    def read(self):
        """Thread body: onstart(), a random startup delay, then the poll loop.

        Calls the plugins onstart() function, waits a random number of seconds
        below the configured ``delay.poll`` and then loops - once per
        ``self.interval`` - calling poll() and pushing the result to each
        writer queue until the stop event is set. Finally calls onstop().
        """
        self.onstart()

        # distribute metrics - don't everyone flood all at once.
        # A missing or zero delay means "no delay" (randrange(0) would raise),
        # and waiting on the stop event instead of sleeping lets stop()
        # interrupt the startup delay.
        max_delay = int(self.pobj.config.get('delay.poll') or 0)
        if max_delay > 0:
            self.stop_evt.wait(random.randrange(max_delay))

        # loop on calling the plugin objects poll and sending to writers
        while not self.stop_evt.is_set():
            # loop every self.interval seconds
            with plumd.util.Interval(self.interval, self.stop_evt) as lint:
                if self.stop_evt.is_set():
                    break
                rset = self.poll()
                self.write(rset, lint)
                # warn if loop time exceeded
                if lint.remaining < 0:
                    msg = "read: {0} reader loop time exceeded"
                    self.log.warn(msg.format(self._name))
        self.onstop()

    def start(self):
        """Starts a thread running :meth:`read`, which calls pobj.onstart()
        and then polls for metrics."""
        self.thread = threading.Thread(target=self.read, group=None)
        self.thread.start()

    def stop(self):
        """Signals the poll() thread to stop.

        Only sets the stop event; the thread running :meth:`read` then falls
        out of its loop and calls onstop() itself.
        """
        self.stop_evt.set()

    def onstop(self):
        """Calls the plugins onstop callable, logging any exception."""
        # this can raise pretty much any exception
        try:
            self.pobj.onstop()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: exception during onstop for reader {0}: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))

    def join(self, timeout=None):
        """Joins our thread, if one was started.

        :param timeout: maximum number of seconds to wait, None waits forever.
        :type timeout: float or None
        """
        if self.thread:
            self.thread.join(timeout)

    def __del__(self):
        """Logs __del__ calls."""
        # attributes may be missing if __init__ did not complete
        log = getattr(self, 'log', None)
        if log is not None:
            log.debug("read: del: {0}".format(getattr(self, '_name', None)))
192