Total Complexity | 25 |
Total Lines | 175 |
Duplicated Lines | 12.57 % |
Changes | 1 | ||
Bugs | 1 | Features | 1 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
1 | # -*- coding: utf-8 -*- |
||
class PluginReader(object):
    """Poll a Reader plugin on an interval and fan results out to writers.

    A background thread (see :meth:`start`) runs :meth:`read`, which calls
    the plugin's ``onstart()``, then repeatedly calls ``pobj.poll()`` and puts
    each result set on the ``queue`` of every object in ``self.writers``, and
    finally calls the plugin's ``onstop()`` once the stop event is set.

    The polling interval is not a constructor argument: it is read from the
    plugin's own configuration key ``poll.interval``.

    :param log: A logger providing ``debug``, ``warning`` and ``error``.
    :type log: structlog.get_logger()
    :param pobj: An instance of a Reader plugin.
    :type pobj: Reader
    :param pname: The configured name of the plugin.
    :type pname: str

    NOTE(review): earlier docs listed ``ConfigError`` and ``PluginLoadError``
    as raised here; nothing in this class raises them directly. Presumably
    they can come from the plugin's configuration lookup - confirm.
    """

    # Maximum number of seconds write() may block on one writer's queue.
    # Kept short because a blocked put also delays shutdown of the thread.
    write_timeout = 1

    def __init__(self, log, pobj, pname):
        """Store the plugin and set up the (not yet started) reader state.

        :param log: A logger providing ``debug``, ``warning`` and ``error``.
        :type log: structlog.get_logger()
        :param pobj: An instance of a Reader plugin.
        :type pobj: Reader
        :param pname: The configured name of the plugin.
        :type pname: str
        """
        # log and _name are assigned first so __del__ can still report the
        # instance if the configuration lookup below raises.
        self.log = log
        self._name = pname
        self.pobj = pobj
        # poll() interval - get from plugins configuration
        self.interval = pobj.config.get('poll.interval')
        # what writers should we write metrics to?
        self.writers = []
        # thread that polls for metrics and writes to writers
        self.thread = None
        # set to ask the read() loop to finish
        self.stop_evt = threading.Event()

    @property
    def name(self):
        """Simply return our configured name.

        :rtype: str
        """
        return self._name

    def onstart(self):
        """Call the plugin's onstart function.

        Any exception raised by the plugin is logged with its traceback and
        the stop event is set, so the reader really is disabled (the read
        loop will not poll a plugin that failed to start).
        """
        try:
            self.pobj.onstart()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: disabling reader {0}: exception in onstart: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))
            # match the log message: actually disable this reader
            self.stop_evt.set()

    def poll(self):
        """Call the plugin's poll() function and return its result set.

        A return value that is not a :class:`plumd.ResultSet` is logged as an
        error and replaced by an empty result set. If the plugin raises, the
        exception is logged, the stop event is set (disabling the reader) and
        an empty result set is returned. This method never returns None.

        :rtype: plumd.ResultSet
        """
        rset = None
        # plugins can raise pretty much any exception, wrap in catch all try
        try:
            rset = self.pobj.poll()
            if not isinstance(rset, plumd.ResultSet):
                msg = "read: invalid result set from reader {0}: {1}"
                self.log.error(msg.format(self._name, rset))
                rset = plumd.ResultSet()
        except Exception as e:
            # log a traceback and ensure the reader thread exits
            tb = traceback.format_exc()
            msg = "read: disabling reader {0} due to exception in poll: {1} : {2}"
            self.log.error(msg.format(self._name, e, tb))
            self.stop_evt.set()
            rset = plumd.ResultSet()
        return rset

    def write(self, rset):
        """Put a result set on the input queue of every configured writer.

        Result sets with ``nresults < 1`` are skipped. Each put may block for
        up to ``self.write_timeout`` seconds; if a writer's queue is still
        full after that, the result set is dropped for that writer and a
        warning is logged. Future versions may use persistence to queue
        result sets to disk if the backend metrics services are down.

        :param rset: a result set from a plugin poll() call.
        :type rset: plumd.ResultSet
        """
        # The queue module was renamed between Python 2 and 3; resolve it
        # here so the except clause below works on either interpreter.
        try:
            import queue as queue_mod
        except ImportError:
            import Queue as queue_mod

        # only send result sets that have metrics
        if rset.nresults < 1:
            return

        for wobj in self.writers:
            try:
                # allow queue puts to block for a short time
                # this cannot block for long as it will also block shutdown
                wobj.queue.put(rset, block=True, timeout=self.write_timeout)
            except queue_mod.Full:
                msg = "read: {0}: dropping metrics: writer plugin queue full: {1}"
                self.log.warning(msg.format(self._name, wobj.name))

    def read(self):
        """Run the reader: onstart(), poll/write loop, then onstop().

        Intended to be the target of the thread created by :meth:`start`.
        """
        self.onstart()

        # distribute metrics - don't everyone flood all at once.
        # randrange() rejects a non-positive bound, so a missing or zero
        # 'delay.poll' simply means "no start-up delay".
        delay = self.pobj.config.get('delay.poll')
        max_delay = int(delay) if delay else 0
        if max_delay > 0:
            # wait on the stop event rather than sleeping so that stop()
            # is honoured immediately during the start-up delay
            self.stop_evt.wait(random.randrange(max_delay))

        # loop on calling the plugin objects poll and sending to writers
        while not self.stop_evt.is_set():
            # loop every self.interval seconds
            with plumd.util.Interval(self.interval, self.stop_evt) as lint:
                if self.stop_evt.is_set():
                    break
                rset = self.poll()
                # skip result sets that evaluate false
                if rset:
                    self.write(rset)
                # warn if loop time exceeded
                if lint.remaining < 0:
                    msg = "read: {0} reader loop time exceeded"
                    self.log.warning(msg.format(self._name))
        self.onstop()

    def start(self):
        """Start the thread that runs :meth:`read`."""
        self.thread = threading.Thread(target=self.read)
        self.thread.start()

    def stop(self):
        """Ask the reader thread to stop.

        Only sets the stop event; the read() loop then falls through and
        calls the plugin's onstop() from the reader thread.
        """
        self.stop_evt.set()

    def onstop(self):
        """Call the plugin's onstop callable, logging any exception."""
        # this can raise pretty much any exception
        try:
            self.pobj.onstop()
        except Exception as e:
            tb = traceback.format_exc()
            msg = "read: exception during onstop for reader {0}: {1}: {2}"
            self.log.error(msg.format(self._name, e, tb))

    def join(self, timeout):
        """Join the reader thread, logging an error if it is still running.

        Does nothing when the thread was never started.

        :param timeout: seconds to wait for the thread to finish.
        """
        if self.thread:
            self.thread.join(timeout)
            # is_alive() exists on Python 2.6+ and 3.x; isAlive() was removed
            # in Python 3.9
            if self.thread.is_alive():
                msg = "read: plugin {0} shutdown timeout - blocking exit"
                self.log.error(msg.format(self._name))

    def __del__(self):
        """Log __del__ calls."""
        # __init__ may have failed part-way, so attributes can be missing
        log = getattr(self, 'log', None)
        if log is not None:
            log.debug("read: del: {0}".format(getattr(self, '_name', None)))
||
197 |