Completed
Push — dev-4.1-unstable ( 3dbdf3...065df3 )
by Felipe A.
01:02
created

WidgetPluginManager._resolve_widget()   A

Complexity

Conditions 3

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import argparse
6
import warnings
7
import collections
8
9
from . import mimetype
10
11
12
def defaultsnamedtuple(name, fields, defaults=None):
    '''
    Generate namedtuple with default values.

    Every field defaults to None unless a value is given in `defaults`.

    :param name: name
    :param fields: iterable with field names
    :param defaults: iterable or mapping with field defaults
    :returns: namedtuple class with given fields and given defaults
    :rtype: type
    '''
    # The `collections.Mapping` alias was removed in Python 3.10; the ABC
    # lives in `collections.abc` on Python 3 and in `collections` on Python 2.
    try:
        from collections.abc import Mapping
    except ImportError:
        from collections import Mapping

    nt = collections.namedtuple(name, fields)
    # Start with None for every field so the class can be instantiated with
    # partial arguments while the final defaults are being computed below.
    nt.__new__.__defaults__ = (None,) * len(nt._fields)
    if isinstance(defaults, Mapping):
        nt.__new__.__defaults__ = tuple(nt(**defaults))
    elif defaults:
        nt.__new__.__defaults__ = tuple(nt(*defaults))
    return nt
29
30
31
class PluginNotFoundError(ImportError):
    '''
    Import error raised when a plugin module cannot be found.
    '''
33
34
35
class WidgetException(Exception):
    '''
    Base class for widget-related errors.
    '''
37
38
39
class WidgetParameterException(WidgetException):
    '''
    Widget error raised on invalid widget parameters.
    '''
41
42
43
class PluginManagerBase(object):
    '''
    Base plugin manager, responsible for importing plugin modules.

    When bound to an app (either on construction or via :meth:`init_app`),
    the manager stores itself in ``app.extensions['plugin_manager']`` and
    loads every plugin listed in the ``plugin_modules`` config key.
    '''
    # Class-level default: managers created without an app never go through
    # `init_app`, so without this the `namespaces` property would raise
    # AttributeError instead of returning an empty list.
    app = None

    @property
    def namespaces(self):
        '''
        Plugin namespaces from the ``plugin_namespaces`` app config key,
        or an empty list when no app is bound.
        '''
        return self.app.config['plugin_namespaces'] if self.app else []

    def __init__(self, app=None):
        '''
        :param app: optional application object; when omitted the manager
                    is only cleared and can be bound later with `init_app`
        '''
        if app is None:
            self.clear()
        else:
            self.init_app(app)

    def init_app(self, app):
        '''
        Bind this manager to given app, register it as the app's
        ``plugin_manager`` extension and load configured plugins.

        :param app: application object exposing `config` (and optionally
                    `extensions`, which is created if missing)
        '''
        self.app = app
        if not hasattr(app, 'extensions'):
            app.extensions = {}
        app.extensions['plugin_manager'] = self
        self.reload()

    def reload(self):
        '''
        Clear manager state and load every plugin listed in the
        ``plugin_modules`` app config key.
        '''
        self.clear()
        for plugin in self.app.config.get('plugin_modules', ()):
            self.load_plugin(plugin)

    def clear(self):
        '''
        Reset manager state. Does nothing here; subclasses override it.
        '''
        pass

    def import_plugin(self, plugin):
        '''
        Import plugin module by name, trying every configured namespace.

        :param plugin: plugin module name
        :returns: the imported module
        :raises PluginNotFoundError: if no candidate could be imported
        '''
        names = [
            '%s.%s' % (namespace, plugin) if namespace else plugin
            for namespace in self.namespaces
            ]

        # Prefer any candidate which is already imported, in namespace order.
        for name in names:
            if name in sys.modules:
                return sys.modules[name]

        # Otherwise import the first candidate which can be imported.
        for name in names:
            try:
                __import__(name)
                return sys.modules[name]
            except (ImportError, KeyError):
                pass

        raise PluginNotFoundError(
            'No plugin module %r found, tried %r' % (plugin, names),
            plugin, names)

    def load_plugin(self, plugin):
        '''
        Load plugin by name; here this is just an import.

        :param plugin: plugin module name
        :returns: the imported module
        '''
        return self.import_plugin(plugin)
93
94
95
class RegistrablePluginManager(PluginManagerBase):
    '''
    Plugin manager which lets plugin modules register themselves through
    an optional module-level `register_plugin` callable.
    '''
    def load_plugin(self, plugin):
        '''
        Import plugin and, if the module defines `register_plugin`, call it
        with this manager.

        :param plugin: plugin module name
        :returns: the imported module
        '''
        loaded = super(RegistrablePluginManager, self).load_plugin(plugin)
        if not hasattr(loaded, 'register_plugin'):
            return loaded
        loaded.register_plugin(self)
        return loaded
