Completed
Push — master ( 8a89ea...06ed55 )
by
unknown
02:37
created

UploaderTestCase.test_hasRead_WithoutSeek_retry2()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 8
rs 9.4285
1
# -*- coding: utf-8 -*-
2
# flake8: noqa
3
import os
4
import string
5
import random
6
import tempfile
7
import requests
8
9
import unittest
10
import pytest
11
12
from qiniu import Auth, set_default, etag, PersistentFop, build_op, op_save, Zone
13
from qiniu import put_data, put_file, put_stream
14
from qiniu import BucketManager, build_batch_copy, build_batch_rename, build_batch_move, build_batch_stat, \
15
    build_batch_delete
16
from qiniu import urlsafe_base64_encode, urlsafe_base64_decode
17
18
from qiniu.compat import is_py2, is_py3, b
19
20
from qiniu.services.storage.uploader import _form_put
21
22
import qiniu.config
23
24
if is_py2:
25
    import sys
26
    import StringIO
27
    import urllib
28
29
    reload(sys)
30
    sys.setdefaultencoding('utf-8')
31
    StringIO = StringIO.StringIO
32
    urlopen = urllib.urlopen
33
elif is_py3:
34
    import io
35
    import urllib
36
37
    StringIO = io.StringIO
38
    urlopen = urllib.request.urlopen
39
40
access_key = os.getenv('QINIU_ACCESS_KEY')
41
secret_key = os.getenv('QINIU_SECRET_KEY')
42
bucket_name = os.getenv('QINIU_TEST_BUCKET')
43
44
dummy_access_key = 'abcdefghklmnopq'
45
dummy_secret_key = '1234567890'
46
dummy_auth = Auth(dummy_access_key, dummy_secret_key)
47
48
49
def rand_string(length):
50
    lib = string.ascii_uppercase
51
    return ''.join([random.choice(lib) for i in range(0, length)])
52
53
54
def create_temp_file(size):
55
    t = tempfile.mktemp()
56
    f = open(t, 'wb')
57
    f.seek(size - 1)
58
    f.write(b('0'))
59
    f.close()
60
    return t
61
62
63
def remove_temp_file(file):
64
    try:
65
        os.remove(file)
66
    except OSError:
67
        pass
68
69
70
def is_travis():
71
    return os.environ['QINIU_TEST_ENV'] == 'travis'
72
73
74
class UtilsTest(unittest.TestCase):
75
    def test_urlsafe(self):
76
        a = 'hello\x96'
77
        u = urlsafe_base64_encode(a)
78
        assert b(a) == urlsafe_base64_decode(u)
79
80
81
class AuthTestCase(unittest.TestCase):
82
    def test_token(self):
83
        token = dummy_auth.token('test')
84
        assert token == 'abcdefghklmnopq:mSNBTR7uS2crJsyFr2Amwv1LaYg='
85
86
    def test_token_with_data(self):
87
        token = dummy_auth.token_with_data('test')
88
        assert token == 'abcdefghklmnopq:-jP8eEV9v48MkYiBGs81aDxl60E=:dGVzdA=='
89
90
    def test_noKey(self):
91
        with pytest.raises(ValueError):
92
            Auth(None, None).token('nokey')
93
        with pytest.raises(ValueError):
94
            Auth('', '').token('nokey')
95
96
    def test_token_of_request(self):
97
        token = dummy_auth.token_of_request('http://www.qiniu.com?go=1', 'test', '')
98
        assert token == 'abcdefghklmnopq:cFyRVoWrE3IugPIMP5YJFTO-O-Y='
99
        token = dummy_auth.token_of_request('http://www.qiniu.com?go=1', 'test', 'application/x-www-form-urlencoded')
100
        assert token == 'abcdefghklmnopq:svWRNcacOE-YMsc70nuIYdaa1e4='
101
102
    def test_verify_callback(self):
103
        body = 'name=sunflower.jpg&hash=Fn6qeQi4VDLQ347NiRm-RlQx_4O2&location=Shanghai&price=1500.00&uid=123'
104
        url = 'test.qiniu.com/callback'
105
        ok = dummy_auth.verify_callback('QBox abcdefghklmnopq:ZWyeM5ljWMRFwuPTPOwQ4RwSto4=', url, body)
106
        assert ok
107
108
109
class BucketTestCase(unittest.TestCase):
110
    q = Auth(access_key, secret_key)
111
    bucket = BucketManager(q)
112
113
    def test_list(self):
114
        ret, eof, info = self.bucket.list(bucket_name, limit=4)
