@@ 65-81 (lines=17) @@ | ||
62 | return self._name |
|
63 | ||
64 | ||
def flush(self, timeout=1):
    """Flushes self.queue to self.pobj.push().

    Result sets are taken from the queue one at a time and handed to the
    plugin until the queue reports empty, a push raises, or more than
    ``timeout`` seconds have elapsed since the flush started.

    :param timeout: number of seconds after which the flush gives up.
    :type timeout: int or float
    """
    start = time.time()
    while not self.queue.empty():
        rset = self.queue.get()
        # pretty much any exception can get raised in a plugins push call
        try:
            self.pobj.push(rset)
        except Exception as e:
            tb = traceback.format_exc()
            msg = "write: writer {0}: exception in flush: {1}: {2}"
            # a failed push abandons the rest of the flush, so report it at
            # error level like the other failure paths instead of hiding it
            # behind debug logging
            self.log.error(msg.format(self._name, e, tb))
            return
        # checked after each push so at least one result set is attempted
        if time.time() - start > timeout:
            msg = "write: flush: flush taking too long, dropping metrics"
            self.log.error(msg)
            break
|
82 | ||
83 | ||
84 | def dequeue(self): |
@@ 83-104 (lines=22) @@ | ||
80 | return |
|
81 | ||
82 | ||
def poll(self):
    """Calls the plugins poll() function and always returns a
    :class:`plumd.ResultSet`.

    If the plugin returns anything that is not a
    :class:`plumd.ResultSet` the value is logged and an empty result set
    is returned in its place. If an exception is raised it is logged with
    its traceback, self.stop_evt is set and an empty result set is
    returned.

    :rtype: plumd.ResultSet
    """
    # plugins can raise pretty much any exception, so everything that
    # touches the plugin's return value stays inside the catch all try
    try:
        polled = self.pobj.poll()
        if isinstance(polled, plumd.ResultSet):
            return polled
        template = "read: invalid result set from reader {0}: {1}"
        self.log.error(template.format(self._name, polled))
        return plumd.ResultSet()
    except Exception as exc:
        # log a traceback and signal the stop event for this reader
        trace = traceback.format_exc()
        template = ("read: disabling reader {0} due to exception "
                    "in poll: {1} : {2}")
        self.log.error(template.format(self._name, exc, trace))
        self.stop_evt.set()
        return plumd.ResultSet()
|
105 | ||
106 | ||
107 | def write(self, rset): |