Completed
Push — dev-4.1-unstable ( fbeeec...3dbdf3 )
by Felipe A.
01:04
created

WidgetPluginManager.iter_widgets()   C

Complexity

Conditions 8

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
dl 0
loc 16
rs 6.6666
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import argparse
6
import warnings
7
import collections
8
9
from . import mimetype
10
11
12
def defaultsnamedtuple(name, fields, defaults=None):
13
    nt = collections.namedtuple(name, fields)
14
    nt.__new__.__defaults__ = (None,) * len(nt._fields)
15
    if isinstance(defaults, collections.Mapping):
16
        nt.__new__.__defaults__ = tuple(nt(**defaults))
17
    elif defaults:
18
        nt.__new__.__defaults__ = tuple(nt(*defaults))
19
    return nt
20
21
22
class PluginNotFoundError(ImportError):
23
    pass
24
25
26
class WidgetException(Exception):
27
    pass
28
29
30
class WidgetParameterException(WidgetException):
31
    pass
32
33
34
class PluginManagerBase(object):
35
36
    @property
37
    def namespaces(self):
38
        return self.app.config['plugin_namespaces'] if self.app else []
39
40
    def __init__(self, app=None):
41
        if app is None:
42
            self.clear()
43
        else:
44
            self.init_app(app)
45
46
    def init_app(self, app):
47
        self.app = app
48
        if not hasattr(app, 'extensions'):
49
            app.extensions = {}
50
        app.extensions['plugin_manager'] = self
51
        self.reload()
52
53
    def reload(self):
54
        self.clear()
55
        for plugin in self.app.config.get('plugin_modules', ()):
56
            self.load_plugin(plugin)
57
58
    def clear(self):
59
        pass
60
61
    def import_plugin(self, plugin):
62
        names = [
63
            '%s.%s' % (namespace, plugin) if namespace else plugin
64
            for namespace in self.namespaces
65
            ]
66
67
        for name in names:
68
            if name in sys.modules:
69
                return sys.modules[name]
70
71
        for name in names:
72
            try:
73
                __import__(name)
74
                return sys.modules[name]
75
            except (ImportError, KeyError):
76
                pass
77
78
        raise PluginNotFoundError(
79
            'No plugin module %r found, tried %r' % (plugin, names),
80
            plugin, names)
81
82
    def load_plugin(self, plugin):
83
        return self.import_plugin(plugin)
84
85
86
class RegistrablePluginManager(PluginManagerBase):
87
    def load_plugin(self, plugin):
88
        module = super(RegistrablePluginManager, self).load_plugin(plugin)
89
        if hasattr(module, 'register_plugin'):
90
            module.register_plugin(self)
91
        return module
92
93
94
class BlueprintPluginManager(RegistrablePluginManager):
95
    '''
96
    Note: blueprints are not removed on `clear` nor reloaded on `reload`
97
    as flask does not allow it.
98
    '''
99
    def __init__(self, app=None):
100
        self._blueprint_known = set()
101
        super(BlueprintPluginManager, self).__init__(app=app)
102
103
    def register_blueprint(self, blueprint):
104
        if blueprint not in self._blueprint_known:
105
            self.app.register_blueprint(blueprint)
106
            self._blueprint_known.add(blueprint)
107
108
109
class WidgetPluginManager(RegistrablePluginManager):
110
    widget_types = {
111
        'base': defaultsnamedtuple(
112
            'Widget',
113
            ('place', 'type')),
114
        'link': defaultsnamedtuple(
115
            'Link',
116
            ('place', 'type', 'css', 'icon', 'text', 'endpoint', 'href'),
117
            {
118
                'text': lambda f: f.name,
119
                'icon': lambda f: 'dir-icon' if f.is_directory else 'file-icon'
120
                }),
121
        'button': defaultsnamedtuple(
122
            'Button',
123
            ('place', 'type', 'css', 'text', 'endpoint', 'href')),
124
        'upload': defaultsnamedtuple(
125
            'Upload',
126
            ('place', 'type', 'css', 'text', 'endpoint', 'action')),
127
        'stylesheet': defaultsnamedtuple(
128
            'Stylesheet',
129
            ('place', 'type', 'endpoint', 'filename', 'href')),
130
        'script': defaultsnamedtuple(
131
            'Script',
132
            ('place', 'type', 'endpoint', 'filename', 'src')),
133
        'html': defaultsnamedtuple(
134
            'Html',
135
            ('place', 'type', 'html')),
136
    }
