Completed
Push — master ( d058c9...ca4b9c )
by Kenny
45s
created

plumd.plugins.PluginLoader.load()   B

Complexity

Conditions 3

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 37
rs 8.8571
1
# -*- coding: utf-8 -*-
2
"""Plugin system oject that controls readers and writers.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import structlog    # pip install structlog
13
14
import plumd
15
import plumd.config
16
import plumd.plugins
17
import plumd.plugins.util
18
import plumd.plugins.read
19
import plumd.plugins.write
20
21
22
class PluginLoader(object):
    """Loads and controls the reader and writer objects.

    raises:
        ConfigError if a plugin is configured with an invalid path
        ConfigError if a plugin configuration is missing 'name'
        DuplicatePlugin if a plugin configuration has a duplicate 'name'
        PluginLoadError if there was an error loading the plugin

    :param log: a logger object from structlog.get_logger()
    :type log: logger
    :param config: a simple configuration helper object.
    :type config: plumd.config.conf
    :raises: ConfigError, DuplicatePlugin, PluginLoadError
    """

    def __init__(self, log, config):
        """Store the logger and config, then load all configured plugins.

        raises:
            ConfigError if a plugin is configured with an invalid path
            ConfigError if a plugin configuration is missing 'name'
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
            PluginLoadError if there was an error loading the plugin

        :param log: a logger object from structlog.get_logger()
        :type log: logger
        :param config: a simple configuration helper object.
        :type config: plumd.config.conf
        :raises: ConfigError, DuplicatePlugin, PluginLoadError
        """
        self.log = log
        self.log.debug("initializing", file=config.path, config=config.conf)
        self.config = config
        self.readers = {}  # reader name -> PluginReader
        self.writers = {}  # writer name -> PluginWriter
        self.load()

    @property
    def nplugins(self):
        """Return the number of loaded plugins (readers plus writers).

        :rtype: int
        """
        return len(self.readers) + len(self.writers)

    @property
    def nreaders(self):
        """Return the number of loaded reader plugins.

        :rtype: int
        """
        return len(self.readers)

    @property
    def nwriters(self):
        """Return the number of loaded writer plugins.

        :rtype: int
        """
        return len(self.writers)

    def load(self):
        """Load the configured plugin objects into self.readers and
        self.writers, then set the writers for each reader plugin.

        raises:
            ConfigError if ptype has an incorrect configuration path set
            ConfigError if a plugin configuration is missing 'name'
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
            PluginLoadError if there was an error loading a plugin

        :raises: ConfigError, DuplicatePlugin, PluginLoadError
        """
        # load_all_plugins returns a tuple of (reader, writer) object dicts
        robjs, wobjs = plumd.plugins.util.load_all_plugins(self.log,
                                                           self.config)
        self.log.debug('loaded', readers=robjs, writers=wobjs)

        # wrap each reader plugin in a PluginReader with its own bound logger
        for rname, robj in robjs.items():
            plog = structlog.get_logger().bind(src="read.{0}".format(rname))
            clsn = plumd.plugins.read.PluginReader
            self.readers[rname] = clsn(plog, robj, rname)

        # wrap each writer plugin in a PluginWriter with its own bound logger
        for wname, wobj in wobjs.items():
            plog = structlog.get_logger().bind(src="write.{0}".format(wname))
            qsize = self.config.get('max.queue')
            clsn = plumd.plugins.write.PluginWriter
            self.writers[wname] = clsn(plog, wobj, wname, qsize)

        # now the readers need to know what writers to write to
        plumd.plugins.util.config_plugin_writers(self)

        # all done
        self.log.info("loaded", readers=len(self.readers),
                      writers=len(self.writers))

    def update_readers_writers(self):
        """Update the list of writers for each reader we have loaded.

        A reader whose plugin config has no 'writers' entry writes to every
        loaded writer. Otherwise it writes to the configured writers that are
        actually loaded; a reader left with no writers is logged, removed
        from self.readers and has its plugin object released.
        """
        # iterate over a snapshot: readers may be removed inside the loop and
        # mutating a dict while iterating it raises RuntimeError on Python 3
        for prname, probj in list(self.readers.items()):
            wcfg = probj.pobj.config.get('writers')
            # if nothing is configured the reader writes to all
            if wcfg is None:
                # list() so the reader holds a snapshot, not a live dict view
                probj.writers = list(self.writers.values())
                continue

            # keep only the configured writer names that are actually loaded
            wnames = [w for w in wcfg if w in self.writers]
            if wnames:
                msg = "setting writers"
                self.log.info("load", msg=msg, reader=prname,
                              writers=wnames)
                probj.writers = [self.writers[w] for w in wnames]
                continue

            # none of the configured writers exist (or the list was empty)
            msg = "disabling reader plugin"
            self.log.error("load", msg=msg, reader=prname,
                           reason="no writers")
            # remove by the dict key we are iterating on
            del self.readers[prname]
            del probj.pobj

    def start(self):
        """Calls the start() function on each reader/writer."""
        self.log.info("starting")

        # start the writers then the readers
        plugins = [('writers', self.writers), ('readers', self.readers)]
        for ptype, pdict in plugins:
            self.log.debug("starting", component=ptype)
            # explicit loop: map() is lazy on Python 3 and would start nothing
            for pobj in pdict.values():
                pobj.start()

        # all done
        self.log.info("started")

    def stop(self):
        """Calls the stop() function on each reader/writer, then joins them."""
        self.log.info("stop", action="stopping")

        # stop the readers first, then the writers (original author's note:
        # they call each plugin's onstop())
        for ptype, pdict in (('readers', self.readers),
                             ('writers', self.writers)):
            self.log.debug("stopping", component=ptype)
            for pobj in pdict.values():
                self.log.debug("stop", action="stopping", plugin=pobj.name)
                pobj.stop()

        # join only after every plugin has been asked to stop
        for ptype, pdict in (('readers', self.readers),
                             ('writers', self.writers)):
            for pname, pobj in pdict.items():
                self.log.debug("stop", action="waiting", plugin=pname)
                pobj.join()

        # all done
        self.log.info("stopped")

    def __del__(self):
        """Log __del__ calls."""
        # log is missing if __init__ never ran (or failed before setting it)
        log = getattr(self, "log", None)
        if log is not None:
            log.debug("del")
200