Completed
Push — master ( b7ab4b...cdf730 )
by Felipe A.
56s
created

browsepy.tests.ListPage.from_source()   C

Complexity

Conditions 9

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 9
dl 0
loc 17
rs 6.4615
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
4
import sys
5
import unittest
6
import re
7
import os
8
import os.path
9
import shutil
10
import tempfile
11
import tarfile
12
import xml.etree.ElementTree as ET
13
import io
14
import stat
15
16
import flask
17
import browsepy
18
import browsepy.file
19
import browsepy.managers
20
import browsepy.__main__
21
import browsepy.compat
22
23
# Aliases taken from browsepy.compat. Note that ``range`` deliberately
# shadows the builtin for the rest of this module.
PY_LEGACY = browsepy.compat.PY_LEGACY
range = browsepy.compat.range
25
26
class Page(object):
    """Base class for parsed pages, providing text-extraction helpers."""

    @classmethod
    def itertext(cls, element):
        """Yield the text fragments found inside ``element``, in order.

        The element's own ``text`` comes first, then, for every child, the
        child's fragments (recursively) followed by the child's ``tail``.
        Missing ``text``/``tail`` values are yielded as empty strings.

        :param element: an ElementTree-like element (iterable over children,
                        with ``text`` and ``tail`` attributes)
        """
        # TODO(ergoithz) on python2 drop: replace by element.itertext()
        yield element.text or ''
        for child in element:
            for text in cls.itertext(child):
                yield text
            yield child.tail or ''

    @classmethod
    def innerText(cls, element):
        """Return every text fragment of ``element`` joined as one string."""
        return ''.join(cls.itertext(element))
39
40
41
class ListPage(Page):
    """Parsed view of a directory listing page.

    Attributes set by the constructor: ``path`` (text of the page ``h1``),
    ``directories`` and ``files`` (lists of link hrefs), ``removable`` and
    ``upload`` (booleans).
    """

    # Raw string: '\s' inside a plain literal is an invalid escape sequence.
    path_strip_re = re.compile(r'\s+/\s+')

    def __init__(self, path, directories, files, removable, upload):
        self.path = path
        self.directories = directories
        self.files = files
        self.removable = removable
        self.upload = upload

    @classmethod
    def _parse_row(cls, row):
        """Return an ``(isdir, url, removable)`` tuple for one table row.

        The first cell's ``class`` attribute tells whether the row is a
        directory, the second cell holds the link, and the third cell is
        scanned for a link whose class is exactly ``'remove button'``.
        """
        isdir = row[0].attrib.get('class') == 'dir-icon'
        url = row[1].find('.//a').attrib['href']
        removable = any(
            button.attrib.get('class') == 'remove button'
            for button in row[2].findall('.//a')
        )
        return isdir, url, removable

    @classmethod
    def from_source(cls, source):
        """Build a ``ListPage`` from the page markup in ``source``.

        :param source: markup parseable by ``xml.etree.ElementTree.fromstring``
        :returns: a new instance of ``cls``
        """
        html = ET.fromstring(source)
        rows = [
            cls._parse_row(row)
            for row in html.findall('.//table/tbody/tr')
        ]
        # Collapse the whitespace around path separators in the heading.
        title = cls.innerText(html.find('.//h1'))
        return cls(
            cls.path_strip_re.sub('/', title).strip(),
            [url for isdir, url, removable in rows if isdir],
            [url for isdir, url, removable in rows if not isdir],
            # An empty listing is never reported as removable.
            all(removable for isdir, url, removable in rows) if rows else False,
            html.find('.//form//input[@type=\'file\']') is not None,
        )