137
138
    def clear(self):
139
        self._widgets = []
140
        super(WidgetPluginManager, self).clear()
141
142
    def get_widgets(self, file=None, place=None):
143
        return list(self.iter_widgets(file, place))
144
145
    def iter_widgets(self, file=None, place=None):
146
        for filter, cwidget in self._widgets:
147
            try:
148
                if file and filter and not filter(file):
149
                    continue
150
            except BaseException as e:
151
                # Exception is handled  as this method execution is deffered,
152
                # making hard to debug for plugin developers.
153
                warnings.warn(
154
                    'Plugin action filtering failed with error: %s' % e,
155
                    RuntimeWarning
156
                    )
157
                continue
158
            if place and place != cwidget.place:
159
                continue
160
            yield self._resolve_widget(file, cwidget)
161
162
    def _resolve_widget(self, file, widget):
163
        if file and any(map(callable, widget)):
164
            return widget.__class__(*[
165
                value(file) if callable(value) else value
166
                for value in widget
167
                ])
168
        return widget
169
170
    def create_widget(self, place=None, type=None, file=None, **kwargs):
171
        widget_class = self.widget_types.get(type, self.widget_types['base'])
172
        kwargs.update(place=place, type=type)
173
        try:
174
            element = widget_class(**kwargs)
175
        except TypeError as e:
176
            message = e.args[0] if e.args else ''
177
            if (
178
              'unexpected keyword argument' in message or
179
              'required positional argument' in message
180
              ):
181
                raise WidgetParameterException(
182
                    'type %s; %s; available: %r'
183
                    % (type, message, widget_class._fields)
184
                    )
185
            raise e
186
        return self._resolve_widget(file, element)
187
188
    def register_widget(self, place=None, type=None, widget=None, filter=None,
189
                        **kwargs):
190
        element = widget or self.create_widget(place, type, **kwargs)
191
        self._widgets.append((filter, element))
192
193
194
class MimetypePluginManager(RegistrablePluginManager):
195
    _default_mimetype_functions = (
196
        mimetype.by_python,
197
        mimetype.by_file,
198
        mimetype.by_default,
199
    )
200
201
    def clear(self):
202
        self._mimetype_functions = list(self._default_mimetype_functions)
203
        super(MimetypePluginManager, self).clear()
204
205
    def get_mimetype(self, path):
206
        for fnc in self._mimetype_functions:
207
            mime = fnc(path)
208
            if mime:
209
                return mime
210
        return mimetype.by_default(path)
211
212
    def register_mimetype_function(self, fnc):
213
        self._mimetype_functions.insert(0, fnc)
214
215
216
class ArgumentPluginManager(PluginManagerBase):
217
    _argparse_kwargs = {'add_help': False}
218
    _argparse_arguments = argparse.Namespace()
219
220
    def load_arguments(self, argv, base=None):
221
222
        plugin_parser = argparse.ArgumentParser(add_help=False)
223
        plugin_parser.add_argument(
224
            '--plugin',
225
            type=lambda x: x.split(',') if x else [],
226
            default=[]
227
            )
228
        parser = argparse.ArgumentParser(
229
            parents=(base or plugin_parser,),
230
            add_help=False
231
            )
232
        for plugin in plugin_parser.parse_known_args(argv)[0].plugin:
233
            module = self.import_plugin(plugin)
234
            if hasattr(module, 'register_arguments'):
235
                manager = ArgumentPluginManager()
236
                module.register_arguments(manager)
237
                group = parser.add_argument_group('%s arguments' % plugin)
238
                for argargs, argkwargs in manager._argparse_argkwargs:
239
                    group.add_argument(*argargs, **argkwargs)
240
        self._argparse_arguments = parser.parse_args(argv)
241
        return self._argparse_arguments
242
243
    def clear(self):
244
        self._argparse_argkwargs = []
245
        super(ArgumentPluginManager, self).clear()
246
247
    def register_argument(self, *args, **kwargs):
248
        self._argparse_argkwargs.append((args, kwargs))
249
250
    def get_argument(self, name, default=None):
251
        return getattr(self._argparse_arguments, name, default)
252
253
254
class PluginManager(BlueprintPluginManager, WidgetPluginManager,
255
                    MimetypePluginManager, ArgumentPluginManager):
256
    pass
257