Completed
Branch master (7e8cc2)
by Kenny
03:21 queued 19s
created

PluginWriter.join()   C

Complexity

Conditions 7

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 32
rs 5.5
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
"""Defines a class to read from a queue and write to a writer plugin.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import sys
13
PY3 = sys.version_info > (3,)
14
15
if PY3:
16
    import queue as Queue
17
else:
18
    import Queue
19
20
import time
21
import threading
22
import traceback
23
24
import plumd
25
26
27
class PluginWriter(object):
28
    """Writer plugin helper that provides a queue for reader plugins to queue
29
    metrics onto. It runs a thread that writes to a plugin objects push().
30
31
    :param log: A logger
32
    :type log: logging.RootLogger
33
    :param pobj: An instance of a writer plugin.
34
    :type pobj: writer
35
    :param pname: The configured name of the plugin.
36
    :type pname: str
37
    :param qsize: The maximum queue size to set.
38
    :type qsize: int
39
    """
40
41
42
    def __init__(self, log, pobj, pname, qsize):
43
        """
44
        Writer plugin helper that provides a queue for reader plugins to queue
45
        metrics onto. It runs a thread that writes to a plugin objects push().
46
47
        :param log: A logger
48
        :type log: logging.RootLogger
49
        :param pobj: An instance of a writer plugin.
50
        :type pobj: writer
51
        :param pname: The configured name of the plugin.
52
        :type pname: str
53
        :param qsize: The maximum queue size to set.
54
        :type qsize: int
55
        """
56
        self.log = log
57
        self._name = pname
58
        # the queue readers enqueue metrics onto
59
        self.queue = Queue.Queue(maxsize=qsize)
60
        self.pobj = pobj
61
        # the thread that dequeues metrics to pboj.push
62
        self.thread = None
63
        self.stop_evt = threading.Event()
64
65
66
    @property
67
    def name(self):
68
        """Simply return our configured name.
69
        :rtype: str
70
        """
71
        return self._name
72
73
74
    def flush(self):
75
        """Flushes self.queue to self.pobj.push()."""
76
        start = time.time()
77 View Code Duplication
        while not self.queue.empty():
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
78
            rset = self.queue.get()
79
            # pretty much any exception can get raised in a plugins push call
80
            try:
81
                self.pobj.push(rset)
82
            except Exception as e:
83
                tb = traceback.format_exc()
84
                msg = "write: writer {0}: exception in flush: {1}: {2}"
85
                self.log.debug(msg.format(self._name, e, tb))
86
                return
87
            if time.time() - start > 1:
88
                msg = "write: flush: flush taking too long, dropping metrics"
89
                self.log.error(msg)
90
                break
91
92
93
    def dequeue(self):
94
        """Dequeues from self.queue to self.pobj.push()."""
95
        # onstart needs to be called first
96
        try:
97
            self.pobj.onstart()
98
        except Exception as e:
99
            tb = traceback.format_exc()
100
            msg = "write: disabling writer {0}: exception in onstart: {1}: {2}"
101
            self.log.error(msg.format(self._name, e, tb))
102
            self.queue = Queue.Queue(maxsize=1)
103
            return
104
105
        # can be stopped in between polls by setting self.run=>False
106 View Code Duplication
        while not self.stop_evt.is_set():
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
107
            rset = self.queue.get(block=True)
108
            # we push None values onto the queue when exiting
109
            if rset is None:
110
                msg = "write: dequeued None value for writer {0}"
111
                self.log.debug(msg.format(self._name))
112
                continue
113
            try:
114
                self.pobj.push(rset)
115
            except Exception as e:
116
                tb = traceback.format_exc()
117
                msg = "write: disabling writer {0}: exception in push: {1}: {2}"
118
                self.log.debug(msg.format(self._name, e, tb))
119
                self.stop_evt.set()
120
                return
121
122
123
    def start(self):
124
        """Starts a writer thread that dequeues into pobj.push()."""
125
        self.thread = threading.Thread(target=self.dequeue, group=None)
126
        self.thread.start()
127
        self.log.debug("Writer {0} thread started".format(self._name))
128
        if not self.thread.is_alive():
129
            err = "Writer {0} thread failed to start".format(self._name)
130
            raise plumd.PluginRuntimeError(err)
131
132
133
    def stop(self):
134
        """Stops our thread, flush()s and calls pobj.stop()."""
135
        self.stop_evt.set()
136
137
138
    def onstop(self):
139
        """Calls the plugins onstop callable."""
140
        self.flush()
141
        # this can raise pretty much any exception
142
        try:
143
            self.pobj.onstop()
144
        except Exception as e:
145
            tb = traceback.format_exc()
146
            msg = "write: writer {0}: exception in onstop: {1}: {2}"
147
            self.log.error(msg.format(self._name, e, tb))
148
149
150
    def join(self, timeout):
151
        """Joins our thread."""
152
        # this function assumes self.stop() has been called
153
        # first we need to put a null entry onto the queue to wake the thread
154
        # if it is waiting on a blocking call to self.queue.get()
155
        if self.thread:
156
            # give the writer a short time to drain the queue
157
            # then drop all metrics and force an exit
158
            try:
159
                self.queue.put(None, block=False)
160
                self.queue.put(None, block=False)
161
            except Queue.Full:
162
                msg = "write: writer {0} queue full on stop"
163
                self.log.warn(msg.format(self._name))
164
            # give the writer some time to flush it's queue
165
            self.thread.join(timeout)
166
            if not self.thread.isAlive():
167
                return
168
            # if it's still running then forcibly empty its queue
169
            if not self.queue.empty():
170
                msg = "write: timeout exceeded: dropping metrics: writer {0}"
171
                self.log.error(msg.format(self._name))
172
                while not self.queue.empty():
173
                    self.queue.get(block=False)
174
            # now put None values onto the queue to force a loop and exit
175
            try:
176
                self.queue.put(None, block=False)
177
                self.queue.put(None, block=False)
178
            except Queue.Full:
179
                pass
180
            # the thread should exit now
181
            self.thread.join()
182
183
184
    def __del__(self):
185
        """Mark __del__ in logs."""
186
        self.log.debug("write: del: {0}".format(self._name))
187