Completed
Push — dev-4.1 ( dab2a4...5e66f2 )
by Felipe A.
01:11
created

ArgumentPluginManager   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 36
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 36
rs 10
c 0
b 0
f 0
wmc 9

4 Methods

Rating   Name   Duplication   Size   Complexity  
A get_argument() 0 2 1
B load_arguments() 0 22 6
A register_argument() 0 2 1
A clear() 0 3 1
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import collections
6
import argparse
7
import warnings
8
9
from . import mimetype
10
from . import widget
11
from .compat import isnonstriterable
12
13
14
class PluginNotFoundError(ImportError):
15
    pass
16
17
18
class PluginManagerBase(object):
19
20
    @property
21
    def namespaces(self):
22
        return self.app.config['plugin_namespaces'] if self.app else []
23
24
    def __init__(self, app=None):
25
        if app is None:
26
            self.clear()
27
        else:
28
            self.init_app(app)
29
30
    def init_app(self, app):
31
        self.app = app
32
        if not hasattr(app, 'extensions'):
33
            app.extensions = {}
34
        app.extensions['plugin_manager'] = self
35
        self.reload()
36
37
    def reload(self):
38
        self.clear()
39
        for plugin in self.app.config.get('plugin_modules', ()):
40
            self.load_plugin(plugin)
41
42
    def clear(self):
43
        pass
44
45
    def import_plugin(self, plugin):
46
        names = [
47
            '%s.%s' % (namespace, plugin) if namespace else plugin
48
            for namespace in self.namespaces
49
            ]
50
51
        for name in names:
52
            if name in sys.modules:
53
                return sys.modules[name]
54
55
        for name in names:
56
            try:
57
                __import__(name)
58
                return sys.modules[name]
59
            except (ImportError, KeyError):
60
                pass
61
62
        raise PluginNotFoundError(
63
            'No plugin module %r found, tried %r' % (plugin, names),
64
            plugin, names)
65
66
    def load_plugin(self, plugin):
67
        return self.import_plugin(plugin)
68
69
70
class RegistrablePluginManager(PluginManagerBase):
71
    def load_plugin(self, plugin):
72
        module = super(RegistrablePluginManager, self).load_plugin(plugin)
73
        if hasattr(module, 'register_plugin'):
74
            module.register_plugin(self)
75
        return module
76
77
78
class BlueprintPluginManager(RegistrablePluginManager):
79
    '''
80
    Note: blueprints are not removed on `clear` nor reloaded on `reload`
81
    as flask does not allow it.
82
    '''
83
    def __init__(self, app=None):
84
        self._blueprint_known = set()
85
        super(BlueprintPluginManager, self).__init__(app=app)
86
87
    def register_blueprint(self, blueprint):
88
        if blueprint not in self._blueprint_known:
89
            self.app.register_blueprint(blueprint)
90
            self._blueprint_known.add(blueprint)
91
92
93
class ActionPluginManager(RegistrablePluginManager):
94
    action_class = collections.namedtuple(
95
        'CallbackAction', ('endpoint', 'widget'))
96
    button_class = widget.ButtonWidget
97
    head_button_class = widget.HeadButtonWidget
98
    style_class = widget.StyleWidget
99
    javascript_class = widget.JavascriptWidget
100
    link_class = widget.LinkWidget
101
102
    def clear(self):
103
        self._action_widgets = {}
104
        self._action_callback = []
105
        super(ActionPluginManager, self).clear()
106
107
    def get_actions(self, file):
108
        return list(self.iter_actions(file))
109
110
    def iter_actions(self, file):
111
        for callback, endpoint, cwidget in self._action_callback:
112
            try:
113
                check = callback(file)
114
            except BaseException as e:
115
                # Exception is handled  as this method execution is deffered,
116
                # making hard to debug for plugin developers.
117
                warnings.warn(
118
                    'Plugin action filtering failed with error: %s' % e,
119
                    RuntimeWarning
120
                    )
121
            else:
122
                if check:
123
                    yield self.action_class(endpoint, cwidget.for_file(file))
