|
@@ 784-824 (lines=41) @@
|
| 781 |
|
@property |
| 782 |
|
def nwriters(self): |
| 783 |
|
"""Return the number of loaded writer plugins. |
| 784 |
|
|
| 785 |
|
:rtype: int |
| 786 |
|
""" |
| 787 |
|
return len(list(self.writers.keys())) |
| 788 |
|
|
| 789 |
|
def load(self): |
| 790 |
|
"""Load the configured plugin objects into self.readers and |
| 791 |
|
self.writers. Set the writers for each reader plugin. |
| 792 |
|
|
| 793 |
|
raises: |
| 794 |
|
ConfigError if ptype has an incorrect configuration path set |
| 795 |
|
ConfigError if a plugin configuration is missing 'name' |
| 796 |
|
DuplicatePlugin if a plugin configuration has a duplicate 'name' |
| 797 |
|
PluginLoadError if there was an error loading a plugin |
| 798 |
|
|
| 799 |
|
:raises: ConfigError, DuplicatePlugin |
| 800 |
|
""" |
| 801 |
|
# get plugin configuration directory |
| 802 |
|
pdir = self.config.get('config.plugins') |
| 803 |
|
if not os.path.isdir(pdir): |
| 804 |
|
msg = "invalid plugin directory configured: {0}" |
| 805 |
|
raise ConfigError(msg.format(pdir)) |
| 806 |
|
|
| 807 |
|
# load all plugins described by the plugin configurations |
| 808 |
|
msg = "loading plugins from: {0}" |
| 809 |
|
self.log.info(msg.format(self.config.get('config.plugins'))) |
| 810 |
|
pobjs = load_all_plugins(self.log, self.config) |
| 811 |
|
|
| 812 |
|
# load_all_plugins returns a tuple of reader, writer objects |
| 813 |
|
self.readers, self.writers = pobjs |
| 814 |
|
|
| 815 |
|
# now the readers need to know what writers to write to |
| 816 |
|
config_plugin_writers(self) |
| 817 |
|
|
| 818 |
|
def start(self): |
| 819 |
|
"""Start each reader/writer.""" |
| 820 |
|
self.log.debug("starting") |
| 821 |
|
|
| 822 |
|
# start the writers then the readers |
| 823 |
|
for pdict in [self.writers, self.readers]: |
| 824 |
|
for pobj in pdict.values(): |
| 825 |
|
self.log.debug("starting: {0}".format(pobj.name)) |
| 826 |
|
pobj.start() |
| 827 |
|
|
|
@@ 881-913 (lines=33) @@
|
| 878 |
|
self.log = log |
| 879 |
|
# plugins can set their own defaults byt defining self.defaults |
| 880 |
|
self.config.defaults(self.defaults) |
| 881 |
|
# all plugins must be given a unique name |
| 882 |
|
self.name = config.get('name', 'unknown') |
| 883 |
|
# thread for reading or writing |
| 884 |
|
self.thread = None |
| 885 |
|
self.stop_evt = threading.Event() |
| 886 |
|
|
| 887 |
|
def __str__(self): |
| 888 |
|
"""Return a human readable str. |
| 889 |
|
|
| 890 |
|
:rtype: str |
| 891 |
|
""" |
| 892 |
|
sval = "Plugin(class={0})".format(self.__class__.__name__) |
| 893 |
|
return sval |
| 894 |
|
|
| 895 |
|
def __repr__(self): |
| 896 |
|
"""Return a human readable str. |
| 897 |
|
|
| 898 |
|
:rtype: str |
| 899 |
|
""" |
| 900 |
|
sval = "Plugin(class={0})".format(self.__class__.__name__) |
| 901 |
|
return sval |
| 902 |
|
|
| 903 |
|
def start(self): |
| 904 |
|
"""Start run() in a new thread if not already running.""" |
| 905 |
|
if self.thread is None or not self.thread.is_alive(): |
| 906 |
|
self.thread = threading.Thread(target=self.run, group=None) |
| 907 |
|
self.thread.start() |
| 908 |
|
self.log.debug("Plugin {0} started".format(self.name)) |
| 909 |
|
else: |
| 910 |
|
self.log.debug("Plugin {0} already running".format(self.name)) |
| 911 |
|
|
| 912 |
|
def run(self): |
| 913 |
|
"""Plugin sub classes must overide this.""" |
| 914 |
|
raise NotImplementedError("Invalid Plugin") |
| 915 |
|
|
| 916 |
|
def stop(self): |