Completed
Push — dev-4.1 ( 57d16a...5679a9 )
by Felipe A.
02:24 queued 49s
created

MimetypePluginManager.clear()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import argparse
6
import warnings
7
import collections
8
9
from . import mimetype
10
11
12
def defaultsnamedtuple(name, fields, defaults=None):
13
    '''
14
    Generate namedtuple with default values.
15
16
    :param name: name
17
    :param fields: iterable with field names
18
    :param defaults: iterable or mapping with field defaults
19
    :returns: defaultdict with given fields and given defaults
20
    :rtype: collections.defaultdict
21
    '''
22
    nt = collections.namedtuple(name, fields)
23
    nt.__new__.__defaults__ = (None,) * len(nt._fields)
24
    if isinstance(defaults, collections.Mapping):
25
        nt.__new__.__defaults__ = tuple(nt(**defaults))
26
    elif defaults:
27
        nt.__new__.__defaults__ = tuple(nt(*defaults))
28
    return nt
29
30
31
class PluginNotFoundError(ImportError):
32
    pass
33
34
35
class WidgetException(Exception):
36
    pass
37
38
39
class WidgetParameterException(WidgetException):
40
    pass
41
42
43
class PluginManagerBase(object):
44
45
    @property
46
    def namespaces(self):
47
        '''
48
        List of plugin namespaces taken from app config.
49
        '''
50
        return self.app.config['plugin_namespaces'] if self.app else []
51
52
    def __init__(self, app=None):
53
        if app is None:
54
            self.clear()
55
        else:
56
            self.init_app(app)
57
58
    def init_app(self, app):
59
        self.app = app
60
        if not hasattr(app, 'extensions'):
61
            app.extensions = {}
62
        app.extensions['plugin_manager'] = self
63
        self.reload()
64
65
    def reload(self):
66
        self.clear()
67
        for plugin in self.app.config.get('plugin_modules', ()):
68
            self.load_plugin(plugin)
69
70
    def clear(self):
71
        pass
72
73
    def import_plugin(self, plugin):
74
        names = [
75
            '%s.%s' % (namespace, plugin) if namespace else plugin
76
            for namespace in self.namespaces
77
            ]
78
79
        for name in names:
80
            if name in sys.modules:
81
                return sys.modules[name]
82
83
        for name in names:
84
            try:
85
                __import__(name)
86
                return sys.modules[name]
87
            except (ImportError, KeyError):
88
                pass
89
90
        raise PluginNotFoundError(
91
            'No plugin module %r found, tried %r' % (plugin, names),
92
            plugin, names)
93
94
    def load_plugin(self, plugin):
95
        return self.import_plugin(plugin)
96
97
98
class RegistrablePluginManager(PluginManagerBase):
99
    def load_plugin(self, plugin):
100
        module = super(RegistrablePluginManager, self).load_plugin(plugin)
101
        if hasattr(module, 'register_plugin'):
102
            module.register_plugin(self)
103
        return module
104
105
106
class BlueprintPluginManager(RegistrablePluginManager):
107
    '''
108
    Note: blueprints are not removed on `clear` nor reloaded on `reload`
109
    as flask does not allow it.
110
    '''
111
    def __init__(self, app=None):
112
        self._blueprint_known = set()
113
        super(BlueprintPluginManager, self).__init__(app=app)
114
115
    def register_blueprint(self, blueprint):
116
        if blueprint not in self._blueprint_known:
117
            self.app.register_blueprint(blueprint)
118
            self._blueprint_known.add(blueprint)
119
120
121
class WidgetPluginManager(RegistrablePluginManager):
122
    widget_types = {
123
        'base': defaultsnamedtuple(
124
            'Widget',
125
            ('place', 'type')),
126
        'link': defaultsnamedtuple(
127
            'Link',
128
            ('place', 'type', 'css', 'icon', 'text', 'endpoint', 'href'),
129
            {
130
                'text': lambda f: f.name,
131
                'icon': lambda f: f.category
132
            }),
133
        'button': defaultsnamedtuple(
134
            'Button',
135
            ('place', 'type', 'css', 'text', 'endpoint', 'href')),
136
        'upload': defaultsnamedtuple(
137
            'Upload',
138
            ('place', 'type', 'css', 'text', 'endpoint', 'action')),
139
        'stylesheet': defaultsnamedtuple(
140
            'Stylesheet',
141
            ('place', 'type', 'endpoint', 'filename', 'href')),
142
        'script': defaultsnamedtuple(
143
            'Script',
144
            ('place', 'type', 'endpoint', 'filename', 'src')),
145
        'html': defaultsnamedtuple(
146
            'Html',
147
            ('place', 'type', 'html')),
148
    }