69
70
71
class ConfirmPage(Page):
    """Parsed view of a confirmation page (form with name and back link)."""

    def __init__(self, path, name, back):
        self.path = path
        self.name = name
        self.back = back

    @classmethod
    def from_source(cls, source):
        """Build a ``ConfirmPage`` from the page markup in ``source``.

        ``name`` is the stripped text of the form's ``strong`` element,
        ``path`` is that name prefixed with the element's ``data-prefix``
        attribute (empty when absent), and ``back`` is the href of the
        first link inside the form.

        :param source: markup parseable by ``xml.etree.ElementTree.fromstring``
        :returns: a new instance of ``cls``
        """
        html = ET.fromstring(source)
        # Look the element up once; both name and prefix come from it.
        strong = html.find('.//form//strong')
        name = cls.innerText(strong).strip()
        prefix = strong.attrib.get('data-prefix', '')

        return cls(
            prefix + name,
            name,
            html.find('.//form//a').attrib['href'],
        )
88
89
90
class PageException(Exception):
    """Error carrying the HTTP status code of an unexpected response.

    :param status: status code, kept on the ``status`` attribute
    """

    def __init__(self, status):
        # Initialise the base class explicitly so ``args`` holds the status.
        super(PageException, self).__init__(status)
        self.status = status
93
94
95
class Page404Exception(PageException):
    """``PageException`` specialisation used for 404 status codes."""
