|
@@ 106-120 (lines=15) @@
|
| 103 |
|
return |
| 104 |
|
|
| 105 |
|
# can be stopped in between polls by setting self.run=>False |
| 106 |
|
while not self.stop_evt.is_set(): |
| 107 |
|
rset = self.queue.get(block=True) |
| 108 |
|
# we push None values onto the queue when exiting |
| 109 |
|
if rset is None: |
| 110 |
|
msg = "write: dequeued None value for writer {0}" |
| 111 |
|
self.log.debug(msg.format(self._name)) |
| 112 |
|
continue |
| 113 |
|
try: |
| 114 |
|
self.pobj.push(rset) |
| 115 |
|
except Exception as e: |
| 116 |
|
tb = traceback.format_exc() |
| 117 |
|
msg = "write: disabling writer {0}: exception in push: {1}: {2}" |
| 118 |
|
self.log.debug(msg.format(self._name, e, tb)) |
| 119 |
|
self.stop_evt.set() |
| 120 |
|
return |
| 121 |
|
|
| 122 |
|
|
| 123 |
|
def start(self): |
|
@@ 77-90 (lines=14) @@
|
| 74 |
|
def flush(self): |
| 75 |
|
"""Flushes self.queue to self.pobj.push().""" |
| 76 |
|
start = time.time() |
| 77 |
|
while not self.queue.empty(): |
| 78 |
|
rset = self.queue.get() |
| 79 |
|
# pretty much any exception can get raised in a plugins push call |
| 80 |
|
try: |
| 81 |
|
self.pobj.push(rset) |
| 82 |
|
except Exception as e: |
| 83 |
|
tb = traceback.format_exc() |
| 84 |
|
msg = "write: writer {0}: exception in flush: {1}: {2}" |
| 85 |
|
self.log.debug(msg.format(self._name, e, tb)) |
| 86 |
|
return |
| 87 |
|
if time.time() - start > 1: |
| 88 |
|
msg = "write: flush: flush taking too long, dropping metrics" |
| 89 |
|
self.log.error(msg) |
| 90 |
|
break |
| 91 |
|
|
| 92 |
|
|
| 93 |
|
def dequeue(self): |