PluginAction   A
last analyzed

Complexity

Total Complexity 6

Size/Duplication

Total Lines 14
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 6
dl 0
loc 14
rs 10
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
B __call__() 0 13 6
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
import re
5
import sys
6
import os
7
import os.path
8
import argparse
9
import warnings
10
11
import flask
12
13
from . import app
14
from . import __meta__ as meta
15
from .compat import PY_LEGACY, getdebug, get_terminal_size
16
from .transform.glob import translate
17
18
19
class HelpFormatter(argparse.RawTextHelpFormatter):
20
    def __init__(self, prog, indent_increment=2, max_help_position=24,
21
                 width=None):
22
        if width is None:
23
            try:
24
                width = get_terminal_size().columns - 2
25
            except ValueError:  # https://bugs.python.org/issue24966
26
                pass
27
        super(HelpFormatter, self).__init__(
28
            prog, indent_increment, max_help_position, width)
29
30
31
class PluginAction(argparse.Action):
32
    def __call__(self, parser, namespace, value, option_string=None):
33
        warned = '%s_warning' % self.dest
34
        if ',' in value and not getattr(namespace, warned, False):
35
            setattr(namespace, warned, True)
36
            warnings.warn(
37
                'Comma-separated --plugin value is deprecated, '
38
                'use multiple --plugin options instead.'
39
                )
40
        values = value.split(',')
41
        prev = getattr(namespace, self.dest, None)
42
        if isinstance(prev, list):
43
            values = prev + [p for p in values if p not in prev]
44
        setattr(namespace, self.dest, values)
45
46
47
class ArgParse(argparse.ArgumentParser):
48
    default_directory = app.config['directory_base']
49
    default_initial = (
50
        None
51
        if app.config['directory_start'] == app.config['directory_base'] else
52
        app.config['directory_start']
53
        )
54
    default_removable = app.config['directory_remove']
55
    default_upload = app.config['directory_upload']
56
57
    default_host = os.getenv('BROWSEPY_HOST', '127.0.0.1')
58
    default_port = os.getenv('BROWSEPY_PORT', '8080')
59
    plugin_action_class = PluginAction
60
61
    defaults = {
62
        'prog': meta.app,
63
        'formatter_class': HelpFormatter,
64
        'description': 'description: starts a %s web file browser' % meta.app
65
        }
66
67
    def __init__(self, sep=os.sep):
68
        super(ArgParse, self).__init__(**self.defaults)
69
        self.add_argument(
70
            'host', nargs='?',
71
            default=self.default_host,
72
            help='address to listen (default: %(default)s)')
73
        self.add_argument(
74
            'port', nargs='?', type=int,
75
            default=self.default_port,
76
            help='port to listen (default: %(default)s)')
77
        self.add_argument(
78
            '--directory', metavar='PATH', type=self._directory,
79
            default=self.default_directory,
80
            help='serving directory (default: %(default)s)')
81
        self.add_argument(
82
            '--initial', metavar='PATH',
83
            type=lambda x: self._directory(x) if x else None,
84
            default=self.default_initial,
85
            help='default directory (default: same as --directory)')
86
        self.add_argument(
87
            '--removable', metavar='PATH', type=self._directory,
88
            default=self.default_removable,
89
            help='base directory allowing remove (default: %(default)s)')
90
        self.add_argument(
91
            '--upload', metavar='PATH', type=self._directory,
92
            default=self.default_upload,
93
            help='base directory allowing upload (default: %(default)s)')
94
        self.add_argument(
95
            '--exclude', metavar='PATTERN',
96
            action='append',
97
            default=[],
98
            help='exclude paths by pattern (multiple)')
99
        self.add_argument(
100
            '--exclude-from', metavar='PATH', type=self._file,
101
            action='append',
102
            default=[],
103
            help='exclude paths by pattern file (multiple)')
104
        self.add_argument(
105
            '--plugin', metavar='MODULE',
106
            action=self.plugin_action_class,
107
            default=[],
108
            help='load plugin module (multiple)')
109
        self.add_argument(
110
            '--debug', action='store_true',
111
            help=argparse.SUPPRESS)
112
113
    def _path(self, arg):
114
        if PY_LEGACY and hasattr(sys.stdin, 'encoding'):
115
            encoding = sys.stdin.encoding or sys.getdefaultencoding()
116
            arg = arg.decode(encoding)
117
        return os.path.abspath(arg)
118
119
    def _file(self, arg):
120
        path = self._path(arg)
121
        if os.path.isfile(path):
122
            return path
123
        self.error('%s is not a valid file' % arg)
124
125
    def _directory(self, arg):
126
        path = self._path(arg)
127
        if os.path.isdir(path):
128
            return path
129
        self.error('%s is not a valid directory' % arg)
130
131
132
def create_exclude_fnc(patterns, base, sep=os.sep):
133
    if patterns:
134
        regex = '|'.join(translate(pattern, sep, base) for pattern in patterns)
135
        return re.compile(regex).search
136
    return None
137
138
139
def collect_exclude_patterns(paths):
140
    patterns = []
141
    for path in paths:
142
        with open(path, 'r') as f:
143
            for line in f:
144
                line = line.split('#')[0].strip()
145
                if line:
146
                    patterns.append(line)
147
    return patterns
148
149
150
def list_union(*lists):
151
    lst = [i for l in lists for i in l]
152
    return sorted(frozenset(lst), key=lst.index)
153
154
155
def filter_union(*functions):
156
    filtered = [fnc for fnc in functions if fnc]
157
    if filtered:
158
        if len(filtered) == 1:
159
            return filtered[0]
160
        return lambda data: any(fnc(data) for fnc in filtered)
161
    return None
162
163
164
def main(argv=sys.argv[1:], app=app, parser=ArgParse, run_fnc=flask.Flask.run):
165
    plugin_manager = app.extensions['plugin_manager']
166
    args = plugin_manager.load_arguments(argv, parser())
167
    patterns = args.exclude + collect_exclude_patterns(args.exclude_from)
168
    if args.debug:
169
        os.environ['DEBUG'] = 'true'
170
    app.config.update(
171
        directory_base=args.directory,
172
        directory_start=args.initial or args.directory,
173
        directory_remove=args.removable,
174
        directory_upload=args.upload,
175
        plugin_modules=list_union(
176
            app.config['plugin_modules'],
177
            args.plugin,
178
            ),
179
        exclude_fnc=filter_union(
180
            app.config['exclude_fnc'],
181
            create_exclude_fnc(patterns, args.directory),
182
            ),
183
        )
184
    plugin_manager.reload()
185
    run_fnc(
186
        app,
187
        host=args.host,
188
        port=args.port,
189
        debug=getdebug(),
190
        use_reloader=False,
191
        threaded=True
192
        )
193
194
195
if __name__ == '__main__':  # pragma: no cover
196
    main()
197