124
125
    def get_widgets(self, place):
126
        return self._action_widgets.get(place, [])
127
128
    def register_widget(self, widget):
129
        self._action_widgets.setdefault(widget.place, []).append(widget)
130
131
    def register_action(self, endpoint, widget, callback=None, **kwargs):
132
        if callable(callback):
133
            self._action_callback.append((callback, endpoint, widget))
134
135
136
class MimetypeActionPluginManager(ActionPluginManager):
137
    action_class = collections.namedtuple(
138
        'MimetypeAction', ('endpoint', 'widget'))
139
140
    _default_mimetype_functions = (
141
        mimetype.by_python,
142
        mimetype.by_file,
143
        mimetype.by_default,
144
    )
145
146
    def clear(self):
147
        self._mimetype_root = {}  # mimetype tree root node
148
        self._mimetype_functions = list(self._default_mimetype_functions)
149
        super(MimetypeActionPluginManager, self).clear()
150
151
    def get_mimetype(self, path):
152
        for fnc in self._mimetype_functions:
153
            mime = fnc(path)
154
            if mime:
155
                return mime
156
        return mimetype.by_default(path)
157
158
    def iter_actions(self, file):
159
        for action in super(MimetypeActionPluginManager, self)\
160
                        .iter_actions(file):
161
            yield action
162
163
        category, variant = file.mimetype.split('/')
164
        for tree_category in (category, '*'):
165
            for tree_variant in (variant, '*'):
166
                acts = self._mimetype_root\
167
                    .get(tree_category, {})\
168
                    .get(tree_variant, ())
169
                for endpoint, cwidget in acts:
170
                    yield self.action_class(endpoint, cwidget.for_file(file))
171
172
    def register_mimetype_function(self, fnc):
173
        self._mimetype_functions.insert(0, fnc)
174
175
    def register_action(self, endpoint, widget, mimetypes=(), **kwargs):
176
        if not mimetypes:
177
            super(MimetypeActionPluginManager, self)\
178
                    .register_action(endpoint, widget, **kwargs)
179
            return
180
        mimetypes = mimetypes if isnonstriterable(mimetypes) else (mimetypes,)
181
        action = (endpoint, widget)
182
        for mime in mimetypes:
183
            category, variant = mime.split('/')
184
            self._mimetype_root.setdefault(
185
                category, {}
186
                ).setdefault(variant, []).append(action)
187
188
189
class ArgumentPluginManager(PluginManagerBase):
190
    _argparse_kwargs = {'add_help': False}
191
    _argparse_arguments = argparse.Namespace()
192
193
    def load_arguments(self, argv, base=None):
194
195
        plugin_parser = argparse.ArgumentParser(add_help=False)
196
        plugin_parser.add_argument(
197
            '--plugin',
198
            type=lambda x: x.split(',') if x else [],
199
            default=[]
200
            )
201
        parser = argparse.ArgumentParser(
202
            parents=(base or plugin_parser,),
203
            add_help=False
204
            )
205
        for plugin in plugin_parser.parse_known_args(argv)[0].plugin:
206
            module = self.import_plugin(plugin)
207
            if hasattr(module, 'register_arguments'):
208
                manager = ArgumentPluginManager()
209
                module.register_arguments(manager)
210
                group = parser.add_argument_group('%s arguments' % plugin)
211
                for argargs, argkwargs in manager._argparse_argkwargs:
212
                    group.add_argument(*argargs, **argkwargs)
213
        self._argparse_arguments = parser.parse_args(argv)
214
        return self._argparse_arguments
215
216
    def clear(self):
217
        self._argparse_argkwargs = []
218
        super(ArgumentPluginManager, self).clear()
219
220
    def register_argument(self, *args, **kwargs):
221
        self._argparse_argkwargs.append((args, kwargs))
222
223
    def get_argument(self, name, default=None):
224
        return getattr(self._argparse_arguments, name, default)
225
226
227
class PluginManager(BlueprintPluginManager, MimetypeActionPluginManager,
228
                    ArgumentPluginManager):
229
    pass
230