Completed
Push — master ( 3a6f3a...76f5e2 )
by Kenny
01:26
created

Writer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 14
rs 9.4285
1
# -*- coding: utf-8 -*-
2
"""Abstract classes that define the reader and writer plugin base classes.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import time
13
import random
14
import threading
15
from collections import deque
16
from abc import ABCMeta, abstractmethod
17
18
from plumd import NotImplementedError
19
from plumd.util import Interval
20
21
22
class Plugin(object):
    """The base class for all plugins.

    Holds the state shared by reader and writer plugins: the logger, the
    configuration helper, the plugin name, the worker thread and the event
    used to ask that thread to stop. ``start()`` runs ``self.run`` in a
    thread; ``run()`` is not defined here and must come from a subclass.

    :param log: a logger
    :type log: logging.RootLogger
    :param config: an instance of plumd.conf configuration helper
    :type config: conf
    """
    __metaclass__ = ABCMeta
    defaults = {}

    def __init__(self, log, config):
        """Plugin abstract class constructor - each plugin must sub class.

        :param log: a logger
        :type log: logging.RootLogger
        :param config: an instance of plumd.conf configuration helper
        :type config: conf
        """
        self.config = config
        self.log = log
        # plugins can set their own defaults by defining self.defaults
        self.config.defaults(self.defaults)
        # all plugins must be given a unique name
        self.name = config.get('name', 'unknown')
        # thread for reading or writing
        self.thread = None
        # set by stop() to ask the run() loop to finish
        self.stop_evt = threading.Event()

    def __str__(self):
        """Return a human readable str.

        :rtype: str
        """
        # {0} is the only valid index: format() receives one argument
        return "Plugin(class={0})".format(self.__class__.__name__)

    def __repr__(self):
        """Return a human readable str.

        :rtype: str
        """
        # {0} is the only valid index: format() receives one argument
        return "Plugin(class={0})".format(self.__class__.__name__)

    def start(self):
        """Start run() in a new thread if not already running."""
        if self.thread is None or not self.thread.is_alive():
            self.thread = threading.Thread(target=self.run, group=None)
            self.thread.start()
            self.log.debug("Plugin {0} started".format(self.name))
        else:
            self.log.debug("Plugin {0} already running".format(self.name))

    def stop(self):
        """Set our stop_evt Event."""
        # set the stop event and the run() thread falls through.
        self.stop_evt.set()

    def join(self, timeout):
        """Join our thread, logging an error if it is still alive afterwards.

        :param timeout: seconds to wait for the thread to finish
        :type timeout: float
        """
        if self.thread:
            self.thread.join(timeout)
            if self.thread.is_alive():
                msg = "Plugin {0} shutdown timeout - thread blocking exit"
                self.log.error(msg.format(self.name))

    def onstart(self):
        """onstart() is called before the first call to push()."""
        self.log.debug("Plugin: onstart: {0}".format(self.config.get('name')))

    def onstop(self):
        """onstop() is called just before the main process exits."""
        self.log.debug("Plugin: onstop: {0}".format(self.config.get('name')))

    def onreload(self):
        """onreload() is called to reload configuration at runtime.

        :raises NotImplementedError: always - reloading is not implemented
        """
        raise NotImplementedError("todo")

    def __del__(self):
        """Logs __del__ calls."""
        # __del__ also runs for instances whose __init__ failed part-way,
        # so neither attribute is guaranteed to exist here.
        log = getattr(self, 'log', None)
        if log is not None:
            name = getattr(self, 'name', 'unknown')
            log.debug("Plugin: {0} __del__".format(name))
105
106
107
class Reader(Plugin):
    """An abstract base class that all reader plugins subclass."""
    __metaclass__ = ABCMeta

    defaults = {
        'poll.interval': 30,    # seconds between polls
        'delay.poll': 5         # exclusive upper bound, in seconds, of the
                                # random delay before the first poll
    }

    def __init__(self, log, config):
        """Reader plugin abstract class.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Reader, self).__init__(log, config)
        self.config.defaults(Reader.defaults)
        # list of (event, queue) pairs to write each poll() result to
        self.queues = []

    def run(self):
        """Calls the plugins onstart() function and then iterates over the
        plugins poll() function, pushing the result to each writer queue.

        When complete it calls the plugins onstop() function."""
        # starting up, call onstart
        self.onstart()

        # save a few cycles
        tloop = self.config.get('poll.interval')
        evt = self.stop_evt
        queues = self.queues

        # distribute metrics - don't everyone flood all at once.
        # randrange() rejects a bound below 1, so a delay of 0 (or less)
        # means "no delay". Waiting on the stop event instead of sleeping
        # lets stop() interrupt the startup delay.
        delay = int(self.config.get('delay.poll'))
        if delay > 0:
            evt.wait(random.randrange(delay))

        # loop on calling the plugin objects poll and sending to writers
        while not evt.is_set():

            # NOTE(review): Interval comes from plumd.util; presumably it
            # paces each iteration to tloop seconds - confirm.
            with Interval(tloop, evt):

                # poll for metric result set
                rset = self.poll()

                # readers can return None values eg. a reader that polls
                # very often and periodically returns results
                if rset is None:
                    continue

                # write the value to each of our writer queues directly
                # and signal each queue's event that data is available
                for queue_evt, queue in queues:
                    queue.append(rset)
                    queue_evt.set()

        # all done, call onstop
        self.onstop()

    @abstractmethod
    def poll(self):
        """Reader plugins must define this method, it measures and returns a
        metrics string.

        format <name:str> <value:float|int> <timestamp:time.time()>\n[...]
        set to change to a python object.

        :rtype: str
        """
        # this will never get called
        raise NotImplementedError("poll() not defined")