97
98
99
class TestApp(unittest.TestCase):
    """Tests driving the application views through flask's test client."""

    module = browsepy
    list_page_class = ListPage
    confirm_page_class = ConfirmPage
    # Status code -> exception class; the ``None`` key is the fallback.
    page_exceptions = {
        404: Page404Exception,
        None: PageException,
    }

    def setUp(self):
        """Create a temporary directory tree and point the app config at it."""
        self.app = self.module.app
        self.base = tempfile.mkdtemp()
        self.start = os.path.join(self.base, 'start')
        self.remove = os.path.join(self.base, 'remove')
        self.upload = os.path.join(self.base, 'upload')

        os.mkdir(self.start)
        os.mkdir(self.remove)
        os.mkdir(self.upload)

        open(os.path.join(self.start, 'testfile.txt'), 'w').close()
        open(os.path.join(self.remove, 'testfile.txt'), 'w').close()

        self.app.config.update(
            directory_base=self.base,
            directory_start=self.start,
            directory_remove=self.remove,
            directory_upload=self.upload,
            SERVER_NAME='test',
        )

        self.base_directories = [
            self.url_for('browse', path='remove'),
            self.url_for('browse', path='start'),
            self.url_for('browse', path='upload'),
        ]
        self.start_files = [self.url_for('open', path='start/testfile.txt')]
        self.remove_files = [self.url_for('open', path='remove/testfile.txt')]
        self.upload_files = []

    def clear(self, path):
        """Delete everything inside ``path``, which must live under the base."""
        assert path.startswith(self.base + os.sep), \
            'Cannot clear directories out of base'

        for sub in os.listdir(path):
            sub = os.path.join(path, sub)
            if os.path.isdir(sub):
                shutil.rmtree(sub)
            else:
                os.remove(sub)

    def tearDown(self):
        """Remove the temporary directory tree created by ``setUp``."""
        shutil.rmtree(self.base)

    def _raise_for_status(self, response):
        """Raise the exception mapped to a non-200 ``response`` status."""
        status = response.status_code
        if status != 200:
            exception_class = self.page_exceptions.get(
                status, self.page_exceptions[None])
            raise exception_class(status)

    def get(self, endpoint, **kwargs):
        """GET ``endpoint`` and return the parsed page or the raw body.

        ``index``/``browse`` responses are parsed with ``list_page_class``,
        ``remove`` responses with ``confirm_page_class``; any other endpoint
        returns ``response.data`` unchanged.

        :raises PageException: (or the mapped subclass) on a non-200 status
        """
        if endpoint in ('index', 'browse'):
            page_class = self.list_page_class
        elif endpoint == 'remove':
            page_class = self.confirm_page_class
        else:
            page_class = None

        with self.app.test_client() as client:
            response = client.get(self.url_for(endpoint, **kwargs))
            try:
                self._raise_for_status(response)
                if page_class is None:
                    return response.data
                return page_class.from_source(response.data)
            finally:
                # Close the response on the error path as well.
                response.close()

    def post(self, endpoint, **kwargs):
        """POST to ``endpoint`` following redirects; return the list page.

        An optional ``data`` keyword is sent as the request payload; the
        remaining keywords are used to build the URL.

        :raises PageException: (or the mapped subclass) on a non-200 status
        """
        data = kwargs.pop('data', {})
        with self.app.test_client() as client:
            response = client.post(
                self.url_for(endpoint, **kwargs),
                data=data,
                follow_redirects=True,
            )
            try:
                self._raise_for_status(response)
                return self.list_page_class.from_source(response.data)
            finally:
                response.close()

    def url_for(self, endpoint, **kwargs):
        """Return the non-external URL for ``endpoint`` within an app context."""
        with self.app.app_context():
            return flask.url_for(endpoint, _external=False, **kwargs)

    def test_index(self):
        page = self.get('index')
        self.assertEqual(page.path, '%s/start' % os.path.basename(self.base))

        self.app.config['directory_start'] = os.path.join(self.base, '..')
        try:
            self.assertRaises(
                Page404Exception,
                self.get, 'index'
            )
        finally:
            # Restore even when the assertion fails, so other tests are
            # not affected by the altered configuration.
            self.app.config['directory_start'] = self.start

    def test_browse(self):
        basename = os.path.basename(self.base)
        page = self.get('browse')
        self.assertEqual(page.path, basename)
        self.assertEqual(page.directories, self.base_directories)
        self.assertFalse(page.removable)
        self.assertFalse(page.upload)

        page = self.get('browse', path='start')
        self.assertEqual(page.path, '%s/start' % basename)
        self.assertEqual(page.files, self.start_files)
        self.assertFalse(page.removable)
        self.assertFalse(page.upload)

        page = self.get('browse', path='remove')
        self.assertEqual(page.path, '%s/remove' % basename)
        self.assertEqual(page.files, self.remove_files)
        self.assertTrue(page.removable)
        self.assertFalse(page.upload)

        page = self.get('browse', path='upload')
        self.assertEqual(page.path, '%s/upload' % basename)
        self.assertEqual(page.files, self.upload_files)
        self.assertFalse(page.removable)
        self.assertTrue(page.upload)

        self.assertRaises(
            Page404Exception,
            self.get, 'browse', path='..'
        )

    def test_open(self):
        content = b'hello world'
        with open(os.path.join(self.start, 'testfile3.txt'), 'wb') as f:
            f.write(content)

        data = self.get('open', path='start/testfile3.txt')
        self.assertEqual(data, content)

        self.assertRaises(
            Page404Exception,
            self.get, 'open', path='../shall_not_pass.txt'
        )

    def test_remove(self):
        open(os.path.join(self.remove, 'testfile2.txt'), 'w').close()
        page = self.get('remove', path='remove/testfile2.txt')
        self.assertEqual(page.name, 'testfile2.txt')
        self.assertEqual(page.path, 'remove/testfile2.txt')
        self.assertEqual(page.back, self.url_for('browse', path='remove'))

        basename = os.path.basename(self.base)
        page = self.post('remove', path='remove/testfile2.txt')
        self.assertEqual(page.path, '%s/remove' % basename)
        self.assertEqual(page.files, self.remove_files)

        os.mkdir(os.path.join(self.remove, 'directory'))
        page = self.post('remove', path='remove/directory')
        self.assertEqual(page.path, '%s/remove' % basename)
        self.assertEqual(page.files, self.remove_files)

        self.assertRaises(
            Page404Exception,
            self.get, 'remove', path='start/testfile.txt'
        )

        self.assertRaises(
            Page404Exception,
            self.post, 'remove', path='start/testfile.txt'
        )

        self.app.config['directory_remove'] = None
        try:
            self.assertRaises(
                Page404Exception,
                self.get, 'remove', path='remove/testfile.txt'
            )
        finally:
            # Restore even when the assertion above fails.
            self.app.config['directory_remove'] = self.remove

        self.assertRaises(
            Page404Exception,
            self.get, 'remove', path='../shall_not_pass.txt'
        )

    def test_download_file(self):
        binfile = os.path.join(self.base, 'testfile.bin')
        bindata = bytes(range(256))

        with open(binfile, 'wb') as f:
            f.write(bindata)
        try:
            data = self.get('download_file', path='testfile.bin')
        finally:
            os.remove(binfile)

        self.assertEqual(data, bindata)

        self.assertRaises(
            Page404Exception,
            self.get, 'download_file', path='../shall_not_pass.txt'
        )

    def test_download_directory(self):
        binfile = os.path.join(self.start, 'testfile.bin')
        bindata = bytes(range(256))

        with open(binfile, 'wb') as f:
            f.write(bindata)
        try:
            data = self.get('download_directory', path='start')
        finally:
            os.remove(binfile)

        iodata = io.BytesIO(data)
        with tarfile.open('start.tgz', mode="r:gz", fileobj=iodata) as tgz:
            tgz_files = sorted(
                member.name for member in tgz.getmembers() if member.name)

        self.assertEqual(tgz_files, ['testfile.bin', 'testfile.txt'])

        self.assertRaises(
            Page404Exception,
            self.get, 'download_directory', path='../../shall_not_pass'
        )

    def test_upload(self):
        # ``unichr`` only exists on Python 2; it is never evaluated otherwise.
        c = unichr if PY_LEGACY else chr  # noqa

        files = {
            'testfile.txt': io.BytesIO(
                ''.join(map(c, range(127))).encode('ascii')),
            'testfile.bin': io.BytesIO(
                ''.join(map(c, range(255))).encode('utf-8')),
        }
        output = self.post(
            'upload',
            path='upload',
            data={
                'file%d' % n: (data, name)
                for n, (name, data) in enumerate(files.items())
            }
        )
        expected_links = sorted(
            self.url_for('open', path='upload/%s' % i) for i in files)
        self.assertEqual(sorted(output.files), expected_links)
        self.clear(self.upload)

    def test_upload_duplicate(self):
        # Two uploads sharing a filename must both end up stored.
        files = (
            ('testfile.txt', 'something'),
            ('testfile.txt', 'something_new'),
        )
        output = self.post(
            'upload',
            path='upload',
            data={
                'file%d' % n: (io.BytesIO(data.encode('ascii')), name)
                for n, (name, data) in enumerate(files)
            }
        )

        self.assertEqual(len(files), len(output.files))

        first_file_url = self.url_for('open', path='upload/%s' % files[0][0])
        self.assertIn(first_file_url, output.files)

        file_contents = []
        for filename in os.listdir(self.upload):
            with open(os.path.join(self.upload, filename), 'r') as f:
                file_contents.append(f.read())
        file_contents.sort()

        expected_file_contents = sorted(content for filename, content in files)

        self.assertEqual(file_contents, expected_file_contents)
        self.clear(self.upload)