149
150
    def clear(self):
151
        self._widgets = []
152
        super(WidgetPluginManager, self).clear()
153
154
    def get_widgets(self, file=None, place=None):
155
        return list(self.iter_widgets(file, place))
156
157
    def _resolve_widget(self, file, widget):
158
        return widget.__class__(*[
159
            value(file) if callable(value) else value
160
            for value in widget
161
            ])
162
163
    def iter_widgets(self, file=None, place=None):
164
        for filter, dynamic, cwidget in self._widgets:
165
            try:
166
                if file and filter and not filter(file):
167
                    continue
168
            except BaseException as e:
169
                # Exception is handled  as this method execution is deffered,
170
                # making hard to debug for plugin developers.
171
                warnings.warn(
172
                    'Plugin action filtering failed with error: %s' % e,
173
                    RuntimeWarning
174
                    )
175
                continue
176
            if place and place != cwidget.place:
177
                continue
178
            if file and dynamic:
179
                cwidget = self._resolve_widget(file, cwidget)
180
            yield cwidget
181
182
    def create_widget(self, place, type, file=None, **kwargs):
183
        widget_class = self.widget_types.get(type, self.widget_types['base'])
184
        kwargs.update(place=place, type=type)
185
        try:
186
            element = widget_class(**kwargs)
187
        except TypeError as e:
188
            message = e.args[0] if e.args else ''
189
            if (
190
              'unexpected keyword argument' in message or
191
              'required positional argument' in message
192
              ):
193
                raise WidgetParameterException(
194
                    'type %s; %s; available: %r'
195
                    % (type, message, widget_class._fields)
196
                    )
197
            raise e
198
        if file and any(map(callable, element)):
199
            return self._resolve_widget(file, element)
200
        return element
201
202
    def register_widget(self, place=None, type=None, widget=None, filter=None,
203
                        **kwargs):
204
        if not widget and not (place and type):
205
            raise TypeError(
206
                'register_widget takes either place and type or widget'
207
                )
208
        widget = widget or self.create_widget(place, type, **kwargs)
209
        dynamic = any(map(callable, widget))
210
        self._widgets.append((filter, dynamic, widget))
211
212
213
class MimetypePluginManager(RegistrablePluginManager):
214
    _default_mimetype_functions = (
215
        mimetype.by_python,
216
        mimetype.by_file,
217
        mimetype.by_default,
218
    )
219
220
    def clear(self):
221
        self._mimetype_functions = list(self._default_mimetype_functions)
222
        super(MimetypePluginManager, self).clear()
223
224
    def get_mimetype(self, path):
225
        for fnc in self._mimetype_functions:
226
            mime = fnc(path)
227
            if mime:
228
                return mime
229
        return mimetype.by_default(path)
230
231
    def register_mimetype_function(self, fnc):
232
        self._mimetype_functions.insert(0, fnc)
233
234
235
class ArgumentPluginManager(PluginManagerBase):
236
    _argparse_kwargs = {'add_help': False}
237
    _argparse_arguments = argparse.Namespace()
238
239
    def load_arguments(self, argv, base=None):
240
        plugin_parser = argparse.ArgumentParser(add_help=False)
241
        plugin_parser.add_argument(
242
            '--plugin',
243
            type=lambda x: x.split(',') if x else [],
244
            default=[]
245
            )
246
        parser = argparse.ArgumentParser(
247
            parents=(base or plugin_parser,),
248
            add_help=False
249
            )
250
        for plugin in plugin_parser.parse_known_args(argv)[0].plugin:
251
            module = self.import_plugin(plugin)
252
            if hasattr(module, 'register_arguments'):
253
                manager = ArgumentPluginManager()
254
                module.register_arguments(manager)
255
                group = parser.add_argument_group('%s arguments' % plugin)
256
                for argargs, argkwargs in manager._argparse_argkwargs:
257
                    group.add_argument(*argargs, **argkwargs)
258
        self._argparse_arguments = parser.parse_args(argv)
259
        return self._argparse_arguments
260
261
    def clear(self):
262
        self._argparse_argkwargs = []
263
        super(ArgumentPluginManager, self).clear()
264
265
    def register_argument(self, *args, **kwargs):
266
        self._argparse_argkwargs.append((args, kwargs))
267
268
    def get_argument(self, name, default=None):
269
        return getattr(self._argparse_arguments, name, default)
270
271
272
class PluginManager(BlueprintPluginManager, WidgetPluginManager,
273
                    MimetypePluginManager, ArgumentPluginManager):
274
    pass
275