181
182
183
class Writer(Plugin):
    """An abstract base class that all writer plugins subclass."""
    __metaclass__ = ABCMeta

    # NOTE(review): 'maxqueue' is not applied to the deque created in
    # __init__ - confirm whether the limit is enforced elsewhere.
    defaults = {
        'maxqueue': 8192
    }

    def __init__(self, log, config):
        """Writer plugin abstract class.

        :param log: A logger
        :type log: logging.RootLogger
        :param config: a plumd.config.Conf configuration helper instance.
        :type config: plumd.config.Conf
        """
        super(Writer, self).__init__(log, config)
        self.config.defaults(Writer.defaults)
        # queue of ResultSets waiting to be pushed
        self.queue = deque()
        # Event signaling items on queue
        self.queue_evt = threading.Event()

    def run(self):
        """Calls the plugins onstart() function and then consumes
        ResultSets from self.queue, calling self.push() for each.

        Every wakeup drains the whole queue, so results that were queued
        while a previous push() was in progress are not left behind.

        When complete it calls the plugins onstop() function."""
        # starting up, call onstart
        self.onstart()

        # save a few cycles
        stop_evt = self.stop_evt
        queue_evt = self.queue_evt
        queue = self.queue

        # loop on popping from queue and calling self.push()
        while not stop_evt.is_set():
            # wait for a ResultSet (stop() also sets this event)
            queue_evt.wait()
            # clear before draining: anything appended after this point
            # either gets drained below or sets the event again, so no
            # ResultSet is missed
            queue_evt.clear()
            while True:
                # only popleft() is guarded, so an IndexError raised
                # inside a plugin's push() is not mistaken for an
                # empty queue
                try:
                    rset = queue.popleft()
                except IndexError:
                    break
                # write it
                self.push(rset)

        # all done, call onstop
        self.onstop()

    def stop(self):
        """Set our stop_evt and queue_evt Events."""
        # set the stop event and wake the run() thread so it falls through.
        self.stop_evt.set()
        self.queue_evt.set()

    @abstractmethod
    def push(self, rset):
        """Writer plugins must define this method, it accepts a ResultSet.

        :param rset: ResultSet from a readers poll() call.
        :type rset: plumd.ResultSet
        """
        # this will never get called
        raise NotImplementedError("push() not defined")
255