Completed
Push — wip-ergoithz ( cc842c )
by Felipe A.
01:07
created

iter_cookie_browse_sorting()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 12
rs 9.4285
cc 3
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import logging
5
import os
6
import os.path
7
8
from datetime import timedelta
9
10
from flask import Response, request, render_template, redirect, \
11
                  url_for, send_from_directory, stream_with_context, \
12
                  make_response
13
from werkzeug.exceptions import NotFound
14
15
from .http import DataCookie
16
from .appconfig import Flask
17
from .manager import PluginManager
18
from .file import Node, secure_filename
19
from .exceptions import OutsideRemovableBase, OutsideDirectoryBase, \
20
                        InvalidFilenameError, InvalidPathError, \
21
                        InvalidCookieSizeError
22
from . import compat
23
from . import __meta__ as meta
24
25
# Module-level logger, named after this package.
logger = logging.getLogger(__name__)

# Package metadata, mirrored from the ``__meta__`` module.
__author__ = meta.author  # noqa
__license__ = meta.license  # noqa
__version__ = meta.version  # noqa
__app__ = meta.app  # noqa

# Absolute path of the directory containing this module.
__basedir__ = os.path.abspath(
    os.path.dirname(
        compat.fsdecode(__file__)
        )
    )
32
33
# Application object: static files and templates are looked up in the
# directories sitting next to this module.
app = Flask(
    __name__,
    static_url_path='/static',
    static_folder=os.path.join(__basedir__, 'static'),
    template_folder=os.path.join(__basedir__, 'templates'),
    )

# Default settings, applied before any external configuration is loaded.
app.config.update(
    directory_base=compat.getcwd(),
    directory_start=None,
    directory_remove=None,
    directory_upload=None,
    directory_tar_buffsize=262144,
    directory_downloadable=True,
    use_binary_multiples=True,
    plugin_modules=[],
    plugin_namespaces=('browsepy.plugin', 'browsepy_', ''),
    exclude_fnc=None,
    )
app.jinja_env.add_extension('browsepy.transform.htmlcompress.HTMLCompress')

# When the BROWSEPY_SETTINGS environment variable is present, the
# configuration it points to is loaded after the defaults above.
if 'BROWSEPY_SETTINGS' in os.environ:
    app.config.from_envvar('BROWSEPY_SETTINGS')

plugin_manager = PluginManager(app)

# Cookie holding (path, sorting property) pairs, kept for 90 days.
sorting_cookie = DataCookie('browse-sorting', max_age=timedelta(days=90))
62
63
64
def iter_cookie_browse_sorting(cookies):
    '''
    Iterate over the sorting preferences stored in the sorting cookie.

    A :class:`ValueError` raised while loading or unpacking the cookie data
    is logged and ends the iteration instead of propagating.

    :param cookies: cookies mapping the sorting cookie is loaded from
    :yields: tuple of path and sorting property
    :ytype: 2-tuple of strings
    '''
    try:
        entries = sorting_cookie.load_cookies(cookies, ())
        for entry in entries:
            path, prop = entry
            yield path, prop
    except ValueError as error:
        logger.exception(error)
76
77
78
def get_cookie_browse_sorting(path, default):
    '''
    Look up the sorting property stored for a path in the cookies of the
    current request.

    :param path: path whose sorting property is wanted
    :param default: value returned when there is no usable request or no
                    entry for the given path
    :returns: sorting property
    :rtype: string
    '''
    if not request:
        return default

    # first stored entry matching the path wins
    matches = (
        cprop
        for cpath, cprop in iter_cookie_browse_sorting(request.cookies)
        if path == cpath
        )
    return next(matches, default)
90
91
92
def browse_sortkey_reverse(prop):
    '''
    Get sorting function for directory listing based on given attribute
    name, with some caveats:

    * Directories will be first.
    * If *text* is given, link widget lowercase text will be used, falling
      back to the lowercase file name when there is no link text.
    * If *size* is given, bytesize (``stats.st_size``) will be used.
    * Any other name is read as an attribute of the item, defaulting to
      None when missing.

    A leading ``-`` on the attribute name requests reverse ordering.

    :param prop: file attribute name, optionally prefixed with ``-``
    :type prop: str
    :returns: tuple with sorting function and reverse bool
    :rtype: tuple of a callable and a bool
    '''
    reverse = prop.startswith('-')
    if reverse:
        prop = prop[1:]

    if prop == 'text':
        def secondary(node):
            link = node.link
            text = link.text if link and link.text else node.name
            # lowercase both link text and fallback name, so ordering is
            # consistently case-insensitive
            return text.lower()
    elif prop == 'size':
        def secondary(node):
            return node.stats.st_size
    else:
        def secondary(node):
            return getattr(node, prop, None)

    def sortkey(node):
        # comparing against ``reverse`` keeps directories first in the final
        # listing regardless of the ordering direction
        return (node.is_directory == reverse, secondary(node))

    return sortkey, reverse