115
        print(info)
116
        assert eof is False
117
        assert len(ret.get('items')) == 4
118
        ret, eof, info = self.bucket.list(bucket_name, limit=1000)
119
        print(info)
120
        assert eof is True
121
122
    def test_buckets(self):
123
        ret, info = self.bucket.buckets()
124
        print(info)
125
        assert bucket_name in ret
126
127
    def test_prefetch(self):
128
        ret, info = self.bucket.prefetch(bucket_name, 'python-sdk.html')
129
        print(info)
130
        assert ret['key'] == 'python-sdk.html'
131
132
    def test_fetch(self):
133
        ret, info = self.bucket.fetch('http://developer.qiniu.com/docs/v6/sdk/python-sdk.html', bucket_name,
134
                                      'fetch.html')
135
        print(info)
136
        assert ret['key'] == 'fetch.html'
137
        assert 'hash' in ret
138
139
    def test_fetch_without_key(self):
140
        ret, info = self.bucket.fetch('http://developer.qiniu.com/docs/v6/sdk/python-sdk.html', bucket_name)
141
        print(info)
142
        assert ret['key'] == ret['hash']
143
        assert 'hash' in ret
144
145
    def test_stat(self):
146
        ret, info = self.bucket.stat(bucket_name, 'python-sdk.html')
147
        print(info)
148
        assert 'hash' in ret
149
150
    def test_delete(self):
151
        ret, info = self.bucket.delete(bucket_name, 'del')
152
        print(info)
153
        assert ret is None
154
        assert info.status_code == 612
155
156
    def test_rename(self):
157
        key = 'renameto' + rand_string(8)
158
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
159
        key2 = key + 'move'
160
        ret, info = self.bucket.rename(bucket_name, key, key2)
161
        print(info)
162
        assert ret == {}
163
        ret, info = self.bucket.delete(bucket_name, key2)
164
        print(info)
165
        assert ret == {}
166
167
    def test_copy(self):
168
        key = 'copyto' + rand_string(8)
169
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
170
        print(info)
171
        assert ret == {}
172
        ret, info = self.bucket.delete(bucket_name, key)
173
        print(info)
174
        assert ret == {}
175
176
    def test_change_mime(self):
177
        ret, info = self.bucket.change_mime(bucket_name, 'python-sdk.html', 'text/html')
178
        print(info)
179
        assert ret == {}
180
181
    def test_change_type(self):
182
        target_key = 'copyto' + rand_string(8)
183
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, target_key)
184
        ret, info = self.bucket.change_type(bucket_name, target_key, 1)
185
        print(info)
186
        assert ret == {}
187
        ret, info = self.bucket.stat(bucket_name, target_key)
188
        print(info)
189
        assert 'type' in ret
190
        self.bucket.delete(bucket_name, target_key)
191
192
    def test_copy_force(self):
193
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true')
194
        print(info)
195
        assert info.status_code == 200
196
197
    def test_batch_copy(self):
198
        key = 'copyto' + rand_string(8)
199
        ops = build_batch_copy(bucket_name, {'copyfrom': key}, bucket_name)
200
        ret, info = self.bucket.batch(ops)
201
        print(info)
202
        assert ret[0]['code'] == 200
203
        ops = build_batch_delete(bucket_name, [key])
204
        ret, info = self.bucket.batch(ops)
205
        print(info)
206
        assert ret[0]['code'] == 200
207
208
    def test_batch_copy_force(self):
209
        ops = build_batch_copy(bucket_name, {'copyfrom': 'copyfrom'}, bucket_name, force='true')
210
        ret, info = self.bucket.batch(ops)
211
        print(info)
212
        assert ret[0]['code'] == 200
213
214
    def test_batch_move(self):
215
        key = 'moveto' + rand_string(8)
216
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
217
        key2 = key + 'move'
218
        ops = build_batch_move(bucket_name, {key: key2}, bucket_name)
219
        ret, info = self.bucket.batch(ops)
220
        print(info)
221
        assert ret[0]['code'] == 200
222
        ret, info = self.bucket.delete(bucket_name, key2)
223
        print(info)
224
        assert ret == {}