360
361
362
class TestFile(unittest.TestCase):
    """Tests for the ``File`` class of ``browsepy.file``."""

    module = browsepy.file

    def setUp(self):
        """Create a scratch directory used by the individual tests."""
        self.app = browsepy.app  # FIXME
        self.workbench = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory."""
        shutil.rmtree(self.workbench)

    def test_mime(self):
        f = self.module.File('non_working_path')
        self.assertEqual(f.mimetype, 'application/octet-stream')

        f = self.module.File('non_working_path_with_ext.txt')
        self.assertEqual(f.mimetype, 'text/plain')

        tmp_txt = os.path.join(self.workbench, 'ascii_text_file')
        with open(tmp_txt, 'w') as f:
            f.write('ascii text')

        # test file command
        f = self.module.File(tmp_txt)
        self.assertEqual(f.mimetype, 'text/plain; charset=us-ascii')
        self.assertEqual(f.type, 'text/plain')
        self.assertEqual(f.encoding, 'us-ascii')

        # test non-working file command: an executable named 'file' that
        # always exits with status 1 is placed first on PATH
        bad_path = os.path.join(self.workbench, 'path')
        os.mkdir(bad_path)

        bad_file = os.path.join(bad_path, 'file')
        with open(bad_file, 'w') as f:
            f.write('#!/usr/bin/env bash\nexit 1\n')
        os.chmod(bad_file, os.stat(bad_file).st_mode | stat.S_IEXEC)

        old_path = os.environ['PATH']
        os.environ['PATH'] = bad_path + os.pathsep + old_path
        try:
            f = self.module.File(tmp_txt)
            self.assertEqual(f.mimetype, 'application/octet-stream')
        finally:
            # Restore PATH even if the assertion fails, so the altered
            # environment never leaks into other tests.
            os.environ['PATH'] = old_path

    def test_size(self):
        test_file = os.path.join(self.workbench, 'test.csv')
        with open(test_file, 'w') as f:
            f.write(',\n' * 512)
        f = self.module.File(test_file)

        default = self.app.config['use_binary_multiples']
        try:
            self.app.config['use_binary_multiples'] = True
            self.assertEqual(f.size, '1.00 KiB')

            self.app.config['use_binary_multiples'] = False
            self.assertEqual(f.size, '1.02 KB')
        finally:
            # Restore the setting even when an assertion fails.
            self.app.config['use_binary_multiples'] = default

        self.assertEqual(f.encoding, 'default')

    def test_properties(self):
        empty_file = os.path.join(self.workbench, 'empty.txt')
        open(empty_file, 'w').close()
        f = self.module.File(empty_file)

        self.assertEqual(f.name, 'empty.txt')
        self.assertEqual(f.can_download, True)
        self.assertEqual(f.can_remove, False)
        self.assertEqual(f.can_upload, False)
        self.assertEqual(f.parent.path, self.workbench)
        self.assertEqual(f.is_directory, False)

    def test_choose_filename(self):
        f = self.module.File(self.workbench)
        first_file = os.path.join(self.workbench, 'testfile.txt')

        # Nothing exists yet, so the requested name is returned unchanged.
        filename = f.choose_filename('testfile.txt', attempts=0)
        self.assertEqual(filename, 'testfile.txt')

        open(first_file, 'w').close()

        filename = f.choose_filename('testfile.txt', attempts=0)
        self.assertNotEqual(filename, 'testfile (2).txt')

        filename = f.choose_filename('testfile.txt', attempts=2)
        self.assertEqual(filename, 'testfile (2).txt')

        second_file = os.path.join(self.workbench, filename)
        open(second_file, 'w').close()

        filename = f.choose_filename('testfile.txt', attempts=3)
        self.assertEqual(filename, 'testfile (3).txt')

        filename = f.choose_filename('testfile.txt', attempts=2)
        self.assertNotEqual(filename, 'testfile (2).txt')
459
460
461
class TestFileFunctions(unittest.TestCase):
    """Tests for the module-level helper functions of ``browsepy.file``."""

    module = browsepy.file

    def test_fmt_size(self):
        fnc = self.module.fmt_size
        for n, unit in enumerate(self.module.binary_units):
            self.assertEqual(fnc(2**(10*n)), (1, unit))
        for n, unit in enumerate(self.module.standard_units):
            self.assertEqual(fnc(1000**n, False), (1, unit))

    def test_secure_filename(self):
        secure_filename = self.module.secure_filename
        self.assertEqual(secure_filename('/path'), 'path')
        self.assertEqual(secure_filename('..'), '')
        self.assertEqual(secure_filename('::'), '')
        self.assertEqual(secure_filename('\0'), '_')
        self.assertEqual(secure_filename('/'), '')
        self.assertEqual(secure_filename('C:\\'), '')
        self.assertEqual(secure_filename('COM1.asdf', destiny_os='nt'), '')
        self.assertEqual(secure_filename('\xf1', fs_encoding='ascii'), '_')

        if PY_LEGACY:
            # ``unicode`` only exists on Python 2, where this branch runs.
            expected = unicode('\xf1', encoding='latin-1')  # noqa
            self.assertEqual(
                secure_filename('\xf1', fs_encoding='utf-8'), expected)
            self.assertEqual(
                secure_filename(expected, fs_encoding='utf-8'), expected)
        else:
            self.assertEqual(
                secure_filename('\xf1', fs_encoding='utf-8'), '\xf1')

    def test_alternative_filename(self):
        alternative_filename = self.module.alternative_filename
        self.assertEqual(alternative_filename('test', 2), 'test (2)')
        self.assertEqual(alternative_filename('test.txt', 2), 'test (2).txt')
        self.assertEqual(
            alternative_filename('test.tar.gz', 2), 'test (2).tar.gz')
        self.assertEqual(
            alternative_filename('test.longextension', 2),
            'test (2).longextension')
        self.assertEqual(
            alternative_filename('test.tar.tar.tar', 2),
            'test.tar (2).tar.tar')
        self.assertNotEqual(alternative_filename('test'), 'test')

    def test_relativize_path(self):
        relativize_path = self.module.relativize_path
        self.assertEqual(relativize_path('/parent/child', '/parent'), 'child')
        self.assertEqual(
            relativize_path('/grandpa/parent/child', '/grandpa/parent'),
            'child')
        self.assertEqual(
            relativize_path('/grandpa/parent/child', '/grandpa'),
            'parent/child')
        # This check goes through the names exposed by the top-level package.
        self.assertRaises(
            browsepy.OutsideDirectoryBase,
            browsepy.relativize_path, '/other', '/parent'
        )

    def test_under_base(self):
        check_under_base = self.module.check_under_base
        self.assertTrue(
            check_under_base('C:\\as\\df\\gf', 'C:\\as\\df', '\\'))
        self.assertTrue(check_under_base('/as/df', '/as', '/'))

        self.assertFalse(
            check_under_base('C:\\cc\\df\\gf', 'C:\\as\\df', '\\'))
        self.assertFalse(check_under_base('/cc/df', '/as', '/'))
510
511
512
class TestFunctions(unittest.TestCase):
    """Tests for helper functions exposed by the ``browsepy`` package."""

    module = browsepy

    def test_empty_iterable(self):
        """``empty_iterable`` reports emptiness and hands back the items."""
        fnc = self.module.empty_iterable

        empty, iterable = fnc(i for i in ())
        self.assertTrue(empty)
        self.assertRaises(StopIteration, next, iterable)

        empty, iterable = fnc(i for i in (1, 2))
        self.assertFalse(empty)
        self.assertEqual(tuple(iterable), (1, 2))
522
523
524
class TestMain(unittest.TestCase):
    """Tests for argument parsing and ``main`` in ``browsepy.__main__``."""

    module = browsepy.__main__

    def setUp(self):
        """Create the argument parser and a temporary directory."""
        self.app = browsepy.app
        self.parser = self.module.ArgParse()
        self.base = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory."""
        shutil.rmtree(self.base)

    def test_defaults(self):
        result = self.parser.parse_args([])
        self.assertEqual(result.host, '127.0.0.1')
        self.assertEqual(result.port, 8080)
        self.assertEqual(result.directory, os.getcwd())
        self.assertIsNone(result.initial)
        self.assertIsNone(result.removable)
        self.assertIsNone(result.upload)
        self.assertEqual(result.plugin, [])

    def test_params(self):
        plugins = ['plugin_1', 'plugin_2', 'namespace.plugin_3']
        result = self.parser.parse_args([
            '127.1.1.1',
            '5000',
            '--directory=%s' % self.base,
            '--initial=%s' % self.base,
            '--removable=%s' % self.base,
            '--upload=%s' % self.base,
            '--plugin=%s' % ','.join(plugins),
        ])
        self.assertEqual(result.host, '127.1.1.1')
        self.assertEqual(result.port, 5000)
        self.assertEqual(result.directory, self.base)
        self.assertEqual(result.initial, self.base)
        self.assertEqual(result.removable, self.base)
        self.assertEqual(result.upload, self.base)
        self.assertEqual(result.plugin, plugins)

    def test_main(self):
        # Capture the keyword arguments ``main`` passes to its run function
        # instead of starting a server.
        params = {}
        self.module.main(
            argv=[],
            run_fnc=lambda app, **kwargs: params.update(kwargs)
        )

        defaults = {
            'host': '127.0.0.1',
            'port': 8080,
            'debug': False,
            'threaded': True,
        }
        params_subset = {k: v for k, v in params.items() if k in defaults}
        self.assertEqual(defaults, params_subset)
