@@ 784-824 (lines=41) @@ | ||
781 | err = "attempt to remove non-existant render: {0}" |
|
782 | self.log.error(err.format(exc)) |
|
783 | ||
def run(self):
    """Poll the reader plugin in a loop and pass results to each render.

    Calls the plugin's onstart(), sleeps a random whole number of seconds
    below self.start_delay, then loops until self.stop_evt is set. Each
    pass runs inside Interval(self.plugin.interval, self.stop_evt) and
    hands poll().results to process() on every entry of self.renders.

    Call stop() to get this thread to exit - before returning it will
    call the readers onstop(), also when poll() or a render raises.

    Raises PluginRuntimeError if self.queue or self.render is not set.
    """
    # NOTE(review): this guard checks self.render while the loop below
    # iterates self.renders - confirm both attributes exist on readers.
    if not self.queue or not self.render:
        err = "output queue/render not set for reader: {0}"
        raise PluginRuntimeError(err.format(self.plugin.name))

    # starting up, call onstart
    self.plugin.onstart()

    try:
        # distribute metrics - don't everyone flood all at once.
        # random.randrange() raises ValueError on an empty range, so a
        # start_delay that truncates to zero (or less) means "no delay".
        max_delay = int(self.start_delay)
        if max_delay > 0:
            time.sleep(random.randrange(max_delay))

        # save a few cycles
        tloop = self.plugin.interval
        evt = self.stop_evt
        poll = self.plugin.poll

        # loop on calling the plugin objects poll and sending to renders
        while not evt.is_set():

            # one pass per Interval context (constructed with the plugin
            # interval and the stop event)
            with Interval(tloop, evt):

                # poll for metric result set
                rset = poll()

                # readers can return None values eg. a reader that polls
                # very often and periodically returns results
                if rset is None:
                    continue

                # hand the results to each of our renders directly
                for render in self.renders:
                    render.process(rset.results)
    finally:
        # all done (or failed), call onstop so the plugin can clean up
        self.plugin.onstop()
|
825 | ||
826 | ||
827 | class WriterThread(PluginThread): |
|
@@ 881-913 (lines=33) @@ | ||
878 | for i in range(0, 10): |
|
879 | self.queue.put_nowait(None) |
|
880 | ||
def run(self):
    """Start writing metrics to the writer plugin from self.queue.

    First calls the plugin's onstart(), then loops until self.stop_evt
    is set: each pass blocks on self.queue.get() and passes every truthy
    entry to the plugin's write(). Falsy entries (such as None) are
    skipped and only cause the stop event to be re-checked.

    After the loop, whatever self.render.flush() returns is handed to
    the plugin's flush() when it is truthy.

    Call stop() to get this thread to exit - before returning it will
    call the writers onstop(), also when write() or the flush raises.

    Raises PluginRuntimeError if self.queue or self.render is not set.
    """
    if not self.queue or not self.render:
        err = "input queue/render not set for writer: {0}"
        raise PluginRuntimeError(err.format(self.plugin.name))

    # starting up, call onstart
    self.plugin.onstart()

    # save a few cycles
    stop_evt = self.stop_evt
    write = self.plugin.write
    get = self.queue.get

    try:
        # loop on popping from queue and passing entries to write()
        while not stop_evt.is_set():
            entry = get()
            if entry:
                write(entry)

        # flush any partially rendered metrics on shutdown
        partial = self.render.flush()
        if partial:
            self.plugin.flush(partial)
    finally:
        # all done (or failed), call onstop so the plugin can clean up
        self.plugin.onstop()