134
135
136
def stream_template(template_name, **context):
    '''
    Render a template as a streaming response, so huge pages are sent in
    chunks as they are generated instead of being built in one go.

    :param template_name: template
    :param **context: parameters for templates.
    :returns: response wrapping the template chunk generator
    '''
    app.update_template_context(context)
    chunks = app.jinja_env.get_template(template_name).generate(context)
    return Response(stream_with_context(chunks))
149
150
151
@app.context_processor
def template_globals():
    '''
    Provide extra names to every template context: the registered plugin
    manager as ``manager`` and the builtin ``len``.

    :returns: mapping of names added to the template context
    :rtype: dict
    '''
    return dict(manager=app.extensions['plugin_manager'], len=len)
157
158
159
@app.route('/sort/<string:property>', defaults={"path": ""})
@app.route('/sort/<string:property>/<path:path>')
def sort(property, path):
    '''
    Store the sorting property chosen for a directory in the sorting cookie
    and redirect to the listing of that directory.

    :param property: sorting property to remember for the directory
    :param path: url path of the directory
    :returns: redirect to the browse endpoint carrying the cookie headers,
              or :class:`NotFound` when path is outside the directory base,
              is not a directory or is excluded
    '''
    try:
        directory = Node.from_urlpath(path)
    except OutsideDirectoryBase:
        return NotFound()

    if not directory.is_directory or directory.is_excluded:
        return NotFound()

    # requested entry goes first, followed by the entries already stored
    # for other paths
    data = [(path, property)]
    try:
        data.extend(
            (cpath, cprop)
            for cpath, cprop in iter_cookie_browse_sorting(request.cookies)
            if cpath != path
            )
    except Exception:
        # best effort: unreadable previous data must not prevent storing the
        # new preference, but the failure is logged rather than hidden
        logger.exception('Unable to load stored sorting cookie data')

    # handle cookie becoming too large
    headers = ()
    while data:
        try:
            headers = sorting_cookie.dump_headers(data, request.headers)
            break
        except InvalidCookieSizeError:
            # discard from the tail so the entry just requested (index 0) is
            # the last one to be dropped; tail entries are presumably the
            # oldest ones, as new entries are always written first
            data.pop()

    response = redirect(url_for(".browse", path=directory.urlpath))
    response.headers.extend(headers)
    return response
192
193
194
@app.route("/browse", defaults={"path": ""})
@app.route('/browse/<path:path>')
def browse(path):
    '''
    Show the listing of a directory, sorted by the property stored in the
    sorting cookie for that path (``text`` when there is none).

    :param path: url path of the directory
    :returns: streamed ``browse.html`` response, or :class:`NotFound` when
              path is outside the directory base, is not a directory or is
              excluded
    '''
    sort_property = get_cookie_browse_sorting(path, 'text')
    sort_fnc, sort_reverse = browse_sortkey_reverse(sort_property)

    try:
        directory = Node.from_urlpath(path)
        if not directory.is_directory or directory.is_excluded:
            return NotFound()
        context = dict(
            file=directory,
            sort_property=sort_property,
            sort_fnc=sort_fnc,
            sort_reverse=sort_reverse,
            )
        return stream_template('browse.html', **context)
    except OutsideDirectoryBase:
        return NotFound()
213
214
215
@app.route('/open/<path:path>', endpoint="open")
def open_file(path):
    '''
    Serve a file from its parent directory.

    :param path: url path of the file
    :returns: file response, or :class:`NotFound` when path is outside the
              directory base, is not a file or is excluded
    '''
    try:
        node = Node.from_urlpath(path)
        if not node.is_file or node.is_excluded:
            return NotFound()
        return send_from_directory(node.parent.path, node.name)
    except OutsideDirectoryBase:
        return NotFound()
224
225
226
@app.route("/download/file/<path:path>")
def download_file(path):
    '''
    Return the download response of a file.

    :param path: url path of the file
    :returns: result of the node ``download`` method, or :class:`NotFound`
              when path is outside the directory base, is not a file or is
              excluded
    '''
    try:
        node = Node.from_urlpath(path)
        if not node.is_file or node.is_excluded:
            return NotFound()
        return node.download()
    except OutsideDirectoryBase:
        return NotFound()