101
102
103
class BlueprintPluginManager(RegistrablePluginManager):
    '''
    Note: blueprints are not removed on `clear` nor reloaded on `reload`
    as flask does not allow it.
    '''
    def __init__(self, app=None):
        '''
        :param app: optional application object, passed to parent class
        '''
        # Must exist before the parent constructor runs, as that may load
        # plugins which register blueprints right away.
        self._blueprint_known = set()
        super(BlueprintPluginManager, self).__init__(app=app)

    def register_blueprint(self, blueprint):
        '''
        Register given blueprint on the app, once per blueprint.

        :param blueprint: blueprint object, must be hashable
        '''
        if blueprint in self._blueprint_known:
            return
        self.app.register_blueprint(blueprint)
        self._blueprint_known.add(blueprint)
116
117
118
class WidgetPluginManager(RegistrablePluginManager):
    '''
    Plugin manager keeping a registry of widgets which can be queried by
    file and place.
    '''
    # Widget namedtuple classes by type name; 'base' is the fallback used by
    # `create_widget` for unknown types. Field values may be callables, which
    # are resolved against a file by `_resolve_widget`.
    widget_types = {
        'base': defaultsnamedtuple(
            'Widget',
            ('place', 'type')),
        'link': defaultsnamedtuple(
            'Link',
            ('place', 'type', 'css', 'icon', 'text', 'endpoint', 'href'),
            {
                'text': lambda f: f.name,
                'icon': lambda f: f.category
            }),
        'button': defaultsnamedtuple(
            'Button',
            ('place', 'type', 'css', 'text', 'endpoint', 'href')),
        'upload': defaultsnamedtuple(
            'Upload',
            ('place', 'type', 'css', 'text', 'endpoint', 'action')),
        'stylesheet': defaultsnamedtuple(
            'Stylesheet',
            ('place', 'type', 'endpoint', 'filename', 'href')),
        'script': defaultsnamedtuple(
            'Script',
            ('place', 'type', 'endpoint', 'filename', 'src')),
        'html': defaultsnamedtuple(
            'Html',
            ('place', 'type', 'html')),
    }

    def clear(self):
        '''
        Drop all registered widgets and clear parent state.
        '''
        self._widgets = []
        super(WidgetPluginManager, self).clear()

    def get_widgets(self, file=None, place=None):
        '''
        List registered widgets, optionally filtered by file and place.

        :param file: optional file object, passed to widget filters and
                     used to resolve callable widget fields
        :param place: optional place name widgets must match
        :returns: list of widget namedtuples
        :rtype: list
        '''
        return list(self.iter_widgets(file, place))

    def _resolve_widget(self, file, widget):
        '''
        Build a widget of the same class, replacing every callable field
        with the result of calling it with given file.

        :param file: file object passed to callable fields
        :param widget: widget namedtuple
        :returns: new widget namedtuple
        '''
        return widget.__class__(*[
            value(file) if callable(value) else value
            for value in widget
            ])

    def iter_widgets(self, file=None, place=None):
        '''
        Iterate registered widgets, optionally filtered by file and place.

        Widgets whose filter raises an exception are skipped and a
        RuntimeWarning is emitted.

        :param file: optional file object, passed to widget filters and
                     used to resolve callable widget fields
        :param place: optional place name widgets must match
        :yields: widget namedtuples
        '''
        for widget_filter, dynamic, cwidget in self._widgets:
            try:
                if file and widget_filter and not widget_filter(file):
                    continue
            except Exception as e:
                # Only Exception is caught: KeyboardInterrupt and SystemExit
                # must keep propagating instead of becoming warnings.
                # Exception is handled as this method execution is deferred,
                # making hard to debug for plugin developers.
                warnings.warn(
                    'Plugin action filtering failed with error: %s' % e,
                    RuntimeWarning
                    )
                continue
            if place and place != cwidget.place:
                continue
            if file and dynamic:
                cwidget = self._resolve_widget(file, cwidget)
            yield cwidget

    def create_widget(self, place=None, type=None, file=None, **kwargs):
        '''
        Create a widget namedtuple of given type.

        :param place: widget place name
        :param type: widget type name, unknown types fall back to 'base'
        :param file: optional file object; if given, callable fields are
                     resolved against it
        :param kwargs: remaining widget fields
        :returns: widget namedtuple
        :raises WidgetParameterException: if fields do not match the ones
                                          defined by the widget type
        '''
        widget_class = self.widget_types.get(type, self.widget_types['base'])
        kwargs.update(place=place, type=type)
        try:
            element = widget_class(**kwargs)
        except TypeError as e:
            message = e.args[0] if e.args else ''
            if (
              'unexpected keyword argument' in message or
              'required positional argument' in message
              ):
                raise WidgetParameterException(
                    'type %s; %s; available: %r'
                    % (type, message, widget_class._fields)
                    )
            # Bare raise keeps the original traceback untouched.
            raise
        if file and any(map(callable, element)):
            return self._resolve_widget(file, element)
        return element

    def register_widget(self, place=None, type=None, widget=None, filter=None,
                        **kwargs):
        '''
        Register a widget, creating it from given parameters when no
        widget object is passed.

        :param place: widget place name (ignored if widget is given)
        :param type: widget type name (ignored if widget is given)
        :param widget: optional widget namedtuple to register as is
        :param filter: optional callable receiving a file and returning
                       whether the widget applies to it
        :param kwargs: remaining widget fields (ignored if widget is given)
        '''
        widget = widget or self.create_widget(place, type, **kwargs)
        # Remember whether any field is callable so `iter_widgets` only
        # resolves widgets which actually need it.
        dynamic = any(map(callable, widget))
        self._widgets.append((filter, dynamic, widget))
