@@ 784-824 (lines=41) @@ | ||
781 | @property |
|
782 | def nwriters(self): |
|
783 | """Return the number of loaded writer plugins. |
|
784 | ||
785 | :rtype: int |
|
786 | """ |
|
787 | return len(list(self.writers.keys())) |
|
788 | ||
789 | def load(self): |
|
790 | """Load the configured plugin objects into self.readers and |
|
791 | self.writers. Set the writers for each reader plugin. |
|
792 | ||
793 | raises: |
|
794 | ConfigError if ptype has an incorrect configuration path set |
|
795 | ConfigError if a plugin configuration is missing 'name' |
|
796 | DuplicatePlugin if a plugin configuration has a duplicate 'name' |
|
797 | PluginLoadError if there was an error loading a plugin |
|
798 | ||
799 | :raises: ConfigError, DuplicatePlugin |
|
800 | """ |
|
801 | # get plugin configuration directory |
|
802 | pdir = self.config.get('config.plugins') |
|
803 | if not os.path.isdir(pdir): |
|
804 | msg = "invalid plugin directory configured: {0}" |
|
805 | raise ConfigError(msg.format(pdir)) |
|
806 | ||
807 | # load all plugins described by the plugin configurations |
|
808 | msg = "loading plugins from: {0}" |
|
809 | self.log.info(msg.format(self.config.get('config.plugins'))) |
|
810 | pobjs = load_all_plugins(self.log, self.config) |
|
811 | ||
812 | # load_all_plugins returns a tuple of reader, writer objects |
|
813 | self.readers, self.writers = pobjs |
|
814 | ||
815 | # now the readers need to know what writers to write to |
|
816 | config_plugin_writers(self) |
|
817 | ||
818 | def start(self): |
|
819 | """Start each reader/writer.""" |
|
820 | self.log.debug("starting") |
|
821 | ||
822 | # start the writers then the readers |
|
823 | for pdict in [self.writers, self.readers]: |
|
824 | for pobj in pdict.values(): |
|
825 | self.log.debug("starting: {0}".format(pobj.name)) |
|
826 | pobj.start() |
|
827 | ||
@@ 881-913 (lines=33) @@ | ||
878 | self.log = log |
|
879 | # plugins can set their own defaults byt defining self.defaults |
|
880 | self.config.defaults(self.defaults) |
|
881 | # all plugins must be given a unique name |
|
882 | self.name = config.get('name', 'unknown') |
|
883 | # thread for reading or writing |
|
884 | self.thread = None |
|
885 | self.stop_evt = threading.Event() |
|
886 | ||
887 | def __str__(self): |
|
888 | """Return a human readable str. |
|
889 | ||
890 | :rtype: str |
|
891 | """ |
|
892 | sval = "Plugin(class={0})".format(self.__class__.__name__) |
|
893 | return sval |
|
894 | ||
895 | def __repr__(self): |
|
896 | """Return a human readable str. |
|
897 | ||
898 | :rtype: str |
|
899 | """ |
|
900 | sval = "Plugin(class={0})".format(self.__class__.__name__) |
|
901 | return sval |
|
902 | ||
903 | def start(self): |
|
904 | """Start run() in a new thread if not already running.""" |
|
905 | if self.thread is None or not self.thread.is_alive(): |
|
906 | self.thread = threading.Thread(target=self.run, group=None) |
|
907 | self.thread.start() |
|
908 | self.log.debug("Plugin {0} started".format(self.name)) |
|
909 | else: |
|
910 | self.log.debug("Plugin {0} already running".format(self.name)) |
|
911 | ||
912 | def run(self): |
|
913 | """Plugin sub classes must overide this.""" |
|
914 | raise NotImplementedError("Invalid Plugin") |
|
915 | ||
916 | def stop(self): |