235
236
237
@app.route("/download/directory/<path:path>.tgz")
def download_directory(path):
    '''
    Return the download response of a directory, served under a ``.tgz``
    url.

    :param path: url path of the directory
    :returns: result of the node ``download`` method, or :class:`NotFound`
              when path is outside the directory base, is not a directory
              or is excluded
    '''
    try:
        node = Node.from_urlpath(path)
        if not node.is_directory or node.is_excluded:
            return NotFound()
        return node.download()
    except OutsideDirectoryBase:
        return NotFound()
246
247
248
@app.route("/remove/<path:path>", methods=("GET", "POST"))
def remove(path):
    '''
    Render the ``remove.html`` page on GET, or remove the node and redirect
    to the listing of its parent on any other allowed method.

    :param path: url path of the node to remove
    :returns: rendered page, redirect response, or :class:`NotFound` when
              path is outside the directory base, cannot be removed or is
              excluded
    '''
    try:
        node = Node.from_urlpath(path)
    except OutsideDirectoryBase:
        return NotFound()

    if not (node.can_remove and not node.is_excluded):
        return NotFound()

    if request.method != 'GET':
        node.remove()
        return redirect(url_for(".browse", path=node.parent.urlpath))

    return render_template('remove.html', file=node)
263
264
265
@app.route("/upload", defaults={'path': ''}, methods=("POST",))
@app.route("/upload/<path:path>", methods=("POST",))
def upload(path):
    '''
    Save every uploaded file into the given directory and redirect to its
    listing.

    :param path: url path of the target directory
    :returns: redirect to the browse endpoint, or :class:`NotFound` when
              path is outside the directory base, is not a directory,
              does not allow uploads or is excluded
    :raises InvalidFilenameError: if an uploaded file name becomes empty
                                  after being sanitized
    '''
    try:
        directory = Node.from_urlpath(path)
    except OutsideDirectoryBase:
        return NotFound()

    uploadable = (
        directory.is_directory and
        directory.can_upload and
        not directory.is_excluded
        )
    if not uploadable:
        return NotFound()

    for group in request.files.listvalues():
        for uploaded in group:
            filename = secure_filename(uploaded.filename)
            if not filename:
                raise InvalidFilenameError(
                    path=directory.path,
                    filename=uploaded.filename
                    )
            # let the directory choose the final name for the file
            target = directory.choose_filename(filename)
            uploaded.save(os.path.join(directory.path, target))

    return redirect(url_for(".browse", path=directory.urlpath))
293
294
295
@app.route("/")
def index():
    '''
    Show the listing of the configured start directory, or of the base
    directory when no start directory is set.

    :returns: response of the browse view, or :class:`NotFound` when the
              directory is outside the directory base
    '''
    config = app.config
    start = config["directory_start"] or config["directory_base"]
    try:
        start_urlpath = Node(start).urlpath
    except OutsideDirectoryBase:
        return NotFound()
    return browse(start_urlpath)
303
304
305
@app.after_request
def page_not_found(response):
    '''
    Replace any response with 404 status by the rendered ``404.html`` page.

    :param response: response about to be sent
    :returns: given response, or a new 404 response with the rendered page
    '''
    if response.status_code != 404:
        return response
    return make_response((render_template('404.html'), 404))
310
311
312
@app.errorhandler(InvalidPathError)
def bad_request_error(e):
    '''
    Render the ``400.html`` page for invalid path errors.

    The template receives as ``file`` the node of the error path for
    :class:`InvalidFilenameError`, its parent for other errors carrying a
    path, or None when the error has no path.

    :param e: handled exception
    :returns: tuple of rendered page and 400 status code
    '''
    file = None
    if hasattr(e, 'path'):
        node = Node(e.path)
        file = node if isinstance(e, InvalidFilenameError) else node.parent
    return render_template('400.html', file=file, error=e), 400
321
322
323
@app.errorhandler(OutsideRemovableBase)
@app.errorhandler(404)
def page_not_found_error(e):
    '''
    Render the ``404.html`` page for not found and outside-removable-base
    errors.

    :param e: handled exception (unused)
    :returns: tuple of rendered page and 404 status code
    '''
    body = render_template('404.html')
    return body, 404
327
328
329
@app.errorhandler(500)
def internal_server_error(e):  # pragma: no cover
    '''
    Log the error and answer with its ``message`` attribute, or a generic
    text when it has none.

    :param e: handled exception
    :returns: tuple of message and 500 status code
    '''
    logger.exception(e)
    message = getattr(e, 'message', 'Internal server error')
    return message, 500
333