204
205
206
class MimetypePluginManager(RegistrablePluginManager):
    '''
    Plugin manager holding an ordered list of mimetype detection functions.
    '''
    # Functions installed on every `clear`, in the order they are tried.
    _default_mimetype_functions = (
        mimetype.by_python,
        mimetype.by_file,
        mimetype.by_default,
    )

    def clear(self):
        '''
        Reset mimetype functions to the defaults and clear parent state.
        '''
        self._mimetype_functions = list(self._default_mimetype_functions)
        super(MimetypePluginManager, self).clear()

    def get_mimetype(self, path):
        '''
        Get mimetype of given path, using the first registered function
        which returns a truthy value.

        :param path: path passed to every mimetype function
        :returns: first truthy result, or `mimetype.by_default(path)` if
                  every function returned a falsy value
        '''
        for detect in self._mimetype_functions:
            result = detect(path)
            if not result:
                continue
            return result
        return mimetype.by_default(path)

    def register_mimetype_function(self, fnc):
        '''
        Register a mimetype function so it is tried before the existing ones.

        :param fnc: callable receiving a path and returning a mimetype or a
                    falsy value
        '''
        self._mimetype_functions[:0] = [fnc]
226
227
228
class ArgumentPluginManager(PluginManagerBase):
    '''
    Plugin manager collecting command line arguments declared by plugins.
    '''
    _argparse_kwargs = {'add_help': False}
    # Parsed arguments; an empty namespace until `load_arguments` is called.
    _argparse_arguments = argparse.Namespace()

    def load_arguments(self, argv, base=None):
        '''
        Parse given arguments, including those declared by the plugins
        named in the ``--plugin`` option (comma-separated).

        :param argv: list of command line arguments
        :param base: optional parent argument parser; a parser knowing only
                     ``--plugin`` is used when omitted
        :returns: parsed arguments, also stored on this manager
        :rtype: argparse.Namespace
        '''
        plugin_parser = argparse.ArgumentParser(add_help=False)
        plugin_parser.add_argument(
            '--plugin',
            type=lambda x: x.split(',') if x else [],
            default=[]
            )
        parent = base or plugin_parser
        parser = argparse.ArgumentParser(parents=(parent,), add_help=False)

        # First pass: only find out which plugins were requested.
        known_args, _unknown = plugin_parser.parse_known_args(argv)
        for plugin in known_args.plugin:
            module = self.import_plugin(plugin)
            if not hasattr(module, 'register_arguments'):
                continue
            # A throwaway manager collects this plugin's declarations.
            manager = ArgumentPluginManager()
            module.register_arguments(manager)
            group = parser.add_argument_group('%s arguments' % plugin)
            for argargs, argkwargs in manager._argparse_argkwargs:
                group.add_argument(*argargs, **argkwargs)

        # Second pass: full parse including plugin-declared arguments.
        parsed = parser.parse_args(argv)
        self._argparse_arguments = parsed
        return parsed

    def clear(self):
        '''
        Drop declared arguments and clear parent state.
        '''
        self._argparse_argkwargs = []
        super(ArgumentPluginManager, self).clear()

    def register_argument(self, *args, **kwargs):
        '''
        Declare a command line argument; parameters are stored and later
        passed as they are to the argument group's `add_argument`.
        '''
        declaration = (args, kwargs)
        self._argparse_argkwargs.append(declaration)

    def get_argument(self, name, default=None):
        '''
        Get parsed argument value by name.

        :param name: argument attribute name
        :param default: value returned if the argument is not available
        :returns: argument value or default
        '''
        return getattr(self._argparse_arguments, name, default)
263
264
265
class PluginManager(BlueprintPluginManager, WidgetPluginManager,
                    MimetypePluginManager, ArgumentPluginManager):
    '''
    Plugin manager combining blueprint, widget, mimetype and argument
    plugin managers.
    '''
268