Completed
Push — dev-4.1 ( dbd54d...c25dff )
by Felipe A.
01:42
created

PluginManagerBase.load_plugin()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 1
b 1
f 0
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import collections
6
import argparse
7
import warnings
8
9
from . import mimetype
10
from . import widget
11
from .compat import isnonstriterable
12
13
14
class PluginNotFoundError(ImportError):
15
    pass
16
17
18
class PluginManagerBase(object):
19
20
    @property
21
    def namespaces(self):
22
        return self.app.config['plugin_namespaces'] if self.app else []
23
24
    def __init__(self, app=None):
25
        if app is None:
26
            self.clear()
27
        else:
28
            self.init_app(app)
29
30
    def init_app(self, app):
31
        self.app = app
32
        if not hasattr(app, 'extensions'):
33
            app.extensions = {}
34
        app.extensions['plugin_manager'] = self
35
        self.reload()
36
37
    def reload(self):
38
        self.clear()
39
        for plugin in self.app.config.get('plugin_modules', ()):
40
            self.load_plugin(plugin)
41
42
    def clear(self):
43
        pass
44
45
    def import_plugin(self, plugin):
46
        names = [
47
            '%s.%s' % (namespace, plugin) if namespace else plugin
48
            for namespace in self.namespaces
49
            ]
50
51
        for name in names:
52
            if name in sys.modules:
53
                return sys.modules[name]
54
55
        for name in names:
56
            try:
57
                __import__(name)
58
                return sys.modules[name]
59
            except (ImportError, KeyError):
60
                pass
61
62
        raise PluginNotFoundError(
63
            'No plugin module %r found, tried %r' % (plugin, names),
64
            plugin, names)
65
66
    def load_plugin(self, plugin):
67
        return self.import_plugin(plugin)
68
69
70
class RegistrablePluginManager(PluginManagerBase):
71
    def load_plugin(self, plugin):
72
        module = super(RegistrablePluginManager, self).load_plugin(plugin)
73
        if hasattr(module, 'register_plugin'):
74
            module.register_plugin(self)
75
        return module
76
77
78
class BlueprintPluginManager(RegistrablePluginManager):
79
    '''
80
    Note: blueprints are not removed on `clear`nor reloaded on `reload` 
81
    as flask does not allow it.
82
    '''
83
    def __init__(self, app=None):
84
        self._blueprint_known = set()
85
        super(BlueprintPluginManager, self).__init__(app=app)
86
87
    def register_blueprint(self, blueprint):
88
        if blueprint not in self._blueprint_known:
89
            self.app.register_blueprint(blueprint)
90
            self._blueprint_known.add(blueprint)
91
92
93
class ActionPluginManager(RegistrablePluginManager):
94
    action_class = collections.namedtuple(
95
        'CallbackAction', ('endpoint', 'widget'))
96
    button_class = widget.ButtonWidget
97
    style_class = widget.StyleWidget
98
    javascript_class = widget.JavascriptWidget
99
    link_class = widget.LinkWidget
100
101
    def clear(self):
102
        self._action_widgets = {}
103
        self._action_callback = []
104
        super(ActionPluginManager, self).clear()
105
106
    def get_actions(self, file):
107
        return list(self.iter_actions(file))
108
109
    def iter_actions(self, file):
110
        for callback, endpoint, cwidget in self._action_callback:
111
            try:
112
                check = callback(file)
113
            except BaseException as e:
114
                # Exception is handled  as this method execution is deffered,
115
                # making hard to debug for plugin developers.
116
                warnings.warn(
117
                    'Plugin action filtering failed with error: %s' % e,
118
                    RuntimeWarning
119
                    )
120
            else:
121
                if check:
122
                    yield self.action_class(endpoint, cwidget.for_file(file))
123
124
    def get_widgets(self, place):
125
        return self._action_widgets.get(place, [])
126
127
    def register_widget(self, widget):
128
        self._action_widgets.setdefault(widget.place, []).append(widget)
129
130
    def register_action(self, endpoint, widget, callback=None, **kwargs):
131
        if callable(callback):
132
            self._action_callback.append((callback, endpoint, widget))
133
134
135
class MimetypeActionPluginManager(ActionPluginManager):
136
    action_class = collections.namedtuple(
137
        'MimetypeAction', ('endpoint', 'widget'))
138
139
    _default_mimetype_functions = (
140
        mimetype.by_python,
141
        mimetype.by_file,
142
        mimetype.by_default,
143
    )
144
145
    def clear(self):
146
        self._mimetype_root = {}  # mimetype tree root node
147
        self._mimetype_functions = list(self._default_mimetype_functions)
148
        super(MimetypeActionPluginManager, self).clear()
149
150
    def get_mimetype(self, path):
151
        for fnc in self._mimetype_functions:
152
            mime = fnc(path)
153
            if mime:
154
                return mime
155
        return mimetype.by_default(path)
156
157
    def iter_actions(self, file):
158
        for action in super(MimetypeActionPluginManager, self)\
159
                        .iter_actions(file):
160
            yield action
161
162
        category, variant = file.mimetype.split('/')
163
        for tree_category in (category, '*'):
164
            for tree_variant in (variant, '*'):
165
                acts = self._mimetype_root\
166
                    .get(tree_category, {})\
167
                    .get(tree_variant, ())
168
                for endpoint, cwidget in acts:
169
                    yield self.action_class(endpoint, cwidget.for_file(file))
170
171
    def register_mimetype_function(self, fnc):
172
        self._mimetype_functions.insert(0, fnc)
173
174
    def register_action(self, endpoint, widget, mimetypes=(), **kwargs):
175
        if not mimetypes:
176
            super(MimetypeActionPluginManager, self)\
177
                    .register_action(endpoint, widget, **kwargs)
178
            return
179
        mimetypes = mimetypes if isnonstriterable(mimetypes) else (mimetypes,)
180
        action = (endpoint, widget)
181
        for mime in mimetypes:
182
            category, variant = mime.split('/')
183
            self._mimetype_root.setdefault(
184
                category, {}
185
                ).setdefault(variant, []).append(action)
186
187
188
class ArgumentPluginManager(PluginManagerBase):
189
    _argparse_kwargs = {'add_help': False}
190
    _argparse_arguments = argparse.Namespace()
191
192
    def load_arguments(self, argv, base):
193
        parser = argparse.ArgumentParser(
194
            parents=(base,),
195
            add_help=False
196
            )
197
        for plugin in base.parse_known_args(argv)[0].plugin:
198
            module = self.import_plugin(plugin)
199
            if hasattr(module, 'register_arguments'):
200
                module.register_arguments(self)
201
        for argargs, argkwargs in self._argparse_argkwargs:
202
            parser.add_argument(*argargs, **argkwargs)
203
        self._argparse_arguments = parser.parse_args(argv)
204
        return self._argparse_arguments
205
206
    def clear(self):
207
        self._argparse_argkwargs = []
208
        super(ArgumentPluginManager, self).clear()
209
210
    def register_argument(self, *args, **kwargs):
211
        self._argparse_argkwargs.append((args, kwargs))
212
213
    def get_argument(self, name, default=None):
214
        return getattr(self._argparse_arguments, name, default)
215
216
217
class PluginManager(BlueprintPluginManager, MimetypeActionPluginManager,
218
                    ArgumentPluginManager):
219
    pass
220