|
@@ 784-824 (lines=41) @@
|
| 781 |
|
err = "attempt to remove non-existant render: {0}" |
| 782 |
|
self.log.error(err.format(exc)) |
| 783 |
|
|
| 784 |
|
def run(self):
    """Start reading from the reader plugin after calling its onstart().

    After onstart() the thread sleeps for a random whole number of seconds
    below ``self.start_delay`` (no sleep at all when that truncates to zero
    or less), then repeatedly calls the plugin's poll() inside an
    ``Interval(self.plugin.interval, self.stop_evt)`` block and passes the
    ``results`` of every non-None result set to each render in
    ``self.renders``.

    Call stop() to get this thread to exit - before returning it will
    call the reader's onstop(). onstop() is also called when polling or
    rendering raises; the exception still propagates.

    Raises PluginRuntimeError if the output queue/render is not set.
    """
    # NOTE(review): this guard tests self.render while the loop below
    # iterates self.renders - confirm both attributes exist on readers.
    if not self.queue or not self.render:
        err = "output queue/render not set for reader: {0}"
        raise PluginRuntimeError(err.format(self.plugin.name))

    # starting up, call onstart
    self.plugin.onstart()

    try:
        # distribute metrics - don't everyone flood all at once.
        # random.randrange() raises ValueError for an empty range, so only
        # draw a delay when there is at least one whole second to pick from.
        max_delay = int(self.start_delay)
        if max_delay > 0:
            time.sleep(random.randrange(max_delay))

        # save a few cycles
        tloop = self.plugin.interval
        evt = self.stop_evt
        poll = self.plugin.poll

        # loop on calling the plugin objects poll and sending to writers
        while not evt.is_set():

            # loop every self.interval seconds
            with Interval(tloop, evt):

                # poll for metric result set
                rset = poll()

                # readers can return None values eg. a reader that polls
                # very often and periodically returns results
                if rset is None:
                    continue

                # hand the results to each of our renders directly
                for render in self.renders:
                    render.process(rset.results)
    finally:
        # all done (or failed), always give the plugin its onstop
        self.plugin.onstop()
| 825 |
|
|
| 826 |
|
|
| 827 |
|
class WriterThread(PluginThread): |
|
@@ 881-913 (lines=33) @@
|
| 878 |
|
for i in range(0, 10): |
| 879 |
|
self.queue.put_nowait(None) |
| 880 |
|
|
| 881 |
|
def run(self):
    """Start writing metrics to the writer plugin from self.queue.

    First calls the plugin's onstart(), then blocks on self.queue.get()
    and passes every truthy entry to the plugin's write() until the stop
    event is set. Falsy entries (eg. None) are discarded.

    On a clean shutdown anything returned by self.render.flush() is passed
    to the plugin's flush().

    Call stop() to get this thread to exit - before returning it will
    call the writer's onstop(). onstop() is also called when writing or
    flushing raises; the exception still propagates.

    Raises PluginRuntimeError if the input queue/render is not set.
    """
    if not self.queue or not self.render:
        err = "input queue/render not set for writer: {0}"
        raise PluginRuntimeError(err.format(self.plugin.name))

    # starting up, call onstart
    self.plugin.onstart()

    try:
        # save a few cycles
        stop_evt = self.stop_evt
        write = self.plugin.write
        get = self.queue.get

        # loop on popping from the queue and writing each entry;
        # get() blocks, so the stop event is only rechecked after an
        # entry (possibly a falsy one) arrives
        while not stop_evt.is_set():
            entry = get()
            if entry:
                write(entry)

        # flush any partially rendered metrics on shutdown
        partial = self.render.flush()
        if partial:
            self.plugin.flush(partial)
    finally:
        # all done (or failed), always give the plugin its onstop
        self.plugin.onstop()