1
|
|
|
# -*- coding: utf-8 -*- |
2
|
|
|
# flake8: noqa |
3
|
|
|
import os |
4
|
|
|
import string |
5
|
|
|
import random |
6
|
|
|
import tempfile |
7
|
|
|
import requests |
8
|
|
|
|
9
|
|
|
import unittest |
10
|
|
|
import pytest |
11
|
|
|
|
12
|
|
|
from qiniu import Auth, set_default, etag, PersistentFop, build_op, op_save, Zone |
13
|
|
|
from qiniu import put_data, put_file, put_stream |
14
|
|
|
from qiniu import BucketManager, build_batch_copy, build_batch_rename, build_batch_move, build_batch_stat, build_batch_delete |
15
|
|
|
from qiniu import urlsafe_base64_encode, urlsafe_base64_decode |
16
|
|
|
|
17
|
|
|
from qiniu.compat import is_py2, is_py3, b |
18
|
|
|
|
19
|
|
|
from qiniu.services.storage.uploader import _form_put |
20
|
|
|
|
21
|
|
|
import qiniu.config |
22
|
|
|
|
23
|
|
|
if is_py2: |
24
|
|
|
import sys |
25
|
|
|
import StringIO |
26
|
|
|
import urllib |
27
|
|
|
reload(sys) |
28
|
|
|
sys.setdefaultencoding('utf-8') |
29
|
|
|
StringIO = StringIO.StringIO |
30
|
|
|
urlopen = urllib.urlopen |
31
|
|
|
elif is_py3: |
32
|
|
|
import io |
33
|
|
|
import urllib |
34
|
|
|
StringIO = io.StringIO |
35
|
|
|
urlopen = urllib.request.urlopen |
36
|
|
|
|
37
|
|
|
access_key = os.getenv('QINIU_ACCESS_KEY') |
38
|
|
|
secret_key = os.getenv('QINIU_SECRET_KEY') |
39
|
|
|
bucket_name = os.getenv('QINIU_TEST_BUCKET') |
40
|
|
|
|
41
|
|
|
dummy_access_key = 'abcdefghklmnopq' |
42
|
|
|
dummy_secret_key = '1234567890' |
43
|
|
|
dummy_auth = Auth(dummy_access_key, dummy_secret_key) |
44
|
|
|
|
45
|
|
|
|
46
|
|
|
def rand_string(length): |
47
|
|
|
lib = string.ascii_uppercase |
48
|
|
|
return ''.join([random.choice(lib) for i in range(0, length)]) |
49
|
|
|
|
50
|
|
|
|
51
|
|
|
def create_temp_file(size): |
52
|
|
|
t = tempfile.mktemp() |
53
|
|
|
f = open(t, 'wb') |
54
|
|
|
f.seek(size-1) |
55
|
|
|
f.write(b('0')) |
56
|
|
|
f.close() |
57
|
|
|
return t |
58
|
|
|
|
59
|
|
|
|
60
|
|
|
def remove_temp_file(file): |
61
|
|
|
try: |
62
|
|
|
os.remove(file) |
63
|
|
|
except OSError: |
64
|
|
|
pass |
65
|
|
|
|
66
|
|
|
|
67
|
|
|
def is_travis(): |
68
|
|
|
return os.environ['QINIU_TEST_ENV'] == 'travis' |
69
|
|
|
|
70
|
|
|
|
71
|
|
|
class UtilsTest(unittest.TestCase): |
72
|
|
|
|
73
|
|
|
def test_urlsafe(self): |
74
|
|
|
a = 'hello\x96' |
75
|
|
|
u = urlsafe_base64_encode(a) |
76
|
|
|
assert b(a) == urlsafe_base64_decode(u) |
77
|
|
|
|
78
|
|
|
|
79
|
|
|
class AuthTestCase(unittest.TestCase): |
80
|
|
|
|
81
|
|
|
def test_token(self): |
82
|
|
|
token = dummy_auth.token('test') |
83
|
|
|
assert token == 'abcdefghklmnopq:mSNBTR7uS2crJsyFr2Amwv1LaYg=' |
84
|
|
|
|
85
|
|
|
def test_token_with_data(self): |
86
|
|
|
token = dummy_auth.token_with_data('test') |
87
|
|
|
assert token == 'abcdefghklmnopq:-jP8eEV9v48MkYiBGs81aDxl60E=:dGVzdA==' |
88
|
|
|
|
89
|
|
|
def test_noKey(self): |
90
|
|
|
with pytest.raises(ValueError): |
91
|
|
|
Auth(None, None).token('nokey') |
92
|
|
|
with pytest.raises(ValueError): |
93
|
|
|
Auth('', '').token('nokey') |
94
|
|
|
|
95
|
|
|
def test_token_of_request(self): |
96
|
|
|
token = dummy_auth.token_of_request('http://www.qiniu.com?go=1', 'test', '') |
97
|
|
|
assert token == 'abcdefghklmnopq:cFyRVoWrE3IugPIMP5YJFTO-O-Y=' |
98
|
|
|
token = dummy_auth.token_of_request('http://www.qiniu.com?go=1', 'test', 'application/x-www-form-urlencoded') |
99
|
|
|
assert token == 'abcdefghklmnopq:svWRNcacOE-YMsc70nuIYdaa1e4=' |
100
|
|
|
|
101
|
|
|
def test_deprecatedPolicy(self): |
102
|
|
|
with pytest.raises(ValueError): |
103
|
|
|
dummy_auth.upload_token('1', None, policy={'asyncOps': 1}) |
104
|
|
|
|
105
|
|
|
def test_verify_callback(self): |
106
|
|
|
body = 'name=sunflower.jpg&hash=Fn6qeQi4VDLQ347NiRm-RlQx_4O2&location=Shanghai&price=1500.00&uid=123' |
107
|
|
|
url = 'test.qiniu.com/callback' |
108
|
|
|
ok = dummy_auth.verify_callback('QBox abcdefghklmnopq:ZWyeM5ljWMRFwuPTPOwQ4RwSto4=', url, body) |
109
|
|
|
assert ok |
110
|
|
|
|
111
|
|
|
|
112
|
|
|
class BucketTestCase(unittest.TestCase): |
113
|
|
|
q = Auth(access_key, secret_key) |
114
|
|
|
bucket = BucketManager(q) |
115
|
|
|
|
116
|
|
|
def test_list(self): |
117
|
|
|
ret, eof, info = self.bucket.list(bucket_name, limit=4) |
118
|
|
|
print(info) |
119
|
|
|
assert eof is False |
120
|
|
|
assert len(ret.get('items')) == 4 |
121
|
|
|
ret, eof, info = self.bucket.list(bucket_name, limit=100) |
122
|
|
|
print(info) |
123
|
|
|
assert eof is True |
124
|
|
|
|
125
|
|
|
def test_buckets(self): |
126
|
|
|
ret, info = self.bucket.buckets() |
127
|
|
|
print(info) |
128
|
|
|
assert bucket_name in ret |
129
|
|
|
|
130
|
|
|
def test_prefetch(self): |
131
|
|
|
ret, info = self.bucket.prefetch(bucket_name, 'python-sdk.html') |
132
|
|
|
print(info) |
133
|
|
|
assert ret['key'] == 'python-sdk.html' |
134
|
|
|
|
135
|
|
|
def test_fetch(self): |
136
|
|
|
ret, info = self.bucket.fetch('http://developer.qiniu.com/docs/v6/sdk/python-sdk.html', bucket_name, 'fetch.html') |
137
|
|
|
print(info) |
138
|
|
|
assert ret['key'] == 'fetch.html' |
139
|
|
|
assert 'hash' in ret |
140
|
|
|
|
141
|
|
|
def test_fetch_without_key(self): |
142
|
|
|
ret, info = self.bucket.fetch('http://developer.qiniu.com/docs/v6/sdk/python-sdk.html', bucket_name) |
143
|
|
|
print(info) |
144
|
|
|
assert ret['key'] == ret['hash'] |
145
|
|
|
assert 'hash' in ret |
146
|
|
|
|
147
|
|
|
def test_stat(self): |
148
|
|
|
ret, info = self.bucket.stat(bucket_name, 'python-sdk.html') |
149
|
|
|
print(info) |
150
|
|
|
assert 'hash' in ret |
151
|
|
|
|
152
|
|
|
def test_delete(self): |
153
|
|
|
ret, info = self.bucket.delete(bucket_name, 'del') |
154
|
|
|
print(info) |
155
|
|
|
assert ret is None |
156
|
|
|
assert info.status_code == 612 |
157
|
|
|
|
158
|
|
|
def test_rename(self): |
159
|
|
|
key = 'renameto'+rand_string(8) |
160
|
|
|
self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) |
161
|
|
|
key2 = key + 'move' |
162
|
|
|
ret, info = self.bucket.rename(bucket_name, key, key2) |
163
|
|
|
print(info) |
164
|
|
|
assert ret == {} |
165
|
|
|
ret, info = self.bucket.delete(bucket_name, key2) |
166
|
|
|
print(info) |
167
|
|
|
assert ret == {} |
168
|
|
|
|
169
|
|
|
def test_copy(self): |
170
|
|
|
key = 'copyto'+rand_string(8) |
171
|
|
|
ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) |
172
|
|
|
print(info) |
173
|
|
|
assert ret == {} |
174
|
|
|
ret, info = self.bucket.delete(bucket_name, key) |
175
|
|
|
print(info) |
176
|
|
|
assert ret == {} |
177
|
|
|
|
178
|
|
|
def test_change_mime(self): |
179
|
|
|
ret, info = self.bucket.change_mime(bucket_name, 'python-sdk.html', 'text/html') |
180
|
|
|
print(info) |
181
|
|
|
assert ret == {} |
182
|
|
|
|
183
|
|
|
def test_copy(self): |
184
|
|
|
key = 'copyto'+rand_string(8) |
185
|
|
|
ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) |
186
|
|
|
print(info) |
187
|
|
|
assert ret == {} |
188
|
|
|
ret, info = self.bucket.delete(bucket_name, key) |
189
|
|
|
print(info) |
190
|
|
|
assert ret == {} |
191
|
|
|
|
192
|
|
|
def test_copy_force(self): |
193
|
|
|
ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true') |
194
|
|
|
print(info) |
195
|
|
|
assert info.status_code == 200 |
196
|
|
|
|
197
|
|
|
def test_change_mime(self): |
198
|
|
|
ret, info = self.bucket.change_mime(bucket_name, 'python-sdk.html', 'text/html') |
199
|
|
|
print(info) |
200
|
|
|
assert ret == {} |
201
|
|
|
|
202
|
|
|
def test_batch_copy(self): |
203
|
|
|
key = 'copyto'+rand_string(8) |
204
|
|
|
ops = build_batch_copy(bucket_name, {'copyfrom': key}, bucket_name) |
205
|
|
|
ret, info = self.bucket.batch(ops) |
206
|
|
|
print(info) |
207
|
|
|
assert ret[0]['code'] == 200 |
208
|
|
|
ops = build_batch_delete(bucket_name, [key]) |
209
|
|
|
ret, info = self.bucket.batch(ops) |
210
|
|
|
print(info) |
211
|
|
|
assert ret[0]['code'] == 200 |
212
|
|
|
|
213
|
|
|
def test_batch_copy_force(self): |
214
|
|
|
ops = build_batch_copy(bucket_name, {'copyfrom': 'copyfrom'}, bucket_name, force='true') |
215
|
|
|
ret, info = self.bucket.batch(ops) |
216
|
|
|
print(info) |
217
|
|
|
assert ret[0]['code'] == 200 |
218
|
|
|
|
219
|
|
View Code Duplication |
def test_batch_move(self): |
|
|
|
|
220
|
|
|
key = 'moveto'+rand_string(8) |
221
|
|
|
self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) |
222
|
|
|
key2 = key + 'move' |
223
|
|
|
ops = build_batch_move(bucket_name, {key: key2}, bucket_name) |
224
|
|
|
ret, info = self.bucket.batch(ops) |
225
|
|
|
print(info) |
226
|
|
|
assert ret[0]['code'] == 200 |
227
|
|
|
ret, info = self.bucket.delete(bucket_name, key2) |
228
|
|
|
print(info) |
229
|
|
|
assert ret == {} |
230
|
|
|
|
231
|
|
View Code Duplication |
def test_batch_move_force(self): |
|
|
|
|
232
|
|
|
ret,info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true') |
233
|
|
|
print(info) |
234
|
|
|
assert info.status_code == 200 |
235
|
|
|
ops = build_batch_move(bucket_name, {'copyfrom':'copyfrom'}, bucket_name,force='true') |
236
|
|
|
ret, info = self.bucket.batch(ops) |
237
|
|
|
print(info) |
238
|
|
|
assert ret[0]['code'] == 200 |
239
|
|
|
|
240
|
|
View Code Duplication |
def test_batch_rename(self): |
|
|
|
|
241
|
|
|
key = 'rename'+rand_string(8) |
242
|
|
|
self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) |
243
|
|
|
key2 = key + 'rename' |
244
|
|
|
ops = build_batch_move(bucket_name, {key: key2}, bucket_name) |
245
|
|
|
ret, info = self.bucket.batch(ops) |
246
|
|
|
print(info) |
247
|
|
|
assert ret[0]['code'] == 200 |
248
|
|
|
ret, info = self.bucket.delete(bucket_name, key2) |
249
|
|
|
print(info) |
250
|
|
|
assert ret == {} |
251
|
|
|
|
252
|
|
View Code Duplication |
def test_batch_rename_force(self): |
|
|
|
|
253
|
|
|
ret,info = self.bucket.rename(bucket_name, 'copyfrom', 'copyfrom', force='true') |
254
|
|
|
print(info) |
255
|
|
|
assert info.status_code == 200 |
256
|
|
|
ops = build_batch_rename(bucket_name, {'copyfrom':'copyfrom'}, force='true') |
257
|
|
|
ret, info = self.bucket.batch(ops) |
258
|
|
|
print(info) |
259
|
|
|
assert ret[0]['code'] == 200 |
260
|
|
|
|
261
|
|
|
def test_batch_stat(self): |
262
|
|
|
ops = build_batch_stat(bucket_name, ['python-sdk.html']) |
263
|
|
|
ret, info = self.bucket.batch(ops) |
264
|
|
|
print(info) |
265
|
|
|
assert ret[0]['code'] == 200 |
266
|
|
|
|
267
|
|
|
def test_delete_after_days(self): |
268
|
|
|
days = '5' |
269
|
|
|
ret, info = self.bucket.delete_after_days(bucket_name,'invaild.html', days) |
270
|
|
|
assert info.status_code == 612 |
271
|
|
|
key = 'copyto'+rand_string(8) |
272
|
|
|
ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key) |
273
|
|
|
ret, info = self.bucket.delete_after_days(bucket_name, key, days) |
274
|
|
|
assert info.status_code == 200 |
275
|
|
|
|
276
|
|
|
|
277
|
|
|
class UploaderTestCase(unittest.TestCase): |
278
|
|
|
|
279
|
|
|
mime_type = "text/plain" |
280
|
|
|
params = {'x:a': 'a'} |
281
|
|
|
q = Auth(access_key, secret_key) |
282
|
|
|
|
283
|
|
|
def test_put(self): |
284
|
|
|
key = 'a\\b\\c"hello' |
285
|
|
|
data = 'hello bubby!' |
286
|
|
|
token = self.q.upload_token(bucket_name) |
287
|
|
|
ret, info = put_data(token, key, data) |
288
|
|
|
print(info) |
289
|
|
|
assert ret['key'] == key |
290
|
|
|
|
291
|
|
|
def test_put_crc(self): |
292
|
|
|
key = '' |
293
|
|
|
data = 'hello bubby!' |
294
|
|
|
token = self.q.upload_token(bucket_name, key) |
295
|
|
|
ret, info = put_data(token, key, data, check_crc=True) |
296
|
|
|
print(info) |
297
|
|
|
assert ret['key'] == key |
298
|
|
|
|
299
|
|
|
def test_putfile(self): |
300
|
|
|
localfile = __file__ |
301
|
|
|
key = 'test_file' |
302
|
|
|
|
303
|
|
|
token = self.q.upload_token(bucket_name, key) |
304
|
|
|
ret, info = put_file(token, key, localfile, mime_type=self.mime_type, check_crc=True) |
305
|
|
|
print(info) |
306
|
|
|
assert ret['key'] == key |
307
|
|
|
assert ret['hash'] == etag(localfile) |
308
|
|
|
|
309
|
|
|
def test_putInvalidCrc(self): |
310
|
|
|
key = 'test_invalid' |
311
|
|
|
data = 'hello bubby!' |
312
|
|
|
crc32 = 'wrong crc32' |
313
|
|
|
token = self.q.upload_token(bucket_name) |
314
|
|
|
ret, info = _form_put(token, key, data, None, None, crc=crc32) |
315
|
|
|
print(info) |
316
|
|
|
assert ret is None |
317
|
|
|
assert info.status_code == 400 |
318
|
|
|
|
319
|
|
|
def test_putWithoutKey(self): |
320
|
|
|
key = None |
321
|
|
|
data = 'hello bubby!' |
322
|
|
|
token = self.q.upload_token(bucket_name) |
323
|
|
|
ret, info = put_data(token, key, data) |
324
|
|
|
print(info) |
325
|
|
|
assert ret['hash'] == ret['key'] |
326
|
|
|
|
327
|
|
|
data = 'hello bubby!' |
328
|
|
|
token = self.q.upload_token(bucket_name, 'nokey2') |
329
|
|
|
ret, info = put_data(token, None, data) |
330
|
|
|
print(info) |
331
|
|
|
assert ret is None |
332
|
|
|
assert info.status_code == 403 # key not match |
333
|
|
|
|
334
|
|
|
def test_withoutRead_withoutSeek_retry(self): |
335
|
|
|
key = 'retry' |
336
|
|
|
data = 'hello retry!' |
337
|
|
|
set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com')) |
338
|
|
|
token = self.q.upload_token(bucket_name) |
339
|
|
|
ret, info = put_data(token, key, data) |
340
|
|
|
print(info) |
341
|
|
|
assert ret['key'] == key |
342
|
|
|
assert ret['hash'] == 'FlYu0iBR1WpvYi4whKXiBuQpyLLk' |
343
|
|
|
|
344
|
|
|
def test_hasRead_hasSeek_retry(self): |
345
|
|
|
key = 'withReadAndSeek_retry' |
346
|
|
|
data = StringIO('hello retry again!') |
347
|
|
|
set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com')) |
348
|
|
|
token = self.q.upload_token(bucket_name) |
349
|
|
|
ret, info = put_data(token, key, data) |
350
|
|
|
print(info) |
351
|
|
|
assert ret['key'] == key |
352
|
|
|
assert ret['hash'] == 'FuEbdt6JP2BqwQJi7PezYhmuVYOo' |
353
|
|
|
|
354
|
|
|
def test_hasRead_withoutSeek_retry(self): |
355
|
|
|
key = 'withReadAndWithoutSeek_retry' |
356
|
|
|
data = ReadWithoutSeek('I only have read attribute!') |
357
|
|
|
set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com')) |
358
|
|
|
token = self.q.upload_token(bucket_name) |
359
|
|
|
ret, info = put_data(token, key, data) |
360
|
|
|
print(info) |
361
|
|
|
assert ret is None |
362
|
|
|
|
363
|
|
|
def test_hasRead_WithoutSeek_retry2(self): |
364
|
|
|
key = 'withReadAndWithoutSeek_retry2' |
365
|
|
|
data = urlopen("http://www.qiniu.com") |
366
|
|
|
set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com')) |
367
|
|
|
token = self.q.upload_token(bucket_name, key) |
368
|
|
|
ret, info = put_data(token, key, data) |
369
|
|
|
print(info) |
370
|
|
|
assert ret is not None |
371
|
|
|
|
372
|
|
|
def test_putData_without_fname(self): |
373
|
|
|
if is_travis(): |
374
|
|
|
return |
375
|
|
|
localfile = create_temp_file(30 * 1024 * 1024) |
376
|
|
|
key = 'test_putData_without_fname' |
377
|
|
|
with open(localfile, 'rb') as input_stream: |
378
|
|
|
token = self.q.upload_token(bucket_name) |
379
|
|
|
ret, info = put_data(token, key, input_stream) |
380
|
|
|
print(info) |
381
|
|
|
assert ret is not None |
382
|
|
|
|
383
|
|
|
def test_putData_without_fname1(self): |
384
|
|
|
if is_travis(): |
385
|
|
|
return |
386
|
|
|
localfile = create_temp_file(30 * 1024 * 1024) |
387
|
|
|
key = 'test_putData_without_fname1' |
388
|
|
|
with open(localfile, 'rb') as input_stream: |
389
|
|
|
token = self.q.upload_token(bucket_name) |
390
|
|
|
ret, info = put_data(token, key, input_stream, self.params, self.mime_type, False, None, "") |
391
|
|
|
print(info) |
392
|
|
|
assert ret is not None |
393
|
|
|
|
394
|
|
|
def test_putData_without_fname2(self): |
395
|
|
|
if is_travis(): |
396
|
|
|
return |
397
|
|
|
localfile = create_temp_file(30 * 1024 * 1024) |
398
|
|
|
key = 'test_putData_without_fname2' |
399
|
|
|
with open(localfile, 'rb') as input_stream: |
400
|
|
|
token = self.q.upload_token(bucket_name) |
401
|
|
|
ret, info = put_data(token, key, input_stream, self.params, self.mime_type, False, None, " ") |
402
|
|
|
print(info) |
403
|
|
|
assert ret is not None |
404
|
|
|
|
405
|
|
|
|
406
|
|
|
class ResumableUploaderTestCase(unittest.TestCase): |
407
|
|
|
|
408
|
|
|
mime_type = "text/plain" |
409
|
|
|
params = {'x:a': 'a'} |
410
|
|
|
q = Auth(access_key, secret_key) |
411
|
|
|
|
412
|
|
|
def test_put_stream(self): |
413
|
|
|
localfile = __file__ |
414
|
|
|
key = 'test_file_r' |
415
|
|
|
size = os.stat(localfile).st_size |
416
|
|
|
with open(localfile, 'rb') as input_stream: |
417
|
|
|
token = self.q.upload_token(bucket_name, key) |
418
|
|
|
ret, info = put_stream(token, key, input_stream, os.path.basename(__file__), size, self.params, self.mime_type) |
419
|
|
|
print(info) |
420
|
|
|
assert ret['key'] == key |
421
|
|
|
|
422
|
|
|
def test_big_file(self): |
423
|
|
|
key = 'big' |
424
|
|
|
token = self.q.upload_token(bucket_name, key) |
425
|
|
|
localfile = create_temp_file(4 * 1024 * 1024 + 1) |
426
|
|
|
progress_handler = lambda progress, total: progress |
427
|
|
|
qiniu.set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com')) |
428
|
|
|
ret, info = put_file(token, key, localfile, self.params, self.mime_type, progress_handler=progress_handler) |
429
|
|
|
print(info) |
430
|
|
|
assert ret['key'] == key |
431
|
|
|
remove_temp_file(localfile) |
432
|
|
|
|
433
|
|
|
def test_retry(self): |
434
|
|
|
localfile = __file__ |
435
|
|
|
key = 'test_file_r_retry' |
436
|
|
|
qiniu.set_default(default_zone=Zone('http://a', 'http://upload.qiniu.com')) |
437
|
|
|
token = self.q.upload_token(bucket_name, key) |
438
|
|
|
ret, info = put_file(token, key, localfile, self.params, self.mime_type) |
439
|
|
|
print(info) |
440
|
|
|
assert ret['key'] == key |
441
|
|
|
assert ret['hash'] == etag(localfile) |
442
|
|
|
|
443
|
|
|
|
444
|
|
|
class DownloadTestCase(unittest.TestCase): |
445
|
|
|
|
446
|
|
|
q = Auth(access_key, secret_key) |
447
|
|
|
|
448
|
|
|
def test_private_url(self): |
449
|
|
|
private_bucket = 'private-res' |
450
|
|
|
private_key = 'gogopher.jpg' |
451
|
|
|
base_url = 'http://%s/%s' % (private_bucket+'.qiniudn.com', private_key) |
452
|
|
|
private_url = self.q.private_download_url(base_url, expires=3600) |
453
|
|
|
print(private_url) |
454
|
|
|
r = requests.get(private_url) |
455
|
|
|
assert r.status_code == 200 |
456
|
|
|
|
457
|
|
|
|
458
|
|
|
class MediaTestCase(unittest.TestCase): |
459
|
|
|
def test_pfop(self): |
460
|
|
|
q = Auth(access_key, secret_key) |
461
|
|
|
pfop = PersistentFop(q, 'testres', 'sdktest') |
462
|
|
|
op = op_save('avthumb/m3u8/segtime/10/vcodec/libx264/s/320x240', 'pythonsdk', 'pfoptest') |
463
|
|
|
ops = [] |
464
|
|
|
ops.append(op) |
465
|
|
|
ret, info = pfop.execute('sintel_trailer.mp4', ops, 1) |
466
|
|
|
print(info) |
467
|
|
|
assert ret['persistentId'] is not None |
468
|
|
|
|
469
|
|
|
|
470
|
|
|
class EtagTestCase(unittest.TestCase): |
471
|
|
|
def test_zero_size(self): |
472
|
|
|
open("x", 'a').close() |
473
|
|
|
hash = etag("x") |
474
|
|
|
assert hash == 'Fto5o-5ea0sNMlW_75VgGJCv2AcJ' |
475
|
|
|
remove_temp_file("x") |
476
|
|
|
def test_small_size(self): |
477
|
|
|
localfile = create_temp_file(1024 * 1024) |
478
|
|
|
hash = etag(localfile) |
479
|
|
|
assert hash == 'FnlAdmDasGTQOIgrU1QIZaGDv_1D' |
480
|
|
|
remove_temp_file(localfile) |
481
|
|
|
def test_large_size(self): |
482
|
|
|
localfile = create_temp_file(4 * 1024 * 1024 + 1) |
483
|
|
|
hash = etag(localfile) |
484
|
|
|
assert hash == 'ljF323utglY3GI6AvLgawSJ4_dgk' |
485
|
|
|
remove_temp_file(localfile) |
486
|
|
|
|
487
|
|
|
|
488
|
|
|
class ReadWithoutSeek(object): |
489
|
|
|
def __init__(self, str): |
490
|
|
|
self.str = str |
491
|
|
|
pass |
492
|
|
|
|
493
|
|
|
def read(self): |
494
|
|
|
print(self.str) |
495
|
|
|
|
496
|
|
|
if __name__ == '__main__': |
497
|
|
|
unittest.main() |
498
|
|
|
|