225
226 View Code Duplication
    def test_batch_move_force(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
227
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true')
228
        print(info)
229
        assert info.status_code == 200
230
        ops = build_batch_move(bucket_name, {'copyfrom': 'copyfrom'}, bucket_name, force='true')
231
        ret, info = self.bucket.batch(ops)
232
        print(info)
233
        assert ret[0]['code'] == 200
234
235
    def test_batch_rename(self):
236
        key = 'rename' + rand_string(8)
237
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
238
        key2 = key + 'rename'
239
        ops = build_batch_move(bucket_name, {key: key2}, bucket_name)
240
        ret, info = self.bucket.batch(ops)
241
        print(info)
242
        assert ret[0]['code'] == 200
243
        ret, info = self.bucket.delete(bucket_name, key2)
244
        print(info)
245
        assert ret == {}
246
247 View Code Duplication
    def test_batch_rename_force(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
248
        ret, info = self.bucket.rename(bucket_name, 'copyfrom', 'copyfrom', force='true')
249
        print(info)
250
        assert info.status_code == 200
251
        ops = build_batch_rename(bucket_name, {'copyfrom': 'copyfrom'}, force='true')
252
        ret, info = self.bucket.batch(ops)
253
        print(info)
254
        assert ret[0]['code'] == 200
255
256
    def test_batch_stat(self):
257
        ops = build_batch_stat(bucket_name, ['python-sdk.html'])
258
        ret, info = self.bucket.batch(ops)
259
        print(info)
260
        assert ret[0]['code'] == 200
261
262
    def test_delete_after_days(self):
263
        days = '5'
264
        ret, info = self.bucket.delete_after_days(bucket_name, 'invaild.html', days)
265
        assert info.status_code == 612
266
        key = 'copyto' + rand_string(8)
267
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
268
        ret, info = self.bucket.delete_after_days(bucket_name, key, days)
269
        assert info.status_code == 200
270
271
272
class UploaderTestCase(unittest.TestCase):
273
    mime_type = "text/plain"
274
    params = {'x:a': 'a'}
275
    q = Auth(access_key, secret_key)
276
277
    def test_put(self):
278
        key = 'a\\b\\c"hello'
279
        data = 'hello bubby!'
280
        token = self.q.upload_token(bucket_name)
281
        ret, info = put_data(token, key, data)
282
        print(info)
283
        assert ret['key'] == key
284
285
    def test_put_crc(self):
286
        key = ''
287
        data = 'hello bubby!'
288
        token = self.q.upload_token(bucket_name, key)
289
        ret, info = put_data(token, key, data, check_crc=True)
290
        print(info)
291
        assert ret['key'] == key
292
293
    def test_putfile(self):
294
        localfile = __file__
295
        key = 'test_file'
296
297
        token = self.q.upload_token(bucket_name, key)
298
        ret, info = put_file(token, key, localfile, mime_type=self.mime_type, check_crc=True)
299
        print(info)
300
        assert ret['key'] == key
301
        assert ret['hash'] == etag(localfile)
302
303
    def test_putInvalidCrc(self):
304
        key = 'test_invalid'
305
        data = 'hello bubby!'
306
        crc32 = 'wrong crc32'
307
        token = self.q.upload_token(bucket_name)
308
        ret, info = _form_put(token, key, data, None, None, crc=crc32)
309
        print(info)
310
        assert ret is None
311
        assert info.status_code == 400
312
313
    def test_putWithoutKey(self):
314
        key = None
315
        data = 'hello bubby!'
316
        token = self.q.upload_token(bucket_name)
317
        ret, info = put_data(token, key, data)
318
        print(info)
319
        assert ret['hash'] == ret['key']
320
321
        data = 'hello bubby!'
322
        token = self.q.upload_token(bucket_name, 'nokey2')
323
        ret, info = put_data(token, None, data)
324
        print(info)
325
        assert ret is None
326
        assert info.status_code == 403  # key not match
327
328
    def test_withoutRead_withoutSeek_retry(self):
329
        key = 'retry'
330
        data = 'hello retry!'
331
        set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com'))
332
        token = self.q.upload_token(bucket_name)
333
        ret, info = put_data(token, key, data)
334
        print(info)
335
        assert ret['key'] == key
336
        assert ret['hash'] == 'FlYu0iBR1WpvYi4whKXiBuQpyLLk'
337
338
    def test_putData_without_fname(self):
339
        if is_travis():
340
            return
341
        localfile = create_temp_file(30 * 1024 * 1024)
342
        key = 'test_putData_without_fname'
343
        with open(localfile, 'rb') as input_stream:
344
            token = self.q.upload_token(bucket_name)
345
            ret, info = put_data(token, key, input_stream)
346
            print(info)
347
            assert ret is not None
348
349
    def test_putData_without_fname1(self):
350
        if is_travis():
351
            return
352
        localfile = create_temp_file(30 * 1024 * 1024)
353
        key = 'test_putData_without_fname1'
354
        with open(localfile, 'rb') as input_stream:
355
            token = self.q.upload_token(bucket_name)
356
            ret, info = put_data(token, key, input_stream, self.params, self.mime_type, False, None, "")
357
            print(info)
358
            assert ret is not None
359
360
    def test_putData_without_fname2(self):
361
        if is_travis():
362
            return
363
        localfile = create_temp_file(30 * 1024 * 1024)
364
        key = 'test_putData_without_fname2'
365
        with open(localfile, 'rb') as input_stream:
366
            token = self.q.upload_token(bucket_name)
367
            ret, info = put_data(token, key, input_stream, self.params, self.mime_type, False, None, "  ")
368
            print(info)
369
            assert ret is not None
370
371
372
class ResumableUploaderTestCase(unittest.TestCase):
373
    mime_type = "text/plain"
374
    params = {'x:a': 'a'}
375
    q = Auth(access_key, secret_key)
376
377
    def test_put_stream(self):
378
        localfile = __file__
379
        key = 'test_file_r'
380
        size = os.stat(localfile).st_size
381
        with open(localfile, 'rb') as input_stream:
382
            token = self.q.upload_token(bucket_name, key)
383
            ret, info = put_stream(token, key, input_stream, os.path.basename(__file__), size, self.params,
384
                                   self.mime_type)
385
            print(info)
386
            assert ret['key'] == key
387
388
    def test_big_file(self):
389
        key = 'big'
390
        token = self.q.upload_token(bucket_name, key)
391
        localfile = create_temp_file(4 * 1024 * 1024 + 1)
392
        progress_handler = lambda progress, total: progress
393
        qiniu.set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com'))
394
        ret, info = put_file(token, key, localfile, self.params, self.mime_type, progress_handler=progress_handler)
395
        print(info)
396
        assert ret['key'] == key
397
        remove_temp_file(localfile)
398
399
    def test_retry(self):
400
        localfile = __file__
401
        key = 'test_file_r_retry'
402
        qiniu.set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com'))
403
        token = self.q.upload_token(bucket_name, key)
404
        ret, info = put_file(token, key, localfile, self.params, self.mime_type)
405
        print(info)
406
        assert ret['key'] == key
407
        assert ret['hash'] == etag(localfile)
408
409
410
class DownloadTestCase(unittest.TestCase):
411
    q = Auth(access_key, secret_key)
412
413
    def test_private_url(self):
414
        private_bucket = 'private-res'
415
        private_key = 'gogopher.jpg'
416
        base_url = 'http://%s/%s' % (private_bucket + '.qiniudn.com', private_key)
417
        private_url = self.q.private_download_url(base_url, expires=3600)
418
        print(private_url)
419
        r = requests.get(private_url)
420
        assert r.status_code == 200
421
422
423
class MediaTestCase(unittest.TestCase):
424
    def test_pfop(self):
425
        q = Auth(access_key, secret_key)
426
        pfop = PersistentFop(q, 'testres', 'sdktest')
427
        op = op_save('avthumb/m3u8/segtime/10/vcodec/libx264/s/320x240', 'pythonsdk', 'pfoptest')
428
        ops = []
429
        ops.append(op)
430
        ret, info = pfop.execute('sintel_trailer.mp4', ops, 1)
431
        print(info)
432
        assert ret['persistentId'] is not None
433
434
435
class EtagTestCase(unittest.TestCase):
436
    def test_zero_size(self):
437
        open("x", 'a').close()
438
        hash = etag("x")
439
        assert hash == 'Fto5o-5ea0sNMlW_75VgGJCv2AcJ'
440
        remove_temp_file("x")
441
442
    def test_small_size(self):
443
        localfile = create_temp_file(1024 * 1024)
444
        hash = etag(localfile)
445
        assert hash == 'FnlAdmDasGTQOIgrU1QIZaGDv_1D'
446
        remove_temp_file(localfile)
447
448
    def test_large_size(self):
449
        localfile = create_temp_file(4 * 1024 * 1024 + 1)
450
        hash = etag(localfile)
451
        assert hash == 'ljF323utglY3GI6AvLgawSJ4_dgk'
452
        remove_temp_file(localfile)
453
454
455
class ReadWithoutSeek(object):
456
    def __init__(self, str):
457
        self.str = str
458
        pass
459
460
    def read(self):
461
        print(self.str)
462
463
464
if __name__ == '__main__':
465
    unittest.main()
466