Completed
Push — master ( 3279ec...8d12dc )
by Kenny
01:09
created

PluginWriter.join()   C

Complexity

Conditions 7

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 32
rs 5.5
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
"""Defines a class to read from a queue and write to a writer plugin.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import time
13
import Queue
14
import threading
15
import traceback
16
17
18
class PluginWriter(object):
    """Queue-backed helper that feeds metrics to a writer plugin.

    Reader plugins enqueue result sets onto :attr:`queue`; a background
    thread started by :meth:`start` dequeues them and hands each one to the
    plugin object's ``push()``.

    :param log: A logger
    :type log: logging.RootLogger
    :param pobj: An instance of a writer plugin.
    :type pobj: writer
    :param pname: The configured name of the plugin.
    :type pname: str
    :param qsize: The maximum queue size to set.
    :type qsize: int
    """

    # Maximum number of seconds flush() may spend draining the queue before
    # it gives up and leaves the remaining metrics behind.
    FLUSH_TIMEOUT = 1

    def __init__(self, log, pobj, pname, qsize):
        """Store the collaborators and create the (bounded) metric queue.

        :param log: A logger
        :type log: logging.RootLogger
        :param pobj: An instance of a writer plugin.
        :type pobj: writer
        :param pname: The configured name of the plugin.
        :type pname: str
        :param qsize: The maximum queue size to set.
        :type qsize: int
        """
        self.log = log
        self._name = pname
        # the queue readers enqueue metrics onto
        self.queue = Queue.Queue(maxsize=qsize)
        self.pobj = pobj
        # the thread that dequeues metrics to pobj.push (set by start())
        self.thread = None
        # set by stop() (or on a push failure) to end the dequeue loop
        self.stop_evt = threading.Event()

    @property
    def name(self):
        """Simply return our configured name.

        :rtype: str
        """
        return self._name

    def flush(self):
        """Flush whatever is left on self.queue to self.pobj.push().

        ``None`` entries are wake-up sentinels put on the queue by
        :meth:`join`; they are skipped and never handed to the plugin.
        Flushing stops early when the plugin's ``push()`` raises or when
        more than :attr:`FLUSH_TIMEOUT` seconds have elapsed.
        """
        start = time.time()
        while True:
            # Non-blocking get: join() may drain the queue from another
            # thread, so an empty()-then-get() sequence could block here.
            try:
                rset = self.queue.get(block=False)
            except Queue.Empty:
                return
            if rset is None:
                continue
            # pretty much any exception can get raised in a plugins push call
            try:
                self.pobj.push(rset)
            except Exception as e:
                tb = traceback.format_exc()
                msg = "write: writer {0}: exception in flush: {1}: {2}"
                self.log.debug(msg.format(self._name, e, tb))
                return
            if time.time() - start > self.FLUSH_TIMEOUT:
                msg = "write: flush: flush taking too long, dropping metrics"
                self.log.error(msg)
                break

    def dequeue(self):
        """Thread body: dequeue from self.queue into self.pobj.push().

        Calls the plugin's ``onstart()`` first; if that raises, the writer is
        disabled and the queue is replaced by a one-slot queue. Otherwise
        result sets are pushed until :attr:`stop_evt` is set, after which
        :meth:`onstop` runs. If ``push()`` raises, the writer is disabled
        (the stop event is set) and the method returns without calling
        :meth:`onstop`.
        """
        # onstart needs to be called first
        try:
            self.pobj.onstart()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "write: disabling writer {0}: exception in onstart: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))
            # nothing will consume the queue anymore; shrink it to one slot
            self.queue = Queue.Queue(maxsize=1)
            return

        # can be stopped in between polls by setting self.stop_evt
        while not self.stop_evt.is_set():
            rset = self.queue.get(block=True)
            # we push None values onto the queue when exiting
            if rset is None:
                msg = "write: dequeued None value for writer {0}"
                self.log.debug(msg.format(self._name))
                continue
            try:
                self.pobj.push(rset)
            except Exception as e:
                tb = traceback.format_exc()
                msg = "write: disabling writer {0}: exception in push: {1}: {2}"
                # a writer being disabled is an error, same as in onstart
                self.log.error(msg.format(self._name, e, tb))
                self.stop_evt.set()
                return

        # now call the plugins onstop
        self.onstop()

    def start(self):
        """Start the writer thread that dequeues into pobj.push()."""
        self.thread = threading.Thread(target=self.dequeue, group=None)
        self.thread.start()

    def stop(self):
        """Signal the writer thread to stop.

        This only sets the stop event. The thread notices it the next time it
        loops, then flushes and calls the plugin's ``onstop()`` itself; call
        :meth:`join` afterwards to wake a blocked thread and wait for it.
        """
        self.stop_evt.set()

    def onstop(self):
        """Flush the queue, then call the plugin's onstop callable."""
        self.flush()
        # this can raise pretty much any exception
        try:
            self.pobj.onstop()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "write: writer {0}: exception in onstop: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))

    def _enqueue_sentinels(self):
        """Put two None wake-up entries on the queue without blocking.

        :returns: False if the queue filled up before both were queued.
        :rtype: bool
        """
        try:
            self.queue.put(None, block=False)
            self.queue.put(None, block=False)
        except Queue.Full:
            return False
        return True

    def _drop_queued(self):
        """Discard everything currently on the queue without blocking."""
        try:
            while True:
                self.queue.get(block=False)
        except Queue.Empty:
            # the writer thread may consume entries concurrently, so running
            # dry part-way through is expected
            pass

    def join(self, timeout):
        """Join the writer thread, dropping queued metrics if it is too slow.

        This assumes :meth:`stop` has already been called. It is a no-op when
        no thread was started.

        :param timeout: Seconds to give the writer to drain its queue before
            the remaining metrics are dropped.
        :type timeout: float
        """
        if not self.thread:
            return

        # put null entries onto the queue to wake the thread if it is
        # waiting on a blocking call to self.queue.get()
        if not self._enqueue_sentinels():
            msg = "write: writer {0} queue full on stop"
            self.log.warning(msg.format(self._name))

        # give the writer some time to flush its queue
        self.thread.join(timeout)
        if not self.thread.is_alive():
            return

        # if it's still running then forcibly empty its queue
        if not self.queue.empty():
            msg = "write: timeout exceeded: dropping metrics: writer {0}"
            self.log.error(msg.format(self._name))
            self._drop_queued()

        # now put None values onto the queue to force a loop and exit; a full
        # queue is fine here, the thread will wake up on whatever is queued
        self._enqueue_sentinels()
        # the thread should exit now
        self.thread.join()

    def __del__(self):
        """Mark __del__ in logs."""
        # __init__ may not have run to completion, so don't assume attributes
        log = getattr(self, 'log', None)
        if log is not None:
            log.debug("write: del: {0}".format(getattr(self, '_name', None)))
176