Completed
Push — dev-4.1-unstable ( f650fd...fbeeec )
by Felipe A.
56s
created

WidgetPluginManager.create_widget()   C

Complexity

Conditions 7

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 25
rs 5.5
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import argparse
6
import warnings
7
import collections
8
9
from . import mimetype
10
11
12
class PluginNotFoundError(ImportError):
13
    pass
14
15
16
class WidgetException(Exception):
17
    pass
18
19
20
class WidgetParameterException(WidgetException):
21
    pass
22
23
24
class PluginManagerBase(object):
25
26
    @property
27
    def namespaces(self):
28
        return self.app.config['plugin_namespaces'] if self.app else []
29
30
    def __init__(self, app=None):
31
        if app is None:
32
            self.clear()
33
        else:
34
            self.init_app(app)
35
36
    def init_app(self, app):
37
        self.app = app
38
        if not hasattr(app, 'extensions'):
39
            app.extensions = {}
40
        app.extensions['plugin_manager'] = self
41
        self.reload()
42
43
    def reload(self):
44
        self.clear()
45
        for plugin in self.app.config.get('plugin_modules', ()):
46
            self.load_plugin(plugin)
47
48
    def clear(self):
49
        pass
50
51
    def import_plugin(self, plugin):
52
        names = [
53
            '%s.%s' % (namespace, plugin) if namespace else plugin
54
            for namespace in self.namespaces
55
            ]
56
57
        for name in names:
58
            if name in sys.modules:
59
                return sys.modules[name]
60
61
        for name in names:
62
            try:
63
                __import__(name)
64
                return sys.modules[name]
65
            except (ImportError, KeyError):
66
                pass
67
68
        raise PluginNotFoundError(
69
            'No plugin module %r found, tried %r' % (plugin, names),
70
            plugin, names)
71
72
    def load_plugin(self, plugin):
73
        return self.import_plugin(plugin)
74
75
76
class RegistrablePluginManager(PluginManagerBase):
77
    def load_plugin(self, plugin):
78
        module = super(RegistrablePluginManager, self).load_plugin(plugin)
79
        if hasattr(module, 'register_plugin'):
80
            module.register_plugin(self)
81
        return module
82
83
84
class BlueprintPluginManager(RegistrablePluginManager):
85
    '''
86
    Note: blueprints are not removed on `clear` nor reloaded on `reload`
87
    as flask does not allow it.
88
    '''
89
    def __init__(self, app=None):
90
        self._blueprint_known = set()
91
        super(BlueprintPluginManager, self).__init__(app=app)
92
93
    def register_blueprint(self, blueprint):
94
        if blueprint not in self._blueprint_known:
95
            self.app.register_blueprint(blueprint)
96
            self._blueprint_known.add(blueprint)
97
98
99
class WidgetPluginManager(RegistrablePluginManager):
100
    widget_types = {
101
        'base': collections.namedtuple(
102
            'Widget',
103
            ('place', 'type')),
104
        'link': collections.namedtuple(
105
            'Link',
106
            ('place', 'type', 'css', 'icon', 'text', 'endpoint', 'href')),
107
        'button': collections.namedtuple(
108
            'Button',
109
            ('place', 'type', 'css', 'text', 'endpoint', 'href')),
110
        'upload': collections.namedtuple(
111
            'Upload',
112
            ('place', 'type', 'css', 'text', 'endpoint', 'action')),
113
        'stylesheet': collections.namedtuple(
114
            'Stylesheet',
115
            ('place', 'type', 'endpoint', 'filename', 'href')),
116
        'script': collections.namedtuple(
117
            'Script',
118
            ('place', 'type', 'endpoint', 'filename', 'src')),
119
        'html': collections.namedtuple(
120
            'Html',
121
            ('place', 'type', 'html')),
122
    }
123
124
    def clear(self):
125
        self._widgets = []
126
        super(WidgetPluginManager, self).clear()
127
128
    def get_widgets(self, file=None, place=None):
