Completed
Push — master ( da3b73...3279ec )
by Kenny
01:17
created

plumd.plugins.PluginWriter   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 141
Duplicated Lines 0 %
Metric Value
dl 0
loc 141
rs 10
wmc 17

9 Methods

Rating   Name   Duplication   Size   Complexity  
A PluginWriter.flush() 0 12 3
B PluginWriter.dequeue() 0 29 5
A PluginWriter.stop() 0 3 1
A PluginWriter.__del__() 0 3 1
A PluginWriter.name() 0 6 1
A PluginWriter.onstop() 0 10 2
A PluginWriter.start() 0 4 1
A PluginWriter.__init__() 0 22 1
A PluginWriter.join() 0 9 2
1
# -*- coding: utf-8 -*-
2
"""Defines a class to read from a queue and write to a writer plugin.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import Queue
13
import threading
14
import traceback
15
16
17
class PluginWriter(object):
18
    """Writer plugin helper that provides a queue for reader plugins to queue
19
    metrics onto. It runs a thread that writes to a plugin objects push().
20
21
    :param log: A logger
22
    :type log: logging.RootLogger
23
    :param pobj: An instance of a writer plugin.
24
    :type pobj: writer
25
    :param pname: The configured name of the plugin.
26
    :type pname: str
27
    :param qsize: The maximum queue size to set.
28
    :type qsize: int
29
    """
30
31
32
    def __init__(self, log, pobj, pname, qsize):
33
        """
34
        Writer plugin helper that provides a queue for reader plugins to queue
35
        metrics onto. It runs a thread that writes to a plugin objects push().
36
37
        :param log: A logger
38
        :type log: logging.RootLogger
39
        :param pobj: An instance of a writer plugin.
40
        :type pobj: writer
41
        :param pname: The configured name of the plugin.
42
        :type pname: str
43
        :param qsize: The maximum queue size to set.
44
        :type qsize: int
45
        """
46
        self.log = log
47
        self._name = pname
48
        # the queue readers enqueue metrics onto
49
        self.queue = Queue.Queue(maxsize=qsize)
50
        self.pobj = pobj
51
        # the thread that dequeues metrics to pboj.push
52
        self.thread = None
53
        self.stop_evt = threading.Event()
54
55
56
    @property
57
    def name(self):
58
        """Simply return our configured name.
59
        :rtype: str
60
        """
61
        return self._name
62
63
64
    def flush(self):
65
        """Flushes self.queue to self.pobj.push()."""
66
        while not self.queue.empty():
67
            rset = self.queue.get()
68
            # pretty much any exception can get raised in a plugins push call
69
            try:
70
                self.pobj.push(rset)
71
            except Exception as e:
72
                tb = traceback.format_exc()
73
                msg = "write: disabling writer {0}: exception in flush: {1}: {2}"
74
                self.log.debug(msg.format(self._name, e, tb))
75
                return
76
77
78
    def dequeue(self):
79
        """Dequeues from self.queue to self.pobj.push()."""
80
        # onstart needs to be called first
81
        try:
82
            self.pobj.onstart()
83
        except Exception as e:
84
            tb = traceback.format_exc()
85
            msg = "write: disabling writer {0}: exception in dequeue: {1}: {2}"
86
            self.log.error(msg.format(self._name, e, tb))
87
            self.queue = Queue.Queue(maxsize=1)
88
            return
89
90
        # can be stopped in between polls by setting self.run=>False
91
        while not self.stop_evt.is_set():
92
            rset = self.queue.get(block=True)
93
            # we push None values onto the queue when exiting
94
            if rset is None:
95
                msg = "write: dequeue for writer {0}: ignoring null value"
96
                self.log.debug(msg.format(self._name))
97
                continue
98
            try:
99
                self.pobj.push(rset)
100
            except Exception as e:
101
                tb = traceback.format_exc()
102
                msg = "write: disabling writer {0}: exception in push: {1}: {2}"
103
                self.log.debug(msg.format(self._name, e, tb))
104
                self.stop_evt.set()
105
                return
106
        self.onstop()
107
108
109
    def start(self):
110
        """Starts a writer thread that dequeues into pobj.push()."""
111
        self.thread = threading.Thread(target=self.dequeue, group=None)
112
        self.thread.start()
113
114
115
    def stop(self):
116
        """Stops our thread, flush()s and calls pobj.stop()."""
117
        self.stop_evt.set()
118
119
120
    def onstop(self):
121
        """Calls the plugins onstop callable."""
122
        self.flush()
123
        # this can raise pretty much any exception
124
        try:
125
            self.pobj.onstop()
126
        except Exception as e:
127
            tb = traceback.format_exc()
128
            msg = "write: writer {0}: exception in onstop: {1}: {2}"
129
            self.log.error(msg.format(self._name, e, tb))
130
131
132
    def join(self):
133
        """Joins our thread."""
134
        # this function assumes self.stop() has been called
135
        # first we need to put a null entry onto the queue to wake the thread
136
        # if it is waiting on a blocking call to self.queue.get()
137
        if self.thread:
138
            self.queue.put(None)
139
            self.queue.put(None)
140
            self.thread.join()
141
142
143
    def __del__(self):
144
        """Mark __del__ in logs."""
145
        self.log.debug("write: del: {0}".format(self._name))
146