Completed
Push — master ( f161ca...d2dc5c )
by Kenny
01:12
created

PluginLoader.__str__()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
"""Plugin system oject that controls readers and writers.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
__author__ = 'Kenny Freeman'
8
__email__ = '[email protected]'
9
__license__ = "ISCL"
10
__docformat__ = 'reStructuredText'
11
12
import os.path
13
14
import plumd
15
import plumd.config
16
import plumd.plugins
17
import plumd.plugins.util
18
import plumd.plugins.read
19
import plumd.plugins.write
20
21
22
class PluginLoader(object):
23
    """Loads and controls the reader and writer objects.
24
25
    raises:
26
        ConfigError if a plugin is configured with an invalid path
27
        ConfigError if a plugin configuration is missing 'name'
28
        DuplicatePlugin if a plugin configuration has a duplicate 'name'
29
        PluginLoadError if there was an error loading the plugin
30
31
    :param config: a simple configuration helper object.
32
    :type config: plumd.config.conf
33
    :param log: a logger object
34
    :type log: logger
35
    :raises: ConfigError, DuplicatePlugin, PluginLoadError
36
    """
37
38
    def __init__(self, log, config):
39
        """Loads and controls the reader and writer objects.
40
41
        raises:
42
            ConfigError if a plugin is configured with an invalid path
43
            ConfigError if a plugin configuration is missing 'name'
44
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
45
            PluginLoadError if there was an error loading the plugin
46
47
        :param config: a simple configuration helper object.
48
        :type config: plumd.config.conf
49
        :param log: a logger object from structlog.get_logger()
50
        :type log: logger
51
        :raises: ConfigError, DuplicatePlugin, PluginLoadError
52
        """
53
        self.log = log
54
        msg = "initializing from: {0} conf: \n{1}"
55
        self.log.debug(msg.format(config.path, config))
56
        self.config = config
57
        self.readers = {} # reader plugin io objects
58
        self.writers = {} # writer plugin io objects
59
        self.load()
60
61
62
    def __str__(self):
63
        """Return a human readable str.
64
        :rtype: str"""
65
        s = "readers={0}, writers={1}".format(list(self.readers.keys()),
66
                                              list(self.writers.keys()))
67
        return s
68
69
70
    def __repr__(self):
71
        """Return a human readable str.
72
        :rtype: str"""
73
        s = "readers={0}, writers={1}".format(list(self.readers.keys()),
74
                                              list(self.writers.keys()))
75
        return s
76
77
78
    @property
79
    def nplugins(self):
80
        """Returns the number of loaded plugins.
81
82
        :rtype: int
83
        """
84
        return len(list(self.readers.keys())) + len(list(self.writers.keys()))
85
86
87
    @property
88
    def nreaders(self):
89
        """Return the number of loaded reader plugins.
90
91
        :rtype: int
92
        """
93
        return len(list(self.readers.keys()))
94
95
96
    @property
97
    def nwriters(self):
98
        """Return the number of loaded writer plugins.
99
100
        :rtype: int
101
        """
102
        return len(list(self.writers.keys()))
103
104
105
    def load(self):
106
        """Loads the configured plugin objects into self.readers and
107
        self.writers. Sets the writers for each reader plugin.
108
109
        raises:
110
            ConfigError if ptype has an incorrect configuration path set
111
            ConfigError if a plugin configuration is missing 'name'
112
            DuplicatePlugin if a plugin configuration has a duplicate 'name'
113
            PluginLoadError if there was an error loading a plugin
114
115
        :raises: ConfigError, DuplicatePlugin
116
        """
117
        # load all plugins
118
        pdir = self.config.get('config.plugins')
119
        if not os.path.isdir(pdir):
120
            msg = "invalid plugin directory configured: {0}"
121
            raise plumd.ConfigError(msg.format(pdir))
122
        msg = "loading plugins from: {0}"
123
        self.log.info(msg.format(self.config.get('config.plugins')))
124
        pobjs = plumd.plugins.util.load_all_plugins(self.log, self.config)
125
        # load_all_plugins returns a tuple of reader, writer objects
126
        robjs, wobjs = pobjs
127
128
        # setup reader threads for each reader plugin
129
        for rname, robj in robjs.items():
130
            clsn = plumd.plugins.read.PluginReader
131
            self.readers[rname] = clsn(self.log, robj, rname)
132
133
        # setup writer threads for each writer plugin
134
        for wname, wobj in wobjs.items():
135
            qsize = self.config.get('max.queue')
136
            clsn = plumd.plugins.write.PluginWriter
137
            self.writers[wname] = clsn(self.log, wobj, wname, qsize)
138
139
        # now the readers need to know what writers to write to
140
        plumd.plugins.util.config_plugin_writers(self)
141
142
143
    def start(self):
144
        """Calls the start() function on each reader/writer."""
145
        self.log.debug("starting")
146
147
        # start the writers then the readers
148
        plugins = [('writers', self.writers), ('readers', self.readers) ]
149
        for ptype, pdict in plugins:
150
            self.log.debug("starting: {0}".format(ptype))
151
            for pobj in pdict.values():
152
                pobj.start()
153
154
        # all done
155
        self.log.debug("started")
156
157
158
    def stop(self):
159
        """Calls the stop() function on each reader/writer."""
160
        self.log.debug("stopping plugins")
161
162
        # stop the readers first - they call each plugins onstop()
163
        self.log.debug("stopping readers")
164
        for pobj in list(self.readers.values()):
165
            self.log.debug("stopping reader: {0}".format(pobj.name))
166
            pobj.stop()
167
            pobj.onstop()
168
169
        ## next stop the writers - they call each plugins onstop()
170
        self.log.debug("stopping writers")
171
        for pobj in list(self.writers.values()):
172
            self.log.debug("stopping writer: {0}".format(pobj.name))
173
            pobj.stop()
174
            pobj.onstop()
175
176
        # join threads now
177
        pobjs = [ ('readers', self.readers),
178
                  ('writers', self.writers) ]
179
        for ( ptype, pdict ) in pobjs:
180
            for pname, pobj in list(pdict.items()):
181
                self.log.debug("waiting for plugin to stop: {0}".format(pname))
182
                pobj.join(timeout=self.config.get('shutdown_timeout'))
183
184
        # all done
185
        self.log.debug("stopped")
186
187
188
    def __del__(self):
189
        """Log __del__ calls."""
190
        self.log.debug("del")
191