129
        return list(self.iter_widgets(file, place))
130
131
    def iter_widgets(self, file=None, place=None):
132
        for filter, cwidget in self._widgets:
133
            try:
134
                if filter and not filter(file):
135
                    continue
136
            except BaseException as e:
137
                # Exception is handled  as this method execution is deffered,
138
                # making hard to debug for plugin developers.
139
                warnings.warn(
140
                    'Plugin action filtering failed with error: %s' % e,
141
                    RuntimeWarning
142
                    )
143
                continue
144
            if place and place != cwidget.place:
145
                continue
146
            yield cwidget
147
148
    def create_widget(self, place=None, type=None, **kwargs):
149
        widget_class = self.widget_types.get(type, self.widget_types['base'])
150
        kwargs.update(
151
            place=place,
152
            type=type
153
            )
154
        kwargs.update(
155
            (f, None)
156
            for f in widget_class._fields
157
            if f not in kwargs
158
            )
159
        try:
160
            element = widget_class(**kwargs)
161
        except TypeError as e:
162
            message = e.args[0] if e.args else ''
163
            if (
164
              'unexpected keyword argument' in message or
165
              'required positional argument' in message
166
              ):
167
                raise WidgetParameterException(
168
                    'type %s; %s; available: %r'
169
                    % (type, message, widget_class._fields)
170
                    )
171
            raise e
172
        return element
173
174
    def register_widget(self, place=None, type=None, widget=None, filter=None,
175
                        **kwargs):
176
        element = widget or self.create_widget(place, type, **kwargs)
177
        self._widgets.append((filter, element))
178
179
180
class MimetypePluginManager(RegistrablePluginManager):
181
    _default_mimetype_functions = (
182
        mimetype.by_python,
183
        mimetype.by_file,
184
        mimetype.by_default,
185
    )
186
187
    def clear(self):
188
        self._mimetype_functions = list(self._default_mimetype_functions)
189
        super(MimetypePluginManager, self).clear()
190
191
    def get_mimetype(self, path):
192
        for fnc in self._mimetype_functions:
193
            mime = fnc(path)
194
            if mime:
195
                return mime
196
        return mimetype.by_default(path)
197
198
    def register_mimetype_function(self, fnc):
199
        self._mimetype_functions.insert(0, fnc)
200
201
202
class ArgumentPluginManager(PluginManagerBase):
203
    _argparse_kwargs = {'add_help': False}
204
    _argparse_arguments = argparse.Namespace()
205
206
    def load_arguments(self, argv, base=None):
207
208
        plugin_parser = argparse.ArgumentParser(add_help=False)
209
        plugin_parser.add_argument(
210
            '--plugin',
211
            type=lambda x: x.split(',') if x else [],
212
            default=[]
213
            )
214
        parser = argparse.ArgumentParser(
215
            parents=(base or plugin_parser,),
216
            add_help=False
217
            )
218
        for plugin in plugin_parser.parse_known_args(argv)[0].plugin:
219
            module = self.import_plugin(plugin)
220
            if hasattr(module, 'register_arguments'):
221
                manager = ArgumentPluginManager()
222
                module.register_arguments(manager)
223
                group = parser.add_argument_group('%s arguments' % plugin)
224
                for argargs, argkwargs in manager._argparse_argkwargs:
225
                    group.add_argument(*argargs, **argkwargs)
226
        self._argparse_arguments = parser.parse_args(argv)
227
        return self._argparse_arguments
228
229
    def clear(self):
230
        self._argparse_argkwargs = []
231
        super(ArgumentPluginManager, self).clear()
232
233
    def register_argument(self, *args, **kwargs):
234
        self._argparse_argkwargs.append((args, kwargs))
235
236
    def get_argument(self, name, default=None):
237
        return getattr(self._argparse_arguments, name, default)
238
239
240
class PluginManager(BlueprintPluginManager, WidgetPluginManager,
241
                    MimetypePluginManager, ArgumentPluginManager):
242
    pass
243