Completed
Push — dev-4.1-unstable ( 136355...f650fd )
by Felipe A.
01:01
created

MimetypeActionPluginManager   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 51
Duplicated Lines 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
dl 0
loc 51
rs 10
c 1
b 1
f 0
wmc 14

5 Methods

Rating   Name   Duplication   Size   Complexity  
A ArgumentPluginManager.get_argument() 0 2 1
B ArgumentPluginManager.load_arguments() 0 22 6
A MimetypePluginManager.register_mimetype_function() 0 2 1
A ArgumentPluginManager.register_argument() 0 2 1
A ArgumentPluginManager.clear() 0 3 1
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import argparse
6
import warnings
7
8
from . import mimetype
9
from . import widget
10
11
12
class PluginNotFoundError(ImportError):
13
    pass
14
15
16
class PluginManagerBase(object):
17
18
    @property
19
    def namespaces(self):
20
        return self.app.config['plugin_namespaces'] if self.app else []
21
22
    def __init__(self, app=None):
23
        if app is None:
24
            self.clear()
25
        else:
26
            self.init_app(app)
27
28
    def init_app(self, app):
29
        self.app = app
30
        if not hasattr(app, 'extensions'):
31
            app.extensions = {}
32
        app.extensions['plugin_manager'] = self
33
        self.reload()
34
35
    def reload(self):
36
        self.clear()
37
        for plugin in self.app.config.get('plugin_modules', ()):
38
            self.load_plugin(plugin)
39
40
    def clear(self):
41
        pass
42
43
    def import_plugin(self, plugin):
44
        names = [
45
            '%s.%s' % (namespace, plugin) if namespace else plugin
46
            for namespace in self.namespaces
47
            ]
48
49
        for name in names:
50
            if name in sys.modules:
51
                return sys.modules[name]
52
53
        for name in names:
54
            try:
55
                __import__(name)
56
                return sys.modules[name]
57
            except (ImportError, KeyError):
58
                pass
59
60
        raise PluginNotFoundError(
61
            'No plugin module %r found, tried %r' % (plugin, names),
62
            plugin, names)
63
64
    def load_plugin(self, plugin):
65
        return self.import_plugin(plugin)
66
67
68
class RegistrablePluginManager(PluginManagerBase):
69
    def load_plugin(self, plugin):
70
        module = super(RegistrablePluginManager, self).load_plugin(plugin)
71
        if hasattr(module, 'register_plugin'):
72
            module.register_plugin(self)
73
        return module
74
75
76
class BlueprintPluginManager(RegistrablePluginManager):
77
    '''
78
    Note: blueprints are not removed on `clear` nor reloaded on `reload`
79
    as flask does not allow it.
80
    '''
81
    def __init__(self, app=None):
82
        self._blueprint_known = set()
83
        super(BlueprintPluginManager, self).__init__(app=app)
84
85
    def register_blueprint(self, blueprint):
86
        if blueprint not in self._blueprint_known:
87
            self.app.register_blueprint(blueprint)
88
            self._blueprint_known.add(blueprint)
89
90
91
class WidgetPluginManager(RegistrablePluginManager):
92
    widget_class = widget.HTMLElement
93
94
    def clear(self):
95
        self._filter_widgets = []
96
        super(WidgetPluginManager, self).clear()
97
98
    def get_widgets(self, file=None, place=None):
99
        return list(self.iter_widgets(file, place))
100
101
    def iter_widgets(self, file=None, place=None):
102
        for filter, endpoint, cwidget in self._filter_widgets:
103
            try:
104
                check = filter(file) if filter else True
105
            except BaseException as e:
106
                # Exception is handled  as this method execution is deffered,
107
                # making hard to debug for plugin developers.
108
                warnings.warn(
109
                    'Plugin action filtering failed with error: %s' % e,
110
                    RuntimeWarning
111
                    )
112
                continue
113
            if check and (place is None or place == cwidget.place):
114
                yield self.widget_class(endpoint, cwidget)
115
116
    def register_widget(self, widget, **kwargs):
117
        self.register_action(None, widget, **kwargs)
118
119
    def register_action(self, endpoint, widget, filter=None, **kwargs):
120
        self._filter_widgets.append((filter, endpoint, widget))
121
122
123
class MimetypePluginManager(RegistrablePluginManager):
124
    _default_mimetype_functions = (
125
        mimetype.by_python,
126
        mimetype.by_file,
127
        mimetype.by_default,
128
    )
129
130
    def clear(self):
131
        self._mimetype_functions = list(self._default_mimetype_functions)
132
        super(MimetypePluginManager, self).clear()
133
134
    def get_mimetype(self, path):
135
        for fnc in self._mimetype_functions:
136
            mime = fnc(path)
137
            if mime:
138
                return mime
139
        return mimetype.by_default(path)
140
141
    def register_mimetype_function(self, fnc):
142
        self._mimetype_functions.insert(0, fnc)
143
144
145
class ArgumentPluginManager(PluginManagerBase):
146
    _argparse_kwargs = {'add_help': False}
147
    _argparse_arguments = argparse.Namespace()
148
149
    def load_arguments(self, argv, base=None):
150
151
        plugin_parser = argparse.ArgumentParser(add_help=False)
152
        plugin_parser.add_argument(
153
            '--plugin',
154
            type=lambda x: x.split(',') if x else [],
155
            default=[]
156
            )
157
        parser = argparse.ArgumentParser(
158
            parents=(base or plugin_parser,),
159
            add_help=False
160
            )
161
        for plugin in plugin_parser.parse_known_args(argv)[0].plugin:
162
            module = self.import_plugin(plugin)
163
            if hasattr(module, 'register_arguments'):
164
                manager = ArgumentPluginManager()
165
                module.register_arguments(manager)
166
                group = parser.add_argument_group('%s arguments' % plugin)
167
                for argargs, argkwargs in manager._argparse_argkwargs:
168
                    group.add_argument(*argargs, **argkwargs)
169
        self._argparse_arguments = parser.parse_args(argv)
170
        return self._argparse_arguments
171
172
    def clear(self):
173
        self._argparse_argkwargs = []
174
        super(ArgumentPluginManager, self).clear()
175
176
    def register_argument(self, *args, **kwargs):
177
        self._argparse_argkwargs.append((args, kwargs))
178
179
    def get_argument(self, name, default=None):
180
        return getattr(self._argparse_arguments, name, default)
181
182
183
class PluginManager(BlueprintPluginManager, WidgetPluginManager,
184
                    MimetypePluginManager, ArgumentPluginManager):
185
    pass
186