1
|
|
|
#!/usr/bin/env python |
2
|
|
|
# -*- coding: UTF-8 -*- |
3
|
|
|
|
4
|
|
|
import sys |
5
|
|
|
import unittest |
6
|
|
|
import re |
7
|
|
|
import os |
8
|
|
|
import os.path |
9
|
|
|
import shutil |
10
|
|
|
import tempfile |
11
|
|
|
import tarfile |
12
|
|
|
import xml.etree.ElementTree as ET |
13
|
|
|
import io |
14
|
|
|
import stat |
15
|
|
|
|
16
|
|
|
import flask |
17
|
|
|
import browsepy |
18
|
|
|
import browsepy.file |
19
|
|
|
import browsepy.managers |
20
|
|
|
import browsepy.__main__ |
21
|
|
|
import browsepy.compat |
22
|
|
|
|
23
|
|
|
PY_LEGACY = browsepy.compat.PY_LEGACY |
24
|
|
|
range = browsepy.compat.range |
25
|
|
|
|
26
|
|
|
class Page(object): |
27
|
|
|
@classmethod |
28
|
|
|
def itertext(cls, element): |
29
|
|
|
# TODO(ergoithz) on python2 drop: replace by element.itertext() |
30
|
|
|
yield element.text or '' |
31
|
|
|
for child in element: |
32
|
|
|
for text in cls.itertext(child): |
33
|
|
|
yield text |
34
|
|
|
yield child.tail or '' |
35
|
|
|
|
36
|
|
|
@classmethod |
37
|
|
|
def innerText(cls, element): |
38
|
|
|
return ''.join(cls.itertext(element)) |
39
|
|
|
|
40
|
|
|
|
41
|
|
|
class ListPage(Page): |
42
|
|
|
path_strip_re = re.compile('\s+/\s+') |
43
|
|
|
def __init__(self, path, directories, files, removable, upload): |
44
|
|
|
self.path = path |
45
|
|
|
self.directories = directories |
46
|
|
|
self.files = files |
47
|
|
|
self.removable = removable |
48
|
|
|
self.upload = upload |
49
|
|
|
|
50
|
|
|
|
51
|
|
|
@classmethod |
52
|
|
|
def from_source(cls, source): |
53
|
|
|
html = ET.fromstring(source) |
54
|
|
|
rows = [ |
55
|
|
|
( |
56
|
|
|
row[0].attrib.get('class') == 'dir-icon', |
57
|
|
|
row[1].find('.//a').attrib['href'], |
58
|
|
|
any(button.attrib.get('class') == 'remove button' for button in row[2].findall('.//a')) |
59
|
|
|
) |
60
|
|
|
for row in html.findall('.//table/tbody/tr') |
61
|
|
|
] |
62
|
|
|
return cls( |
63
|
|
|
cls.path_strip_re.sub('/', cls.innerText(html.find('.//h1'))).strip(), |
64
|
|
|
[url for isdir, url, removable in rows if isdir], |
65
|
|
|
[url for isdir, url, removable in rows if not isdir], |
66
|
|
|
all(removable for isdir, url, removable in rows) if rows else False, |
67
|
|
|
html.find('.//form//input[@type=\'file\']') is not None |
68
|
|
|
) |
69
|
|
|
|
70
|
|
|
|
71
|
|
|
class ConfirmPage(Page): |
72
|
|
|
def __init__(self, path, name, back): |
73
|
|
|
self.path = path |
74
|
|
|
self.name = name |
75
|
|
|
self.back = back |
76
|
|
|
|
77
|
|
|
@classmethod |
78
|
|
|
def from_source(cls, source): |
79
|
|
|
html = ET.fromstring(source) |
80
|
|
|
name = cls.innerText(html.find('.//form//strong')).strip() |
81
|
|
|
prefix = html.find('.//form//strong').attrib.get('data-prefix', '') |
82
|
|
|
|
83
|
|
|
return cls( |
84
|
|
|
prefix+name, |
85
|
|
|
name, |
86
|
|
|
html.find('.//form//a').attrib['href'] |
87
|
|
|
) |
88
|
|
|
|
89
|
|
|
|
90
|
|
|
class PageException(Exception): |
91
|
|
|
def __init__(self, status): |
92
|
|
|
self.status = status |
93
|
|
|
|
94
|
|
|
|
95
|
|
|
class Page404Exception(PageException): |
96
|
|
|
pass |
97
|
|
|
|
98
|
|
|
|
99
|
|
|
class TestApp(unittest.TestCase): |
100
|
|
|
module = browsepy |
101
|
|
|
list_page_class = ListPage |
102
|
|
|
confirm_page_class = ConfirmPage |
103
|
|
|
page_exceptions = { |
104
|
|
|
404: Page404Exception, |
105
|
|
|
None: PageException |
106
|
|
|
} |
107
|
|
|
|
108
|
|
|
def setUp(self): |
109
|
|
|
self.app = self.module.app |
110
|
|
|
self.base = tempfile.mkdtemp() |
111
|
|
|
self.start = os.path.join(self.base, 'start') |
112
|
|
|
self.remove = os.path.join(self.base, 'remove') |
113
|
|
|
self.upload = os.path.join(self.base, 'upload') |
114
|
|
|
|
115
|
|
|
os.mkdir(self.start) |
116
|
|
|
os.mkdir(self.remove) |
117
|
|
|
os.mkdir(self.upload) |
118
|
|
|
|
119
|
|
|
open(os.path.join(self.start, 'testfile.txt'), 'w').close() |
120
|
|
|
open(os.path.join(self.remove, 'testfile.txt'), 'w').close() |
121
|
|
|
|
122
|
|
|
self.app.config.update( |
123
|
|
|
directory_base = self.base, |
124
|
|
|
directory_start = self.start, |
125
|
|
|
directory_remove = self.remove, |
126
|
|
|
directory_upload = self.upload, |
127
|
|
|
SERVER_NAME = 'test', |
128
|
|
|
) |
129
|
|
|
|
130
|
|
|
self.base_directories = [ |
131
|
|
|
self.url_for('browse', path='remove'), |
132
|
|
|
self.url_for('browse', path='start'), |
133
|
|
|
self.url_for('browse', path='upload'), |
134
|
|
|
] |
135
|
|
|
self.start_files = [self.url_for('open', path='start/testfile.txt')] |
136
|
|
|
self.remove_files = [self.url_for('open', path='remove/testfile.txt')] |
137
|
|
|
self.upload_files = [] |
138
|
|
|
|
139
|
|
|
def clear(self, path): |
140
|
|
|
assert path.startswith(self.base + os.sep), 'Cannot clear directories out of base' |
141
|
|
|
|
142
|
|
|
for sub in os.listdir(path): |
143
|
|
|
sub = os.path.join(path, sub) |
144
|
|
|
if os.path.isdir(sub): |
145
|
|
|
shutil.rmtree(sub) |
146
|
|
|
else: |
147
|
|
|
os.remove(sub) |
148
|
|
|
|
149
|
|
|
def tearDown(self): |
150
|
|
|
shutil.rmtree(self.base) |
151
|
|
|
|
152
|
|
|
def get(self, endpoint, **kwargs): |
153
|
|
|
if endpoint in ('index', 'browse'): |
154
|
|
|
page_class = self.list_page_class |
155
|
|
|
elif endpoint == 'remove': |
156
|
|
|
page_class = self.confirm_page_class |
157
|
|
|
else: |
158
|
|
|
page_class = None |
159
|
|
|
|
160
|
|
|
with self.app.test_client() as client: |
161
|
|
|
response = client.get(self.url_for(endpoint, **kwargs)) |
162
|
|
|
if response.status_code != 200: |
163
|
|
|
raise self.page_exceptions.get(response.status_code, self.page_exceptions[None])(response.status_code) |
164
|
|
|
result = response.data if page_class is None else page_class.from_source(response.data) |
165
|
|
|
response.close() |
166
|
|
|
return result |
167
|
|
|
|
168
|
|
|
def post(self, endpoint, **kwargs): |
169
|
|
|
data = kwargs.pop('data') if 'data' in kwargs else {} |
170
|
|
|
with self.app.test_client() as client: |
171
|
|
|
response = client.post(self.url_for(endpoint, **kwargs), data=data, follow_redirects=True) |
172
|
|
|
if response.status_code != 200: |
173
|
|
|
raise self.page_exceptions.get(response.status_code, self.page_exceptions[None])(response.status_code) |
174
|
|
|
return self.list_page_class.from_source(response.data) |
175
|
|
|
|
176
|
|
|
def url_for(self, endpoint, **kwargs): |
177
|
|
|
with self.app.app_context(): |
178
|
|
|
return flask.url_for(endpoint, _external=False, **kwargs) |
179
|
|
|
|
180
|
|
|
def test_index(self): |
181
|
|
|
page = self.get('index') |
182
|
|
|
self.assertEqual(page.path, '%s/start' % os.path.basename(self.base)) |
183
|
|
|
|
184
|
|
|
self.app.config['directory_start'] = os.path.join(self.base, '..') |
185
|
|
|
|
186
|
|
|
self.assertRaises( |
187
|
|
|
Page404Exception, |
188
|
|
|
self.get, 'index' |
189
|
|
|
) |
190
|
|
|
|
191
|
|
|
self.app.config['directory_start'] = self.start |
192
|
|
|
|
193
|
|
|
def test_browse(self): |
194
|
|
|
basename = os.path.basename(self.base) |
195
|
|
|
page = self.get('browse') |
196
|
|
|
self.assertEqual(page.path, basename) |
197
|
|
|
self.assertEqual(page.directories, self.base_directories) |
198
|
|
|
self.assertFalse(page.removable) |
199
|
|
|
self.assertFalse(page.upload) |
200
|
|
|
|
201
|
|
|
page = self.get('browse', path='start') |
202
|
|
|
self.assertEqual(page.path, '%s/start' % basename) |
203
|
|
|
self.assertEqual(page.files, self.start_files) |
204
|
|
|
self.assertFalse(page.removable) |
205
|
|
|
self.assertFalse(page.upload) |
206
|
|
|
|
207
|
|
|
page = self.get('browse', path='remove') |
208
|
|
|
self.assertEqual(page.path, '%s/remove' % basename) |
209
|
|
|
self.assertEqual(page.files, self.remove_files) |
210
|
|
|
self.assertTrue(page.removable) |
211
|
|
|
self.assertFalse(page.upload) |
212
|
|
|
|
213
|
|
|
page = self.get('browse', path='upload') |
214
|
|
|
self.assertEqual(page.path, '%s/upload' % basename) |
215
|
|
|
self.assertEqual(page.files, self.upload_files) |
216
|
|
|
self.assertFalse(page.removable) |
217
|
|
|
self.assertTrue(page.upload) |
218
|
|
|
|
219
|
|
|
self.assertRaises( |
220
|
|
|
Page404Exception, |
221
|
|
|
self.get, 'browse', path='..' |
222
|
|
|
) |
223
|
|
|
|
224
|
|
|
def test_open(self): |
225
|
|
|
content = b'hello world' |
226
|
|
|
with open(os.path.join(self.start, 'testfile3.txt'), 'wb') as f: |
227
|
|
|
f.write(content) |
228
|
|
|
|
229
|
|
|
data = self.get('open', path='start/testfile3.txt') |
230
|
|
|
self.assertEqual(data, content) |
231
|
|
|
|
232
|
|
|
self.assertRaises( |
233
|
|
|
Page404Exception, |
234
|
|
|
self.get, 'open', path='../shall_not_pass.txt' |
235
|
|
|
) |
236
|
|
|
|
237
|
|
|
def test_remove(self): |
238
|
|
|
open(os.path.join(self.remove, 'testfile2.txt'), 'w').close() |
239
|
|
|
page = self.get('remove', path='remove/testfile2.txt') |
240
|
|
|
self.assertEqual(page.name, 'testfile2.txt') |
241
|
|
|
self.assertEqual(page.path, 'remove/testfile2.txt') |
242
|
|
|
self.assertEqual(page.back, self.url_for('browse', path='remove')) |
243
|
|
|
|
244
|
|
|
basename = os.path.basename(self.base) |
245
|
|
|
page = self.post('remove', path='remove/testfile2.txt') |
246
|
|
|
self.assertEqual(page.path, '%s/remove' % basename) |
247
|
|
|
self.assertEqual(page.files, self.remove_files) |
248
|
|
|
|
249
|
|
|
os.mkdir(os.path.join(self.remove, 'directory')) |
250
|
|
|
page = self.post('remove', path='remove/directory') |
251
|
|
|
self.assertEqual(page.path, '%s/remove' % basename) |
252
|
|
|
self.assertEqual(page.files, self.remove_files) |
253
|
|
|
|
254
|
|
|
self.assertRaises( |
255
|
|
|
Page404Exception, |
256
|
|
|
self.get, 'remove', path='start/testfile.txt' |
257
|
|
|
) |
258
|
|
|
|
259
|
|
|
self.assertRaises( |
260
|
|
|
Page404Exception, |
261
|
|
|
self.post, 'remove', path='start/testfile.txt' |
262
|
|
|
) |
263
|
|
|
|
264
|
|
|
self.app.config['directory_remove'] = None |
265
|
|
|
|
266
|
|
|
self.assertRaises( |
267
|
|
|
Page404Exception, |
268
|
|
|
self.get, 'remove', path='remove/testfile.txt' |
269
|
|
|
) |
270
|
|
|
|
271
|
|
|
self.app.config['directory_remove'] = self.remove |
272
|
|
|
|
273
|
|
|
self.assertRaises( |
274
|
|
|
Page404Exception, |
275
|
|
|
self.get, 'remove', path='../shall_not_pass.txt' |
276
|
|
|
) |
277
|
|
|
|
278
|
|
|
def test_download_file(self): |
279
|
|
|
binfile = os.path.join(self.base, 'testfile.bin') |
280
|
|
|
bindata = bytes(range(256)) |
281
|
|
|
|
282
|
|
|
with open(binfile, 'wb') as f: |
283
|
|
|
f.write(bindata) |
284
|
|
|
data = self.get('download_file', path='testfile.bin') |
285
|
|
|
os.remove(binfile) |
286
|
|
|
|
287
|
|
|
self.assertEqual(data, bindata) |
288
|
|
|
|
289
|
|
|
self.assertRaises( |
290
|
|
|
Page404Exception, |
291
|
|
|
self.get, 'download_file', path='../shall_not_pass.txt' |
292
|
|
|
) |
293
|
|
|
|
294
|
|
|
def test_download_directory(self): |
295
|
|
|
binfile = os.path.join(self.start, 'testfile.bin') |
296
|
|
|
bindata = bytes(range(256)) |
297
|
|
|
|
298
|
|
|
with open(binfile, 'wb') as f: |
299
|
|
|
f.write(bindata) |
300
|
|
|
data = self.get('download_directory', path='start') |
301
|
|
|
os.remove(binfile) |
302
|
|
|
|
303
|
|
|
iodata = io.BytesIO(data) |
304
|
|
|
with tarfile.open('start.tgz', mode="r:gz", fileobj=iodata) as tgz: |
305
|
|
|
tgz_files = [member.name for member in tgz.getmembers() if member.name] |
306
|
|
|
tgz_files.sort() |
307
|
|
|
|
308
|
|
|
self.assertEqual(tgz_files, ['testfile.bin', 'testfile.txt',]) |
309
|
|
|
|
310
|
|
|
self.assertRaises( |
311
|
|
|
Page404Exception, |
312
|
|
|
self.get, 'download_directory', path='../../shall_not_pass' |
313
|
|
|
) |
314
|
|
|
|
315
|
|
|
def test_upload(self): |
316
|
|
|
c = unichr if PY_LEGACY else chr |
317
|
|
|
|
318
|
|
|
files = { |
319
|
|
|
'testfile.txt': io.BytesIO(''.join(map(c, range(127))).encode('ascii')), |
320
|
|
|
'testfile.bin': io.BytesIO(''.join(map(c, range(255))).encode('utf-8')), |
321
|
|
|
} |
322
|
|
|
output = self.post('upload', |
323
|
|
|
path='upload', |
324
|
|
|
data={'file%d' % n: (data, name) for n, (name, data) in enumerate(files.items())} |
325
|
|
|
) |
326
|
|
|
expected_links = sorted(self.url_for('open', path='upload/%s' % i) for i in files) |
327
|
|
|
self.assertEqual(sorted(output.files), expected_links) |
328
|
|
|
self.clear(self.upload) |
329
|
|
|
|
330
|
|
|
def test_upload_duplicate(self): |
331
|
|
|
c = unichr if PY_LEGACY else chr |
332
|
|
|
|
333
|
|
|
files = ( |
334
|
|
|
('testfile.txt', 'something'), |
335
|
|
|
('testfile.txt', 'something_new'), |
336
|
|
|
) |
337
|
|
|
output = self.post('upload', |
338
|
|
|
path='upload', |
339
|
|
|
data={ |
340
|
|
|
'file%d' % n: (io.BytesIO(data.encode('ascii')), name) |
341
|
|
|
for n, (name, data) in enumerate(files) |
342
|
|
|
} |
343
|
|
|
) |
344
|
|
|
|
345
|
|
|
self.assertEqual(len(files), len(output.files)) |
346
|
|
|
|
347
|
|
|
first_file_url = self.url_for('open', path='upload/%s' % files[0][0]) |
348
|
|
|
self.assertIn(first_file_url, output.files) |
349
|
|
|
|
350
|
|
|
file_contents = [] |
351
|
|
|
for filename in os.listdir(self.upload): |
352
|
|
|
with open(os.path.join(self.upload, filename), 'r') as f: |
353
|
|
|
file_contents.append(f.read()) |
354
|
|
|
file_contents.sort() |
355
|
|
|
|
356
|
|
|
expected_file_contents = sorted(content for filename, content in files) |
357
|
|
|
|
358
|
|
|
self.assertEqual(file_contents, expected_file_contents) |
359
|
|
|
self.clear(self.upload) |
360
|
|
|
|
361
|
|
|
|
362
|
|
|
class TestFile(unittest.TestCase): |
363
|
|
|
module = browsepy.file |
364
|
|
|
|
365
|
|
|
def setUp(self): |
366
|
|
|
self.app = browsepy.app # FIXME |
367
|
|
|
self.workbench = tempfile.mkdtemp() |
368
|
|
|
|
369
|
|
|
def tearDown(self): |
370
|
|
|
shutil.rmtree(self.workbench) |
371
|
|
|
|
372
|
|
|
def test_mime(self): |
373
|
|
|
f = self.module.File('non_working_path') |
374
|
|
|
self.assertEqual(f.mimetype, 'application/octet-stream') |
375
|
|
|
|
376
|
|
|
f = self.module.File('non_working_path_with_ext.txt') |
377
|
|
|
self.assertEqual(f.mimetype, 'text/plain') |
378
|
|
|
|
379
|
|
|
tmp_txt = os.path.join(self.workbench, 'ascii_text_file') |
380
|
|
|
with open(tmp_txt, 'w') as f: |
381
|
|
|
f.write('ascii text') |
382
|
|
|
|
383
|
|
|
# test file command |
384
|
|
|
f = self.module.File(tmp_txt) |
385
|
|
|
self.assertEqual(f.mimetype, 'text/plain; charset=us-ascii') |
386
|
|
|
self.assertEqual(f.type, 'text/plain') |
387
|
|
|
self.assertEqual(f.encoding, 'us-ascii') |
388
|
|
|
|
389
|
|
|
# test non-working file command |
390
|
|
|
bad_path = os.path.join(self.workbench, 'path') |
391
|
|
|
os.mkdir(bad_path) |
392
|
|
|
|
393
|
|
|
bad_file = os.path.join(bad_path, 'file') |
394
|
|
|
with open(bad_file, 'w') as f: |
395
|
|
|
f.write('#!/usr/bin/env bash\nexit 1\n') |
396
|
|
|
os.chmod(bad_file, os.stat(bad_file).st_mode | stat.S_IEXEC) |
397
|
|
|
|
398
|
|
|
old_path = os.environ['PATH'] |
399
|
|
|
os.environ['PATH'] = bad_path + os.pathsep + old_path |
400
|
|
|
|
401
|
|
|
f = self.module.File(tmp_txt) |
402
|
|
|
self.assertEqual(f.mimetype, 'application/octet-stream') |
403
|
|
|
|
404
|
|
|
os.environ['PATH'] = old_path |
405
|
|
|
|
406
|
|
|
def test_size(self): |
407
|
|
|
test_file = os.path.join(self.workbench, 'test.csv') |
408
|
|
|
with open(test_file, 'w') as f: |
409
|
|
|
f.write(',\n'*512) |
410
|
|
|
f = self.module.File(test_file) |
411
|
|
|
|
412
|
|
|
default = self.app.config['use_binary_multiples'] |
413
|
|
|
|
414
|
|
|
self.app.config['use_binary_multiples'] = True |
415
|
|
|
self.assertEqual(f.size, '1.00 KiB') |
416
|
|
|
|
417
|
|
|
self.app.config['use_binary_multiples'] = False |
418
|
|
|
self.assertEqual(f.size, '1.02 KB') |
419
|
|
|
|
420
|
|
|
self.app.config['use_binary_multiples'] = default |
421
|
|
|
|
422
|
|
|
self.assertEqual(f.encoding, 'default') |
423
|
|
|
|
424
|
|
|
def test_properties(self): |
425
|
|
|
empty_file = os.path.join(self.workbench, 'empty.txt') |
426
|
|
|
open(empty_file, 'w').close() |
427
|
|
|
f = self.module.File(empty_file) |
428
|
|
|
|
429
|
|
|
self.assertEqual(f.name, 'empty.txt') |
430
|
|
|
self.assertEqual(f.can_download, True) |
431
|
|
|
self.assertEqual(f.can_remove, False) |
432
|
|
|
self.assertEqual(f.can_upload, False) |
433
|
|
|
self.assertEqual(f.parent.path, self.workbench) |
434
|
|
|
self.assertEqual(f.is_directory, False) |
435
|
|
|
|
436
|
|
|
def test_choose_filename(self): |
437
|
|
|
f = self.module.File(self.workbench) |
438
|
|
|
first_file = os.path.join(self.workbench, 'testfile.txt') |
439
|
|
|
|
440
|
|
|
filename = f.choose_filename('testfile.txt', attempts=0) |
441
|
|
|
self.assertEqual(filename, 'testfile.txt') |
442
|
|
|
|
443
|
|
|
open(first_file, 'w').close() |
444
|
|
|
|
445
|
|
|
filename = f.choose_filename('testfile.txt', attempts=0) |
446
|
|
|
self.assertNotEqual(filename, 'testfile (2).txt') |
447
|
|
|
|
448
|
|
|
filename = f.choose_filename('testfile.txt', attempts=2) |
449
|
|
|
self.assertEqual(filename, 'testfile (2).txt') |
450
|
|
|
|
451
|
|
|
second_file = os.path.join(self.workbench, filename) |
452
|
|
|
open(second_file, 'w').close() |
453
|
|
|
|
454
|
|
|
filename = f.choose_filename('testfile.txt', attempts=3) |
455
|
|
|
self.assertEqual(filename, 'testfile (3).txt') |
456
|
|
|
|
457
|
|
|
filename = f.choose_filename('testfile.txt', attempts=2) |
458
|
|
|
self.assertNotEqual(filename, 'testfile (2).txt') |
459
|
|
|
|
460
|
|
|
|
461
|
|
|
class TestFileFunctions(unittest.TestCase): |
462
|
|
|
module = browsepy.file |
463
|
|
|
def test_fmt_size(self): |
464
|
|
|
fnc = self.module.fmt_size |
465
|
|
|
for n, unit in enumerate(self.module.binary_units): |
466
|
|
|
self.assertEqual(fnc(2**(10*n)), (1, unit)) |
467
|
|
|
for n, unit in enumerate(self.module.standard_units): |
468
|
|
|
self.assertEqual(fnc(1000**n, False), (1, unit)) |
469
|
|
|
|
470
|
|
|
def test_secure_filename(self): |
471
|
|
|
self.assertEqual(self.module.secure_filename('/path'), 'path') |
472
|
|
|
self.assertEqual(self.module.secure_filename('..'), '') |
473
|
|
|
self.assertEqual(self.module.secure_filename('::'), '') |
474
|
|
|
self.assertEqual(self.module.secure_filename('\0'), '_') |
475
|
|
|
self.assertEqual(self.module.secure_filename('/'), '') |
476
|
|
|
self.assertEqual(self.module.secure_filename('C:\\'), '') |
477
|
|
|
self.assertEqual(self.module.secure_filename('COM1.asdf', destiny_os='nt'), '') |
478
|
|
|
self.assertEqual(self.module.secure_filename('\xf1', fs_encoding='ascii'), '_') |
479
|
|
|
|
480
|
|
|
if PY_LEGACY: |
481
|
|
|
expected = unicode('\xf1', encoding='latin-1') |
482
|
|
|
self.assertEqual(self.module.secure_filename('\xf1', fs_encoding='utf-8'), expected) |
483
|
|
|
self.assertEqual(self.module.secure_filename(expected, fs_encoding='utf-8'), expected) |
484
|
|
|
else: |
485
|
|
|
self.assertEqual(self.module.secure_filename('\xf1', fs_encoding='utf-8'), '\xf1') |
486
|
|
|
|
487
|
|
|
def test_alternative_filename(self): |
488
|
|
|
self.assertEqual(self.module.alternative_filename('test', 2), 'test (2)') |
489
|
|
|
self.assertEqual(self.module.alternative_filename('test.txt', 2), 'test (2).txt') |
490
|
|
|
self.assertEqual(self.module.alternative_filename('test.tar.gz', 2), 'test (2).tar.gz') |
491
|
|
|
self.assertEqual(self.module.alternative_filename('test.longextension', 2), 'test (2).longextension') |
492
|
|
|
self.assertEqual(self.module.alternative_filename('test.tar.tar.tar', 2), 'test.tar (2).tar.tar') |
493
|
|
|
self.assertNotEqual(self.module.alternative_filename('test'), 'test') |
494
|
|
|
|
495
|
|
|
def test_relativize_path(self): |
496
|
|
|
self.assertEqual(self.module.relativize_path('/parent/child', '/parent'), 'child') |
497
|
|
|
self.assertEqual(self.module.relativize_path('/grandpa/parent/child', '/grandpa/parent'), 'child') |
498
|
|
|
self.assertEqual(self.module.relativize_path('/grandpa/parent/child', '/grandpa'), 'parent/child') |
499
|
|
|
self.assertRaises( |
500
|
|
|
browsepy.OutsideDirectoryBase, |
501
|
|
|
browsepy.relativize_path, '/other', '/parent' |
502
|
|
|
) |
503
|
|
|
|
504
|
|
|
def test_under_base(self): |
505
|
|
|
self.assertTrue(self.module.check_under_base('C:\\as\\df\\gf', 'C:\\as\\df', '\\')) |
506
|
|
|
self.assertTrue(self.module.check_under_base('/as/df', '/as', '/')) |
507
|
|
|
|
508
|
|
|
self.assertFalse(self.module.check_under_base('C:\\cc\\df\\gf', 'C:\\as\\df', '\\')) |
509
|
|
|
self.assertFalse(self.module.check_under_base('/cc/df', '/as', '/')) |
510
|
|
|
|
511
|
|
|
|
512
|
|
|
class TestFunctions(unittest.TestCase): |
513
|
|
|
module = browsepy |
514
|
|
|
def test_empty_iterable(self): |
515
|
|
|
fnc = self.module.empty_iterable |
516
|
|
|
empty, iterable = fnc(i for i in ()) |
517
|
|
|
self.assertTrue(empty) |
518
|
|
|
self.assertRaises(StopIteration, next, iterable) |
519
|
|
|
empty, iterable = fnc(i for i in (1, 2)) |
520
|
|
|
self.assertFalse(empty) |
521
|
|
|
self.assertEqual(tuple(iterable), (1, 2)) |
522
|
|
|
|
523
|
|
|
|
524
|
|
|
class TestMain(unittest.TestCase): |
525
|
|
|
module = browsepy.__main__ |
526
|
|
|
|
527
|
|
|
def setUp(self): |
528
|
|
|
self.app = browsepy.app |
529
|
|
|
self.parser = self.module.ArgParse() |
530
|
|
|
self.base = tempfile.mkdtemp() |
531
|
|
|
|
532
|
|
|
def tearDown(self): |
533
|
|
|
shutil.rmtree(self.base) |
534
|
|
|
|
535
|
|
|
def test_defaults(self): |
536
|
|
|
result = self.parser.parse_args([]) |
537
|
|
|
self.assertEqual(result.host, '127.0.0.1') |
538
|
|
|
self.assertEqual(result.port, 8080) |
539
|
|
|
self.assertEqual(result.directory, os.getcwd()) |
540
|
|
|
self.assertEqual(result.initial, None) |
541
|
|
|
self.assertEqual(result.removable, None) |
542
|
|
|
self.assertEqual(result.upload, None) |
543
|
|
|
self.assertEqual(result.plugin, []) |
544
|
|
|
|
545
|
|
|
def test_params(self): |
546
|
|
|
plugins = ['plugin_1', 'plugin_2', 'namespace.plugin_3'] |
547
|
|
|
result = self.parser.parse_args(['127.1.1.1', '5000', |
548
|
|
|
'--directory=%s' % self.base, |
549
|
|
|
'--initial=%s' % self.base, |
550
|
|
|
'--removable=%s' % self.base, |
551
|
|
|
'--upload=%s' % self.base, |
552
|
|
|
'--plugin=%s' % ','.join(plugins), |
553
|
|
|
]) |
554
|
|
|
self.assertEqual(result.host, '127.1.1.1') |
555
|
|
|
self.assertEqual(result.port, 5000) |
556
|
|
|
self.assertEqual(result.directory, self.base) |
557
|
|
|
self.assertEqual(result.initial, self.base) |
558
|
|
|
self.assertEqual(result.removable, self.base) |
559
|
|
|
self.assertEqual(result.upload, self.base) |
560
|
|
|
self.assertEqual(result.plugin, plugins) |
561
|
|
|
|
562
|
|
|
def test_main(self): |
563
|
|
|
params = {} |
564
|
|
|
self.module.main(argv=[], run_fnc=lambda app, **kwargs: params.update(kwargs)) |
565
|
|
|
|
566
|
|
|
defaults = {'host': '127.0.0.1', 'port': 8080, 'debug': False, 'threaded': True} |
567
|
|
|
params_subset = {k: v for k, v in params.items() if k in defaults} |
568
|
|
|
self.assertEqual(defaults, params_subset) |
569
|
|
|
|
570
|
|
|
class TestPlugins(unittest.TestCase): |
571
|
|
|
app_module = browsepy |
572
|
|
|
managers_module = browsepy.managers |
573
|
|
|
def setUp(self): |
574
|
|
|
self.app = self.app_module.app |
575
|
|
|
self.manager = self.managers_module.PluginManager(self.app) |
576
|
|
|
self.original_namespaces = self.app.config['plugin_namespaces'] |
577
|
|
|
self.plugin_namespace, self.plugin_name = __name__.rsplit('.', 1) |
578
|
|
|
self.app.config['plugin_namespaces'] = (self.plugin_namespace,) |
579
|
|
|
|
580
|
|
|
def tearDown(self): |
581
|
|
|
self.app.config['plugin_namespaces'] = self.original_namespaces |
582
|
|
|
|
583
|
|
|
def test_manager(self): |
584
|
|
|
self.manager.load_plugin(self.plugin_name) |
585
|
|
|
self.assertTrue(self.manager._plugin_loaded) |
586
|
|
|
|
587
|
|
|
endpoints = sorted( |
588
|
|
|
action.endpoint |
589
|
|
|
for action in self.manager.get_actions('a/a') |
590
|
|
|
) |
591
|
|
|
|
592
|
|
|
self.assertEqual(endpoints, sorted(['test_x_x', 'test_a_x', 'test_x_a', 'test_a_a'])) |
593
|
|
|
self.assertEqual(self.app.view_functions['test_plugin.root'](), 'test_plugin_root') |
594
|
|
|
self.assertIn('test_plugin', self.app.blueprints) |
595
|
|
|
|
596
|
|
|
self.assertRaises( |
597
|
|
|
self.managers_module.PluginNotFoundError, |
598
|
|
|
self.manager.load_plugin, |
599
|
|
|
'non_existent_plugin_module' |
600
|
|
|
) |
601
|
|
|
|
602
|
|
|
|
603
|
|
|
def register_plugin(manager): |
604
|
|
|
manager._plugin_loaded = True |
605
|
|
|
manager.register_action('test_x_x', 'test_x_x', ('*/*',)) |
606
|
|
|
manager.register_action('test_a_x', 'test_a_x', ('a/*',)) |
607
|
|
|
manager.register_action('test_x_a', 'test_x_a', ('*/a',)) |
608
|
|
|
manager.register_action('test_a_a', 'test_a_a', ('a/a',)) |
609
|
|
|
manager.register_action('test_b_x', 'test_b_x', ('b/*',)) |
610
|
|
|
|
611
|
|
|
test_plugin_blueprint = flask.Blueprint('test_plugin', __name__, url_prefix = '/test_plugin_blueprint') |
612
|
|
|
test_plugin_blueprint.add_url_rule('/', endpoint='root', view_func=lambda: 'test_plugin_root') |
613
|
|
|
|
614
|
|
|
manager.register_blueprint(test_plugin_blueprint) |
615
|
|
|
|
616
|
|
|
|
617
|
|
|
if __name__ == '__main__': |
618
|
|
|
unittest.main() |
619
|
|
|
|