|
1
|
|
|
#!/usr/bin/env python |
|
2
|
|
|
# -*- coding: UTF-8 -*- |
|
3
|
|
|
|
|
4
|
|
|
import sys |
|
5
|
|
|
import unittest |
|
6
|
|
|
import re |
|
7
|
|
|
import os |
|
8
|
|
|
import os.path |
|
9
|
|
|
import shutil |
|
10
|
|
|
import tempfile |
|
11
|
|
|
import tarfile |
|
12
|
|
|
import xml.etree.ElementTree as ET |
|
13
|
|
|
import io |
|
14
|
|
|
import stat |
|
15
|
|
|
|
|
16
|
|
|
import flask |
|
17
|
|
|
import browsepy |
|
18
|
|
|
import browsepy.file |
|
19
|
|
|
import browsepy.managers |
|
20
|
|
|
import browsepy.__main__ |
|
21
|
|
|
import browsepy.compat |
|
22
|
|
|
|
|
23
|
|
|
PY_LEGACY = browsepy.compat.PY_LEGACY |
|
24
|
|
|
range = browsepy.compat.range |
|
25
|
|
|
|
|
26
|
|
|
class Page(object): |
|
27
|
|
|
@classmethod |
|
28
|
|
|
def itertext(cls, element): |
|
29
|
|
|
# TODO(ergoithz) on python2 drop: replace by element.itertext() |
|
30
|
|
|
yield element.text or '' |
|
31
|
|
|
for child in element: |
|
32
|
|
|
for text in cls.itertext(child): |
|
33
|
|
|
yield text |
|
34
|
|
|
yield child.tail or '' |
|
35
|
|
|
|
|
36
|
|
|
@classmethod |
|
37
|
|
|
def innerText(cls, element): |
|
38
|
|
|
return ''.join(cls.itertext(element)) |
|
39
|
|
|
|
|
40
|
|
|
|
|
41
|
|
|
class ListPage(Page): |
|
42
|
|
|
path_strip_re = re.compile('\s+/\s+') |
|
43
|
|
|
def __init__(self, path, directories, files, removable, upload): |
|
44
|
|
|
self.path = path |
|
45
|
|
|
self.directories = directories |
|
46
|
|
|
self.files = files |
|
47
|
|
|
self.removable = removable |
|
48
|
|
|
self.upload = upload |
|
49
|
|
|
|
|
50
|
|
|
|
|
51
|
|
|
@classmethod |
|
52
|
|
|
def from_source(cls, source): |
|
53
|
|
|
html = ET.fromstring(source) |
|
54
|
|
|
rows = [ |
|
55
|
|
|
( |
|
56
|
|
|
row[0].attrib.get('class') == 'dir-icon', |
|
57
|
|
|
row[1].find('.//a').attrib['href'], |
|
58
|
|
|
any(button.attrib.get('class') == 'remove button' for button in row[2].findall('.//a')) |
|
59
|
|
|
) |
|
60
|
|
|
for row in html.findall('.//table/tbody/tr') |
|
61
|
|
|
] |
|
62
|
|
|
return cls( |
|
63
|
|
|
cls.path_strip_re.sub('/', cls.innerText(html.find('.//h1'))).strip(), |
|
64
|
|
|
[url for isdir, url, removable in rows if isdir], |
|
65
|
|
|
[url for isdir, url, removable in rows if not isdir], |
|
66
|
|
|
all(removable for isdir, url, removable in rows) if rows else False, |
|
67
|
|
|
html.find('.//form//input[@type=\'file\']') is not None |
|
68
|
|
|
) |
|
69
|
|
|
|
|
70
|
|
|
|
|
71
|
|
|
class ConfirmPage(Page): |
|
72
|
|
|
def __init__(self, path, name, back): |
|
73
|
|
|
self.path = path |
|
74
|
|
|
self.name = name |
|
75
|
|
|
self.back = back |
|
76
|
|
|
|
|
77
|
|
|
@classmethod |
|
78
|
|
|
def from_source(cls, source): |
|
79
|
|
|
html = ET.fromstring(source) |
|
80
|
|
|
name = cls.innerText(html.find('.//form//strong')).strip() |
|
81
|
|
|
prefix = html.find('.//form//strong').attrib.get('data-prefix', '') |
|
82
|
|
|
|
|
83
|
|
|
return cls( |
|
84
|
|
|
prefix+name, |
|
85
|
|
|
name, |
|
86
|
|
|
html.find('.//form//a').attrib['href'] |
|
87
|
|
|
) |
|
88
|
|
|
|
|
89
|
|
|
|
|
90
|
|
|
class PageException(Exception): |
|
91
|
|
|
def __init__(self, status): |
|
92
|
|
|
self.status = status |
|
93
|
|
|
|
|
94
|
|
|
|
|
95
|
|
|
class Page404Exception(PageException): |
|
96
|
|
|
pass |
|
97
|
|
|
|
|
98
|
|
|
|
|
99
|
|
|
class TestApp(unittest.TestCase): |
|
100
|
|
|
module = browsepy |
|
101
|
|
|
list_page_class = ListPage |
|
102
|
|
|
confirm_page_class = ConfirmPage |
|
103
|
|
|
page_exceptions = { |
|
104
|
|
|
404: Page404Exception, |
|
105
|
|
|
None: PageException |
|
106
|
|
|
} |
|
107
|
|
|
|
|
108
|
|
|
def setUp(self): |
|
109
|
|
|
self.app = self.module.app |
|
110
|
|
|
self.base = tempfile.mkdtemp() |
|
111
|
|
|
self.start = os.path.join(self.base, 'start') |
|
112
|
|
|
self.remove = os.path.join(self.base, 'remove') |
|
113
|
|
|
self.upload = os.path.join(self.base, 'upload') |
|
114
|
|
|
|
|
115
|
|
|
os.mkdir(self.start) |
|
116
|
|
|
os.mkdir(self.remove) |
|
117
|
|
|
os.mkdir(self.upload) |
|
118
|
|
|
|
|
119
|
|
|
open(os.path.join(self.start, 'testfile.txt'), 'w').close() |
|
120
|
|
|
open(os.path.join(self.remove, 'testfile.txt'), 'w').close() |
|
121
|
|
|
|
|
122
|
|
|
self.app.config.update( |
|
123
|
|
|
directory_base = self.base, |
|
124
|
|
|
directory_start = self.start, |
|
125
|
|
|
directory_remove = self.remove, |
|
126
|
|
|
directory_upload = self.upload, |
|
127
|
|
|
SERVER_NAME = 'test', |
|
128
|
|
|
) |
|
129
|
|
|
|
|
130
|
|
|
self.base_directories = [ |
|
131
|
|
|
self.url_for('browse', path='remove'), |
|
132
|
|
|
self.url_for('browse', path='start'), |
|
133
|
|
|
self.url_for('browse', path='upload'), |
|
134
|
|
|
] |
|
135
|
|
|
self.start_files = [self.url_for('open', path='start/testfile.txt')] |
|
136
|
|
|
self.remove_files = [self.url_for('open', path='remove/testfile.txt')] |
|
137
|
|
|
self.upload_files = [] |
|
138
|
|
|
|
|
139
|
|
|
def clear(self, path): |
|
140
|
|
|
assert path.startswith(self.base + os.sep), 'Cannot clear directories out of base' |
|
141
|
|
|
|
|
142
|
|
|
for sub in os.listdir(path): |
|
143
|
|
|
sub = os.path.join(path, sub) |
|
144
|
|
|
if os.path.isdir(sub): |
|
145
|
|
|
shutil.rmtree(sub) |
|
146
|
|
|
else: |
|
147
|
|
|
os.remove(sub) |
|
148
|
|
|
|
|
149
|
|
|
def tearDown(self): |
|
150
|
|
|
shutil.rmtree(self.base) |
|
151
|
|
|
|
|
152
|
|
|
def get(self, endpoint, **kwargs): |
|
153
|
|
|
if endpoint in ('index', 'browse'): |
|
154
|
|
|
page_class = self.list_page_class |
|
155
|
|
|
elif endpoint == 'remove': |
|
156
|
|
|
page_class = self.confirm_page_class |
|
157
|
|
|
else: |
|
158
|
|
|
page_class = None |
|
159
|
|
|
|
|
160
|
|
|
with self.app.test_client() as client: |
|
161
|
|
|
response = client.get(self.url_for(endpoint, **kwargs)) |
|
162
|
|
|
if response.status_code != 200: |
|
163
|
|
|
raise self.page_exceptions.get(response.status_code, self.page_exceptions[None])(response.status_code) |
|
164
|
|
|
result = response.data if page_class is None else page_class.from_source(response.data) |
|
165
|
|
|
response.close() |
|
166
|
|
|
return result |
|
167
|
|
|
|
|
168
|
|
|
def post(self, endpoint, **kwargs): |
|
169
|
|
|
data = kwargs.pop('data') if 'data' in kwargs else {} |
|
170
|
|
|
with self.app.test_client() as client: |
|
171
|
|
|
response = client.post(self.url_for(endpoint, **kwargs), data=data, follow_redirects=True) |
|
172
|
|
|
if response.status_code != 200: |
|
173
|
|
|
raise self.page_exceptions.get(response.status_code, self.page_exceptions[None])(response.status_code) |
|
174
|
|
|
return self.list_page_class.from_source(response.data) |
|
175
|
|
|
|
|
176
|
|
|
def url_for(self, endpoint, **kwargs): |
|
177
|
|
|
with self.app.app_context(): |
|
178
|
|
|
return flask.url_for(endpoint, _external=False, **kwargs) |
|
179
|
|
|
|
|
180
|
|
|
def test_index(self): |
|
181
|
|
|
page = self.get('index') |
|
182
|
|
|
self.assertEqual(page.path, '%s/start' % os.path.basename(self.base)) |
|
183
|
|
|
|
|
184
|
|
|
self.app.config['directory_start'] = os.path.join(self.base, '..') |
|
185
|
|
|
|
|
186
|
|
|
self.assertRaises( |
|
187
|
|
|
Page404Exception, |
|
188
|
|
|
self.get, 'index' |
|
189
|
|
|
) |
|
190
|
|
|
|
|
191
|
|
|
self.app.config['directory_start'] = self.start |
|
192
|
|
|
|
|
193
|
|
|
def test_browse(self): |
|
194
|
|
|
basename = os.path.basename(self.base) |
|
195
|
|
|
page = self.get('browse') |
|
196
|
|
|
self.assertEqual(page.path, basename) |
|
197
|
|
|
self.assertEqual(page.directories, self.base_directories) |
|
198
|
|
|
self.assertFalse(page.removable) |
|
199
|
|
|
self.assertFalse(page.upload) |
|
200
|
|
|
|
|
201
|
|
|
page = self.get('browse', path='start') |
|
202
|
|
|
self.assertEqual(page.path, '%s/start' % basename) |
|
203
|
|
|
self.assertEqual(page.files, self.start_files) |
|
204
|
|
|
self.assertFalse(page.removable) |
|
205
|
|
|
self.assertFalse(page.upload) |
|
206
|
|
|
|
|
207
|
|
|
page = self.get('browse', path='remove') |
|
208
|
|
|
self.assertEqual(page.path, '%s/remove' % basename) |
|
209
|
|
|
self.assertEqual(page.files, self.remove_files) |
|
210
|
|
|
self.assertTrue(page.removable) |
|
211
|
|
|
self.assertFalse(page.upload) |
|
212
|
|
|
|
|
213
|
|
|
page = self.get('browse', path='upload') |
|
214
|
|
|
self.assertEqual(page.path, '%s/upload' % basename) |
|
215
|
|
|
self.assertEqual(page.files, self.upload_files) |
|
216
|
|
|
self.assertFalse(page.removable) |
|
217
|
|
|
self.assertTrue(page.upload) |
|
218
|
|
|
|
|
219
|
|
|
self.assertRaises( |
|
220
|
|
|
Page404Exception, |
|
221
|
|
|
self.get, 'browse', path='..' |
|
222
|
|
|
) |
|
223
|
|
|
|
|
224
|
|
|
def test_open(self): |
|
225
|
|
|
content = b'hello world' |
|
226
|
|
|
with open(os.path.join(self.start, 'testfile3.txt'), 'wb') as f: |
|
227
|
|
|
f.write(content) |
|
228
|
|
|
|
|
229
|
|
|
data = self.get('open', path='start/testfile3.txt') |
|
230
|
|
|
self.assertEqual(data, content) |
|
231
|
|
|
|
|
232
|
|
|
self.assertRaises( |
|
233
|
|
|
Page404Exception, |
|
234
|
|
|
self.get, 'open', path='../shall_not_pass.txt' |
|
235
|
|
|
) |
|
236
|
|
|
|
|
237
|
|
|
def test_remove(self): |
|
238
|
|
|
open(os.path.join(self.remove, 'testfile2.txt'), 'w').close() |
|
239
|
|
|
page = self.get('remove', path='remove/testfile2.txt') |
|
240
|
|
|
self.assertEqual(page.name, 'testfile2.txt') |
|
241
|
|
|
self.assertEqual(page.path, 'remove/testfile2.txt') |
|
242
|
|
|
self.assertEqual(page.back, self.url_for('browse', path='remove')) |
|
243
|
|
|
|
|
244
|
|
|
basename = os.path.basename(self.base) |
|
245
|
|
|
page = self.post('remove', path='remove/testfile2.txt') |
|
246
|
|
|
self.assertEqual(page.path, '%s/remove' % basename) |
|
247
|
|
|
self.assertEqual(page.files, self.remove_files) |
|
248
|
|
|
|
|
249
|
|
|
os.mkdir(os.path.join(self.remove, 'directory')) |
|
250
|
|
|
page = self.post('remove', path='remove/directory') |
|
251
|
|
|
self.assertEqual(page.path, '%s/remove' % basename) |
|
252
|
|
|
self.assertEqual(page.files, self.remove_files) |
|
253
|
|
|
|
|
254
|
|
|
self.assertRaises( |
|
255
|
|
|
Page404Exception, |
|
256
|
|
|
self.get, 'remove', path='start/testfile.txt' |
|
257
|
|
|
) |
|
258
|
|
|
|
|
259
|
|
|
self.assertRaises( |
|
260
|
|
|
Page404Exception, |
|
261
|
|
|
self.post, 'remove', path='start/testfile.txt' |
|
262
|
|
|
) |
|
263
|
|
|
|
|
264
|
|
|
self.app.config['directory_remove'] = None |
|
265
|
|
|
|
|
266
|
|
|
self.assertRaises( |
|
267
|
|
|
Page404Exception, |
|
268
|
|
|
self.get, 'remove', path='remove/testfile.txt' |
|
269
|
|
|
) |
|
270
|
|
|
|
|
271
|
|
|
self.app.config['directory_remove'] = self.remove |
|
272
|
|
|
|
|
273
|
|
|
self.assertRaises( |
|
274
|
|
|
Page404Exception, |
|
275
|
|
|
self.get, 'remove', path='../shall_not_pass.txt' |
|
276
|
|
|
) |
|
277
|
|
|
|
|
278
|
|
|
def test_download_file(self): |
|
279
|
|
|
binfile = os.path.join(self.base, 'testfile.bin') |
|
280
|
|
|
bindata = bytes(range(256)) |
|
281
|
|
|
|
|
282
|
|
|
with open(binfile, 'wb') as f: |
|
283
|
|
|
f.write(bindata) |
|
284
|
|
|
data = self.get('download_file', path='testfile.bin') |
|
285
|
|
|
os.remove(binfile) |
|
286
|
|
|
|
|
287
|
|
|
self.assertEqual(data, bindata) |
|
288
|
|
|
|
|
289
|
|
|
self.assertRaises( |
|
290
|
|
|
Page404Exception, |
|
291
|
|
|
self.get, 'download_file', path='../shall_not_pass.txt' |
|
292
|
|
|
) |
|
293
|
|
|
|
|
294
|
|
|
def test_download_directory(self): |
|
295
|
|
|
binfile = os.path.join(self.start, 'testfile.bin') |
|
296
|
|
|
bindata = bytes(range(256)) |
|
297
|
|
|
|
|
298
|
|
|
with open(binfile, 'wb') as f: |
|
299
|
|
|
f.write(bindata) |
|
300
|
|
|
data = self.get('download_directory', path='start') |
|
301
|
|
|
os.remove(binfile) |
|
302
|
|
|
|
|
303
|
|
|
iodata = io.BytesIO(data) |
|
304
|
|
|
with tarfile.open('start.tgz', mode="r:gz", fileobj=iodata) as tgz: |
|
305
|
|
|
tgz_files = [member.name for member in tgz.getmembers() if member.name] |
|
306
|
|
|
tgz_files.sort() |
|
307
|
|
|
|
|
308
|
|
|
self.assertEqual(tgz_files, ['testfile.bin', 'testfile.txt',]) |
|
309
|
|
|
|
|
310
|
|
|
self.assertRaises( |
|
311
|
|
|
Page404Exception, |
|
312
|
|
|
self.get, 'download_directory', path='../../shall_not_pass' |
|
313
|
|
|
) |
|
314
|
|
|
|
|
315
|
|
|
def test_upload(self): |
|
316
|
|
|
c = unichr if PY_LEGACY else chr |
|
317
|
|
|
|
|
318
|
|
|
files = { |
|
319
|
|
|
'testfile.txt': io.BytesIO(''.join(map(c, range(127))).encode('ascii')), |
|
320
|
|
|
'testfile.bin': io.BytesIO(''.join(map(c, range(255))).encode('utf-8')), |
|
321
|
|
|
} |
|
322
|
|
|
output = self.post('upload', |
|
323
|
|
|
path='upload', |
|
324
|
|
|
data={'file%d' % n: (data, name) for n, (name, data) in enumerate(files.items())} |
|
325
|
|
|
) |
|
326
|
|
|
expected_links = sorted(self.url_for('open', path='upload/%s' % i) for i in files) |
|
327
|
|
|
self.assertEqual(sorted(output.files), expected_links) |
|
328
|
|
|
self.clear(self.upload) |
|
329
|
|
|
|
|
330
|
|
|
def test_upload_duplicate(self): |
|
331
|
|
|
c = unichr if PY_LEGACY else chr |
|
332
|
|
|
|
|
333
|
|
|
files = ( |
|
334
|
|
|
('testfile.txt', 'something'), |
|
335
|
|
|
('testfile.txt', 'something_new'), |
|
336
|
|
|
) |
|
337
|
|
|
output = self.post('upload', |
|
338
|
|
|
path='upload', |
|
339
|
|
|
data={ |
|
340
|
|
|
'file%d' % n: (io.BytesIO(data.encode('ascii')), name) |
|
341
|
|
|
for n, (name, data) in enumerate(files) |
|
342
|
|
|
} |
|
343
|
|
|
) |
|
344
|
|
|
|
|
345
|
|
|
self.assertEqual(len(files), len(output.files)) |
|
346
|
|
|
|
|
347
|
|
|
first_file_url = self.url_for('open', path='upload/%s' % files[0][0]) |
|
348
|
|
|
self.assertIn(first_file_url, output.files) |
|
349
|
|
|
|
|
350
|
|
|
file_contents = [] |
|
351
|
|
|
for filename in os.listdir(self.upload): |
|
352
|
|
|
with open(os.path.join(self.upload, filename), 'r') as f: |
|
353
|
|
|
file_contents.append(f.read()) |
|
354
|
|
|
file_contents.sort() |
|
355
|
|
|
|
|
356
|
|
|
expected_file_contents = sorted(content for filename, content in files) |
|
357
|
|
|
|
|
358
|
|
|
self.assertEqual(file_contents, expected_file_contents) |
|
359
|
|
|
self.clear(self.upload) |
|
360
|
|
|
|
|
361
|
|
|
|
|
362
|
|
|
class TestFile(unittest.TestCase): |
|
363
|
|
|
module = browsepy.file |
|
364
|
|
|
|
|
365
|
|
|
def setUp(self): |
|
366
|
|
|
self.app = browsepy.app # FIXME |
|
367
|
|
|
self.workbench = tempfile.mkdtemp() |
|
368
|
|
|
|
|
369
|
|
|
def tearDown(self): |
|
370
|
|
|
shutil.rmtree(self.workbench) |
|
371
|
|
|
|
|
372
|
|
|
def test_mime(self): |
|
373
|
|
|
f = self.module.File('non_working_path') |
|
374
|
|
|
self.assertEqual(f.mimetype, 'application/octet-stream') |
|
375
|
|
|
|
|
376
|
|
|
f = self.module.File('non_working_path_with_ext.txt') |
|
377
|
|
|
self.assertEqual(f.mimetype, 'text/plain') |
|
378
|
|
|
|
|
379
|
|
|
tmp_txt = os.path.join(self.workbench, 'ascii_text_file') |
|
380
|
|
|
with open(tmp_txt, 'w') as f: |
|
381
|
|
|
f.write('ascii text') |
|
382
|
|
|
|
|
383
|
|
|
# test file command |
|
384
|
|
|
f = self.module.File(tmp_txt) |
|
385
|
|
|
self.assertEqual(f.mimetype, 'text/plain; charset=us-ascii') |
|
386
|
|
|
self.assertEqual(f.type, 'text/plain') |
|
387
|
|
|
self.assertEqual(f.encoding, 'us-ascii') |
|
388
|
|
|
|
|
389
|
|
|
# test non-working file command |
|
390
|
|
|
bad_path = os.path.join(self.workbench, 'path') |
|
391
|
|
|
os.mkdir(bad_path) |
|
392
|
|
|
|
|
393
|
|
|
bad_file = os.path.join(bad_path, 'file') |
|
394
|
|
|
with open(bad_file, 'w') as f: |
|
395
|
|
|
f.write('#!/usr/bin/env bash\nexit 1\n') |
|
396
|
|
|
os.chmod(bad_file, os.stat(bad_file).st_mode | stat.S_IEXEC) |
|
397
|
|
|
|
|
398
|
|
|
old_path = os.environ['PATH'] |
|
399
|
|
|
os.environ['PATH'] = bad_path + os.pathsep + old_path |
|
400
|
|
|
|
|
401
|
|
|
f = self.module.File(tmp_txt) |
|
402
|
|
|
self.assertEqual(f.mimetype, 'application/octet-stream') |
|
403
|
|
|
|
|
404
|
|
|
os.environ['PATH'] = old_path |
|
405
|
|
|
|
|
406
|
|
|
def test_size(self): |
|
407
|
|
|
test_file = os.path.join(self.workbench, 'test.csv') |
|
408
|
|
|
with open(test_file, 'w') as f: |
|
409
|
|
|
f.write(',\n'*512) |
|
410
|
|
|
f = self.module.File(test_file) |
|
411
|
|
|
|
|
412
|
|
|
default = self.app.config['use_binary_multiples'] |
|
413
|
|
|
|
|
414
|
|
|
self.app.config['use_binary_multiples'] = True |
|
415
|
|
|
self.assertEqual(f.size, '1.00 KiB') |
|
416
|
|
|
|
|
417
|
|
|
self.app.config['use_binary_multiples'] = False |
|
418
|
|
|
self.assertEqual(f.size, '1.02 KB') |
|
419
|
|
|
|
|
420
|
|
|
self.app.config['use_binary_multiples'] = default |
|
421
|
|
|
|
|
422
|
|
|
self.assertEqual(f.encoding, 'default') |
|
423
|
|
|
|
|
424
|
|
|
def test_properties(self): |
|
425
|
|
|
empty_file = os.path.join(self.workbench, 'empty.txt') |
|
426
|
|
|
open(empty_file, 'w').close() |
|
427
|
|
|
f = self.module.File(empty_file) |
|
428
|
|
|
|
|
429
|
|
|
self.assertEqual(f.name, 'empty.txt') |
|
430
|
|
|
self.assertEqual(f.can_download, True) |
|
431
|
|
|
self.assertEqual(f.can_remove, False) |
|
432
|
|
|
self.assertEqual(f.can_upload, False) |
|
433
|
|
|
self.assertEqual(f.parent.path, self.workbench) |
|
434
|
|
|
self.assertEqual(f.is_directory, False) |
|
435
|
|
|
|
|
436
|
|
|
def test_choose_filename(self): |
|
437
|
|
|
f = self.module.File(self.workbench) |
|
438
|
|
|
first_file = os.path.join(self.workbench, 'testfile.txt') |
|
439
|
|
|
|
|
440
|
|
|
filename = f.choose_filename('testfile.txt', attempts=0) |
|
441
|
|
|
self.assertEqual(filename, 'testfile.txt') |
|
442
|
|
|
|
|
443
|
|
|
open(first_file, 'w').close() |
|
444
|
|
|
|
|
445
|
|
|
filename = f.choose_filename('testfile.txt', attempts=0) |
|
446
|
|
|
self.assertNotEqual(filename, 'testfile (2).txt') |
|
447
|
|
|
|
|
448
|
|
|
filename = f.choose_filename('testfile.txt', attempts=2) |
|
449
|
|
|
self.assertEqual(filename, 'testfile (2).txt') |
|
450
|
|
|
|
|
451
|
|
|
second_file = os.path.join(self.workbench, filename) |
|
452
|
|
|
open(second_file, 'w').close() |
|
453
|
|
|
|
|
454
|
|
|
filename = f.choose_filename('testfile.txt', attempts=3) |
|
455
|
|
|
self.assertEqual(filename, 'testfile (3).txt') |
|
456
|
|
|
|
|
457
|
|
|
filename = f.choose_filename('testfile.txt', attempts=2) |
|
458
|
|
|
self.assertNotEqual(filename, 'testfile (2).txt') |
|
459
|
|
|
|
|
460
|
|
|
|
|
461
|
|
|
class TestFileFunctions(unittest.TestCase): |
|
462
|
|
|
module = browsepy.file |
|
463
|
|
|
def test_fmt_size(self): |
|
464
|
|
|
fnc = self.module.fmt_size |
|
465
|
|
|
for n, unit in enumerate(self.module.binary_units): |
|
466
|
|
|
self.assertEqual(fnc(2**(10*n)), (1, unit)) |
|
467
|
|
|
for n, unit in enumerate(self.module.standard_units): |
|
468
|
|
|
self.assertEqual(fnc(1000**n, False), (1, unit)) |
|
469
|
|
|
|
|
470
|
|
|
def test_secure_filename(self): |
|
471
|
|
|
self.assertEqual(self.module.secure_filename('/path'), 'path') |
|
472
|
|
|
self.assertEqual(self.module.secure_filename('..'), '') |
|
473
|
|
|
self.assertEqual(self.module.secure_filename('::'), '') |
|
474
|
|
|
self.assertEqual(self.module.secure_filename('\0'), '_') |
|
475
|
|
|
self.assertEqual(self.module.secure_filename('/'), '') |
|
476
|
|
|
self.assertEqual(self.module.secure_filename('C:\\'), '') |
|
477
|
|
|
self.assertEqual(self.module.secure_filename('COM1.asdf', destiny_os='nt'), '') |
|
478
|
|
|
self.assertEqual(self.module.secure_filename('\xf1', fs_encoding='ascii'), '_') |
|
479
|
|
|
|
|
480
|
|
|
if PY_LEGACY: |
|
481
|
|
|
expected = unicode('\xf1', encoding='latin-1') |
|
482
|
|
|
self.assertEqual(self.module.secure_filename('\xf1', fs_encoding='utf-8'), expected) |
|
483
|
|
|
self.assertEqual(self.module.secure_filename(expected, fs_encoding='utf-8'), expected) |
|
484
|
|
|
else: |
|
485
|
|
|
self.assertEqual(self.module.secure_filename('\xf1', fs_encoding='utf-8'), '\xf1') |
|
486
|
|
|
|
|
487
|
|
|
def test_alternative_filename(self): |
|
488
|
|
|
self.assertEqual(self.module.alternative_filename('test', 2), 'test (2)') |
|
489
|
|
|
self.assertEqual(self.module.alternative_filename('test.txt', 2), 'test (2).txt') |
|
490
|
|
|
self.assertEqual(self.module.alternative_filename('test.tar.gz', 2), 'test (2).tar.gz') |
|
491
|
|
|
self.assertEqual(self.module.alternative_filename('test.longextension', 2), 'test (2).longextension') |
|
492
|
|
|
self.assertEqual(self.module.alternative_filename('test.tar.tar.tar', 2), 'test.tar (2).tar.tar') |
|
493
|
|
|
self.assertNotEqual(self.module.alternative_filename('test'), 'test') |
|
494
|
|
|
|
|
495
|
|
|
def test_relativize_path(self): |
|
496
|
|
|
self.assertEqual(self.module.relativize_path('/parent/child', '/parent'), 'child') |
|
497
|
|
|
self.assertEqual(self.module.relativize_path('/grandpa/parent/child', '/grandpa/parent'), 'child') |
|
498
|
|
|
self.assertEqual(self.module.relativize_path('/grandpa/parent/child', '/grandpa'), 'parent/child') |
|
499
|
|
|
self.assertRaises( |
|
500
|
|
|
browsepy.OutsideDirectoryBase, |
|
501
|
|
|
browsepy.relativize_path, '/other', '/parent' |
|
502
|
|
|
) |
|
503
|
|
|
|
|
504
|
|
|
def test_under_base(self): |
|
505
|
|
|
self.assertTrue(self.module.check_under_base('C:\\as\\df\\gf', 'C:\\as\\df', '\\')) |
|
506
|
|
|
self.assertTrue(self.module.check_under_base('/as/df', '/as', '/')) |
|
507
|
|
|
|
|
508
|
|
|
self.assertFalse(self.module.check_under_base('C:\\cc\\df\\gf', 'C:\\as\\df', '\\')) |
|
509
|
|
|
self.assertFalse(self.module.check_under_base('/cc/df', '/as', '/')) |
|
510
|
|
|
|
|
511
|
|
|
|
|
512
|
|
|
class TestFunctions(unittest.TestCase): |
|
513
|
|
|
module = browsepy |
|
514
|
|
|
def test_empty_iterable(self): |
|
515
|
|
|
fnc = self.module.empty_iterable |
|
516
|
|
|
empty, iterable = fnc(i for i in ()) |
|
517
|
|
|
self.assertTrue(empty) |
|
518
|
|
|
self.assertRaises(StopIteration, next, iterable) |
|
519
|
|
|
empty, iterable = fnc(i for i in (1, 2)) |
|
520
|
|
|
self.assertFalse(empty) |
|
521
|
|
|
self.assertEqual(tuple(iterable), (1, 2)) |
|
522
|
|
|
|
|
523
|
|
|
|
|
524
|
|
|
class TestMain(unittest.TestCase): |
|
525
|
|
|
module = browsepy.__main__ |
|
526
|
|
|
|
|
527
|
|
|
def setUp(self): |
|
528
|
|
|
self.app = browsepy.app |
|
529
|
|
|
self.parser = self.module.ArgParse() |
|
530
|
|
|
self.base = tempfile.mkdtemp() |
|
531
|
|
|
|
|
532
|
|
|
def tearDown(self): |
|
533
|
|
|
shutil.rmtree(self.base) |
|
534
|
|
|
|
|
535
|
|
|
def test_defaults(self): |
|
536
|
|
|
result = self.parser.parse_args([]) |
|
537
|
|
|
self.assertEqual(result.host, '127.0.0.1') |
|
538
|
|
|
self.assertEqual(result.port, 8080) |
|
539
|
|
|
self.assertEqual(result.directory, os.getcwd()) |
|
540
|
|
|
self.assertEqual(result.initial, None) |
|
541
|
|
|
self.assertEqual(result.removable, None) |
|
542
|
|
|
self.assertEqual(result.upload, None) |
|
543
|
|
|
self.assertEqual(result.plugin, []) |
|
544
|
|
|
|
|
545
|
|
|
def test_params(self): |
|
546
|
|
|
plugins = ['plugin_1', 'plugin_2', 'namespace.plugin_3'] |
|
547
|
|
|
result = self.parser.parse_args(['127.1.1.1', '5000', |
|
548
|
|
|
'--directory=%s' % self.base, |
|
549
|
|
|
'--initial=%s' % self.base, |
|
550
|
|
|
'--removable=%s' % self.base, |
|
551
|
|
|
'--upload=%s' % self.base, |
|
552
|
|
|
'--plugin=%s' % ','.join(plugins), |
|
553
|
|
|
]) |
|
554
|
|
|
self.assertEqual(result.host, '127.1.1.1') |
|
555
|
|
|
self.assertEqual(result.port, 5000) |
|
556
|
|
|
self.assertEqual(result.directory, self.base) |
|
557
|
|
|
self.assertEqual(result.initial, self.base) |
|
558
|
|
|
self.assertEqual(result.removable, self.base) |
|
559
|
|
|
self.assertEqual(result.upload, self.base) |
|
560
|
|
|
self.assertEqual(result.plugin, plugins) |
|
561
|
|
|
|
|
562
|
|
|
def test_main(self): |
|
563
|
|
|
params = {} |
|
564
|
|
|
self.module.main(argv=[], run_fnc=lambda app, **kwargs: params.update(kwargs)) |
|
565
|
|
|
|
|
566
|
|
|
defaults = {'host': '127.0.0.1', 'port': 8080, 'debug': False, 'threaded': True} |
|
567
|
|
|
params_subset = {k: v for k, v in params.items() if k in defaults} |
|
568
|
|
|
self.assertEqual(defaults, params_subset) |
|
569
|
|
|
|
|
570
|
|
|
class TestPlugins(unittest.TestCase): |
|
571
|
|
|
app_module = browsepy |
|
572
|
|
|
managers_module = browsepy.managers |
|
573
|
|
|
def setUp(self): |
|
574
|
|
|
self.app = self.app_module.app |
|
575
|
|
|
self.manager = self.managers_module.PluginManager(self.app) |
|
576
|
|
|
self.original_namespaces = self.app.config['plugin_namespaces'] |
|
577
|
|
|
self.plugin_namespace, self.plugin_name = __name__.rsplit('.', 1) |
|
578
|
|
|
self.app.config['plugin_namespaces'] = (self.plugin_namespace,) |
|
579
|
|
|
|
|
580
|
|
|
def tearDown(self): |
|
581
|
|
|
self.app.config['plugin_namespaces'] = self.original_namespaces |
|
582
|
|
|
|
|
583
|
|
|
def test_manager(self): |
|
584
|
|
|
self.manager.load_plugin(self.plugin_name) |
|
585
|
|
|
self.assertTrue(self.manager._plugin_loaded) |
|
586
|
|
|
|
|
587
|
|
|
endpoints = sorted( |
|
588
|
|
|
action.endpoint |
|
589
|
|
|
for action in self.manager.get_actions('a/a') |
|
590
|
|
|
) |
|
591
|
|
|
|
|
592
|
|
|
self.assertEqual(endpoints, sorted(['test_x_x', 'test_a_x', 'test_x_a', 'test_a_a'])) |
|
593
|
|
|
self.assertEqual(self.app.view_functions['test_plugin.root'](), 'test_plugin_root') |
|
594
|
|
|
self.assertIn('test_plugin', self.app.blueprints) |
|
595
|
|
|
|
|
596
|
|
|
self.assertRaises( |
|
597
|
|
|
self.managers_module.PluginNotFoundError, |
|
598
|
|
|
self.manager.load_plugin, |
|
599
|
|
|
'non_existent_plugin_module' |
|
600
|
|
|
) |
|
601
|
|
|
|
|
602
|
|
|
|
|
603
|
|
|
def register_plugin(manager): |
|
604
|
|
|
manager._plugin_loaded = True |
|
605
|
|
|
manager.register_action('test_x_x', 'test_x_x', ('*/*',)) |
|
606
|
|
|
manager.register_action('test_a_x', 'test_a_x', ('a/*',)) |
|
607
|
|
|
manager.register_action('test_x_a', 'test_x_a', ('*/a',)) |
|
608
|
|
|
manager.register_action('test_a_a', 'test_a_a', ('a/a',)) |
|
609
|
|
|
manager.register_action('test_b_x', 'test_b_x', ('b/*',)) |
|
610
|
|
|
|
|
611
|
|
|
test_plugin_blueprint = flask.Blueprint('test_plugin', __name__, url_prefix = '/test_plugin_blueprint') |
|
612
|
|
|
test_plugin_blueprint.add_url_rule('/', endpoint='root', view_func=lambda: 'test_plugin_root') |
|
613
|
|
|
|
|
614
|
|
|
manager.register_blueprint(test_plugin_blueprint) |
|
615
|
|
|
|
|
616
|
|
|
|
|
617
|
|
|
if __name__ == '__main__': |
|
618
|
|
|
unittest.main() |
|
619
|
|
|
|