569
570
class TestPlugins(unittest.TestCase):
    """Tests for ``PluginManager``, using this very module as the plugin."""

    app_module = browsepy
    managers_module = browsepy.managers

    def setUp(self):
        """Register this module's package as the only plugin namespace."""
        self.app = self.app_module.app
        self.manager = self.managers_module.PluginManager(self.app)
        self.original_namespaces = self.app.config['plugin_namespaces']
        # rpartition (unlike rsplit + unpacking) does not raise ValueError
        # for an undotted module name such as '__main__'; the namespace is
        # then an empty string.
        # NOTE(review): whether the manager resolves an empty namespace is
        # not visible from this file - confirm.
        namespace, _, name = __name__.rpartition('.')
        self.plugin_namespace = namespace
        self.plugin_name = name
        self.app.config['plugin_namespaces'] = (self.plugin_namespace,)

    def tearDown(self):
        """Put the original plugin namespaces back."""
        self.app.config['plugin_namespaces'] = self.original_namespaces

    def test_manager(self):
        self.manager.load_plugin(self.plugin_name)
        # Flag set by ``register_plugin`` in this module.
        self.assertTrue(self.manager._plugin_loaded)

        endpoints = sorted(
            action.endpoint
            for action in self.manager.get_actions('a/a')
        )

        self.assertEqual(
            endpoints,
            sorted(['test_x_x', 'test_a_x', 'test_x_a', 'test_a_a'])
        )
        self.assertEqual(
            self.app.view_functions['test_plugin.root'](),
            'test_plugin_root'
        )
        self.assertIn('test_plugin', self.app.blueprints)

        self.assertRaises(
            self.managers_module.PluginNotFoundError,
            self.manager.load_plugin,
            'non_existent_plugin_module'
        )
601
602
603
def register_plugin(manager):
    """Register this module's test actions and blueprint on ``manager``.

    Sets ``manager._plugin_loaded`` to ``True``, registers five actions with
    different mimetype patterns, and registers a blueprint whose root view
    returns ``'test_plugin_root'``.

    :param manager: object exposing ``register_action`` and
                    ``register_blueprint``
    """
    manager._plugin_loaded = True
    manager.register_action('test_x_x', 'test_x_x', ('*/*',))
    manager.register_action('test_a_x', 'test_a_x', ('a/*',))
    manager.register_action('test_x_a', 'test_x_a', ('*/a',))
    manager.register_action('test_a_a', 'test_a_a', ('a/a',))
    manager.register_action('test_b_x', 'test_b_x', ('b/*',))

    test_plugin_blueprint = flask.Blueprint(
        'test_plugin',
        __name__,
        url_prefix='/test_plugin_blueprint'
    )
    test_plugin_blueprint.add_url_rule(
        '/',
        endpoint='root',
        view_func=lambda: 'test_plugin_root'
    )

    manager.register_blueprint(test_plugin_blueprint)
615
616
617
if __name__ == '__main__':
    # Run the test cases of this module when executed as a script.
    unittest.main()
619