Completed
Push — master ( da3b73...3279ec )
by Kenny
01:17
created

PluginLoader.stop()   B

Complexity

Conditions 5

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
c 0
b 0
f 0
dl 0
loc 26
rs 8.0894
1
# -*- coding: utf-8 -*-
2
"""Plugin system object that controls readers and writers.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import plumd
13
import plumd.config
14
import plumd.plugins
15
import plumd.plugins.util
16
import plumd.plugins.read
17
import plumd.plugins.write
18
19
20
class PluginLoader(object):
    """Load and control the reader and writer plugin objects.

    Constructing a PluginLoader immediately calls :meth:`load`, which
    populates ``self.readers`` and ``self.writers``.

    raises:
        ConfigError if a plugin is configured with an invalid path
        ConfigError if a plugin configuration is missing 'name'
        DuplicatePlugin if a plugin configuration has a duplicate 'name'
        PluginLoadError if there was an error loading the plugin

    :param log: a logger object
    :type log: logger
    :param config: a simple configuration helper object.
    :type config: plumd.config.conf
    :raises: ConfigError, DuplicatePlugin, PluginLoadError
    """

    def __init__(self, log, config):
        """Store the logger and configuration, then load all plugins.

        raises:
            ConfigError if a plugin is configured with an invalid path
            ConfigError if a plugin configuration is missing 'name'
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
            PluginLoadError if there was an error loading the plugin

        :param log: a logger object from structlog.get_logger()
        :type log: logger
        :param config: a simple configuration helper object.
        :type config: plumd.config.conf
        :raises: ConfigError, DuplicatePlugin, PluginLoadError
        """
        self.log = log
        msg = "initializing from: {0} conf: \n{1}"
        self.log.debug(msg.format(config.path, config))
        self.config = config
        self.readers = {}  # reader plugin io objects, keyed by plugin name
        self.writers = {}  # writer plugin io objects, keyed by plugin name
        self.load()

    @property
    def nplugins(self):
        """Return the number of loaded plugins (readers plus writers).

        :rtype: int
        """
        return len(self.readers) + len(self.writers)

    @property
    def nreaders(self):
        """Return the number of loaded reader plugins.

        :rtype: int
        """
        return len(self.readers)

    @property
    def nwriters(self):
        """Return the number of loaded writer plugins.

        :rtype: int
        """
        return len(self.writers)

    def load(self):
        """Load the configured plugin objects into self.readers and
        self.writers, then set the writers for each reader plugin.

        raises:
            ConfigError if ptype has an incorrect configuration path set
            ConfigError if a plugin configuration is missing 'name'
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
            PluginLoadError if there was an error loading a plugin

        :raises: ConfigError, DuplicatePlugin, PluginLoadError
        """
        # load_all_plugins returns a tuple of (reader, writer) objects
        pobjs = plumd.plugins.util.load_all_plugins(self.log, self.config)
        robjs, wobjs = pobjs

        # wrap each reader plugin in a PluginReader
        reader_cls = plumd.plugins.read.PluginReader
        for rname, robj in robjs.items():
            self.readers[rname] = reader_cls(self.log, robj, rname)

        # wrap each writer plugin in a PluginWriter with the configured
        # queue size
        writer_cls = plumd.plugins.write.PluginWriter
        for wname, wobj in wobjs.items():
            qsize = self.config.get('max.queue')
            self.writers[wname] = writer_cls(self.log, wobj, wname, qsize)

        # now the readers need to know what writers to write to
        plumd.plugins.util.config_plugin_writers(self)

    def update_readers_writers(self):
        """Update the list of writers for each reader loaded.

        A reader with no 'writers' configuration writes to every loaded
        writer. A reader with a 'writers' configuration writes to those
        named writers that are actually loaded; if none of them are loaded
        the reader is disabled and removed from self.readers.
        """
        # Iterate over a snapshot: readers may be removed from self.readers
        # below, and mutating a dict while iterating it raises RuntimeError
        # on Python 3.
        for prname, probj in list(self.readers.items()):
            wcfg = probj.pobj.config.get('writers')

            # if nothing is configured the reader writes to all writers;
            # store a real list rather than a live dict view
            if wcfg is None:
                probj.writers = list(self.writers.values())
                continue

            # keep only the configured writer names that are loaded
            wnames = [w for w in wcfg if w in self.writers]
            if wnames:
                msg = "load: setting {0} writers: {1}"
                self.log.info(msg.format(prname, wnames))
                probj.writers = [self.writers[w] for w in wnames]
                continue

            # no writers found or no writers configured
            msg = "load: disabling reader {0}: no writers"
            self.log.error(msg.format(probj.name))
            del self.readers[probj.name]
            del probj.pobj

    def start(self):
        """Call start() on each writer, then on each reader."""
        self.log.debug("starting")

        # Start the writers then the readers. Explicit loops are required:
        # map() is lazy on Python 3 and would never call start().
        plugins = [('writers', self.writers), ('readers', self.readers)]
        for ptype, pdict in plugins:
            self.log.debug("starting: {0}".format(ptype))
            for pobj in pdict.values():
                pobj.start()

        # all done
        self.log.debug("started")

    def stop(self):
        """Call stop() on each reader then each writer, then join them all."""
        self.log.debug("stopping plugins")

        # stop the readers first - they call each plugins onstop()
        self.log.debug("stopping readers")
        for pobj in self.readers.values():
            self.log.debug("stopping reader: {0}".format(pobj.name))
            pobj.stop()

        # next stop the writers - they call each plugins onstop()
        self.log.debug("stopping writers")
        for pobj in self.writers.values():
            self.log.debug("stopping writer: {0}".format(pobj.name))
            pobj.stop()

        # join only after everything has been asked to stop
        for pdict in (self.readers, self.writers):
            for pname, pobj in pdict.items():
                self.log.debug("waiting for plugin to stop: {0}".format(pname))
                pobj.join()

        # all done
        self.log.debug("stopped")

    def __del__(self):
        """Log __del__ calls."""
        # self.log is missing if __init__ never ran; avoid raising from
        # inside a finalizer in that case.
        log = getattr(self, 'log', None)
        if log is not None:
            log.debug("del")
190