Code

< 40 %
40-60 %
> 60 %
1
# -*- coding: utf-8 -*-
2
# flake8: noqa
3 1
import os
4 1
import string
5 1
import random
6 1
import tempfile
7 1
import functools
8
9 1
import requests
10
11 1
import unittest
12 1
import pytest
13 1
from freezegun import freeze_time
14
15 1
from qiniu import Auth, set_default, etag, PersistentFop, build_op, op_save, Zone, QiniuMacAuth
16 1
from qiniu import BucketManager, build_batch_copy, build_batch_rename, build_batch_move, build_batch_stat, \
17
    build_batch_delete, DomainManager
18 1
from qiniu import urlsafe_base64_encode, urlsafe_base64_decode, canonical_mime_header_key, entry, decode_entry
19
20 1
from qiniu.compat import is_py2, is_py3, b, json
21
22 1
import qiniu.config
23
24
25 1
if is_py2:
26 1
    import sys
27 1
    import StringIO
28 1
    import urllib
29 1
    from imp import reload
30
31 1
    reload(sys)
32 1
    sys.setdefaultencoding('utf-8')
33 1
    StringIO = StringIO.StringIO
34 1
    urlopen = urllib.urlopen
35
elif is_py3:
36
    import io
37
    import urllib
38
39
    StringIO = io.StringIO
40
    urlopen = urllib.request.urlopen
41
42 1
access_key = os.getenv('QINIU_ACCESS_KEY')
43 1
secret_key = os.getenv('QINIU_SECRET_KEY')
44 1
bucket_name = os.getenv('QINIU_TEST_BUCKET')
45 1
hostscache_dir = None
46
47
48 1
def rand_string(length):
49 1
    lib = string.ascii_uppercase
50 1
    return ''.join([random.choice(lib) for i in range(0, length)])
51
52
53 1
def create_temp_file(size):
54 1
    t = tempfile.mktemp()
55 1
    f = open(t, 'wb')
56 1
    f.seek(size - 1)
57 1
    f.write(b('0'))
58 1
    f.close()
59 1
    return t
60
61
62 1
def remove_temp_file(file):
63 1
    try:
64 1
        os.remove(file)
65
    except OSError:
66
        pass
67
68
69 1
class UtilsTest(unittest.TestCase):
70 1
    def test_urlsafe(self):
71 1
        a = 'hello\x96'
72 1
        u = urlsafe_base64_encode(a)
73 1
        assert b(a) == urlsafe_base64_decode(u)
74
75 1
    def test_canonical_mime_header_key(self):
76 1
        field_names = [
77
            ":status",
78
            ":x-test-1",
79
            ":x-Test-2",
80
            "content-type",
81
            "CONTENT-LENGTH",
82
            "oRiGin",
83
            "ReFer",
84
            "Last-Modified",
85
            "acCePt-ChArsEt",
86
            "x-test-3",
87
            "cache-control",
88
        ]
89 1
        expect_canonical_field_names = [
90
            ":status",
91
            ":x-test-1",
92
            ":x-Test-2",
93
            "Content-Type",
94
            "Content-Length",
95
            "Origin",
96
            "Refer",
97
            "Last-Modified",
98
            "Accept-Charset",
99
            "X-Test-3",
100
            "Cache-Control",
101
        ]
102 1
        assert len(field_names) == len(expect_canonical_field_names)
103 1
        for i in range(len(field_names)):
104 1
            assert canonical_mime_header_key(field_names[i]) == expect_canonical_field_names[i]
105
106 1
    def test_entry(self):
107 1
        case_list = [
108
            {
109
                'msg': 'normal',
110
                'bucket': 'qiniuphotos',
111
                'key': 'gogopher.jpg',
112
                'expect': 'cWluaXVwaG90b3M6Z29nb3BoZXIuanBn'
113
            },
114
            {
115
                'msg': 'key empty',
116
                'bucket': 'qiniuphotos',
117
                'key': '',
118
                'expect': 'cWluaXVwaG90b3M6'
119
            },
120
            {
121
                'msg': 'key undefined',
122
                'bucket': 'qiniuphotos',
123
                'key': None,
124
                'expect': 'cWluaXVwaG90b3M='
125
            },
126
            {
127
                'msg': 'key need replace plus symbol',
128
                'bucket': 'qiniuphotos',
129
                'key': '012ts>a',
130
                'expect': 'cWluaXVwaG90b3M6MDEydHM-YQ=='
131
            },
132
            {
133
                'msg': 'key need replace slash symbol',
134
                'bucket': 'qiniuphotos',
135
                'key': '012ts?a',
136
                'expect': 'cWluaXVwaG90b3M6MDEydHM_YQ=='
137
            }
138
        ]
139 1
        for c in case_list:
140 1
            assert c.get('expect') == entry(c.get('bucket'), c.get('key')), c.get('msg')
141
142 1
    def test_decode_entry(self):
143 1
        case_list = [
144
            {
145
                'msg': 'normal',
146
                'expect': {
147
                    'bucket': 'qiniuphotos',
148
                    'key': 'gogopher.jpg'
149
                },
150
                'entry': 'cWluaXVwaG90b3M6Z29nb3BoZXIuanBn'
151
            },
152
            {
153
                'msg': 'key empty',
154
                'expect': {
155
                    'bucket': 'qiniuphotos',
156
                    'key': ''
157
                },
158
                'entry': 'cWluaXVwaG90b3M6'
159
            },
160
            {
161
                'msg': 'key undefined',
162
                'expect': {
163
                    'bucket': 'qiniuphotos',
164
                    'key': None
165
                },
166
                'entry': 'cWluaXVwaG90b3M='
167
            },
168
            {
169
                'msg': 'key need replace plus symbol',
170
                'expect': {
171
                    'bucket': 'qiniuphotos',
172
                    'key': '012ts>a'
173
                },
174
                'entry': 'cWluaXVwaG90b3M6MDEydHM-YQ=='
175
            },
176
            {
177
                'msg': 'key need replace slash symbol',
178
                'expect': {
179
                    'bucket': 'qiniuphotos',
180
                    'key': '012ts?a'
181
                },
182
                'entry': 'cWluaXVwaG90b3M6MDEydHM_YQ=='
183
            }
184
        ]
185 1
        for c in case_list:
186 1
            bucket, key = decode_entry(c.get('entry'))
187 1
            assert bucket == c.get('expect').get('bucket'), c.get('msg')
188 1
            assert key == c.get('expect').get('key'), c.get('msg')
189
190
191 1
class BucketTestCase(unittest.TestCase):
192 1
    q = Auth(access_key, secret_key)
193 1
    bucket = BucketManager(q)
194
195 1
    def test_list(self):
196 1
        ret, eof, info = self.bucket.list(bucket_name, limit=4)
197 1
        assert eof is False
198 1
        assert len(ret.get('items')) == 4
199 1
        ret, eof, info = self.bucket.list(bucket_name, limit=1000)
200 1
        assert info.status_code == 200, info
201
202 1
    def test_buckets(self):
203 1
        ret, info = self.bucket.buckets()
204 1
        print(info)
205 1
        assert bucket_name in ret
206
207 1
    def test_prefetch(self):
208 1
        ret, info = self.bucket.prefetch(bucket_name, 'python-sdk.html', hostscache_dir=hostscache_dir)
209 1
        print(info)
210 1
        assert ret['key'] == 'python-sdk.html'
211
212 1
    def test_fetch(self):
213 1
        ret, info = self.bucket.fetch('https://developer.qiniu.com/kodo/sdk/python', bucket_name,
214
                                      'fetch.html', hostscache_dir=hostscache_dir)
215 1
        print(info)
216 1
        assert ret['key'] == 'fetch.html'
217 1
        assert 'hash' in ret
218
219 1
    def test_fetch_without_key(self):
220 1
        ret, info = self.bucket.fetch('https://developer.qiniu.com/kodo/sdk/python', bucket_name,
221
                                      hostscache_dir=hostscache_dir)
222 1
        print(info)
223 1
        assert ret['key'] == ret['hash']
224 1
        assert 'hash' in ret
225
226 1
    def test_stat(self):
227 1
        ret, info = self.bucket.stat(bucket_name, 'python-sdk.html')
228 1
        print(info)
229 1
        assert 'hash' in ret
230
231 1
    def test_delete(self):
232 1
        ret, info = self.bucket.delete(bucket_name, 'del')
233 1
        print(info)
234 1
        assert ret is None
235 1
        assert info.status_code == 612
236
237 1
    def test_rename(self):
238 1
        key = 'renameto' + rand_string(8)
239 1
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
240 1
        key2 = key + 'move'
241 1
        ret, info = self.bucket.rename(bucket_name, key, key2)
242 1
        print(info)
243 1
        assert ret == {}
244 1
        ret, info = self.bucket.delete(bucket_name, key2)
245 1
        print(info)
246 1
        assert ret == {}
247
248 1
    def test_copy(self):
249 1
        key = 'copyto' + rand_string(8)
250 1
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
251 1
        print(info)
252 1
        assert ret == {}
253 1
        ret, info = self.bucket.delete(bucket_name, key)
254 1
        print(info)
255 1
        assert ret == {}
256
257 1
    def test_change_mime(self):
258 1
        ret, info = self.bucket.change_mime(bucket_name, 'python-sdk.html', 'text/html')
259 1
        print(info)
260 1
        assert ret == {}
261
262 1
    def test_change_type(self):
263 1
        target_key = 'copyto' + rand_string(8)
264 1
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, target_key)
265 1
        ret, info = self.bucket.change_type(bucket_name, target_key, 1)
266 1
        print(info)
267 1
        assert ret == {}
268 1
        ret, info = self.bucket.stat(bucket_name, target_key)
269 1
        print(info)
270 1
        assert 'type' in ret
271 1
        self.bucket.delete(bucket_name, target_key)
272
273 1
    def test_copy_force(self):
274 1
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true')
275 1
        print(info)
276 1
        assert info.status_code == 200
277
278 1
    def test_batch_copy(self):
279 1
        key = 'copyto' + rand_string(8)
280 1
        ops = build_batch_copy(bucket_name, {'copyfrom': key}, bucket_name)
281 1
        ret, info = self.bucket.batch(ops)
282 1
        print(info)
283 1
        assert ret[0]['code'] == 200
284 1
        ops = build_batch_delete(bucket_name, [key])
285 1
        ret, info = self.bucket.batch(ops)
286 1
        print(info)
287 1
        assert ret[0]['code'] == 200
288
289 1
    def test_batch_copy_force(self):
290 1
        ops = build_batch_copy(bucket_name, {'copyfrom': 'copyfrom'}, bucket_name, force='true')
291 1
        ret, info = self.bucket.batch(ops)
292 1
        print(info)
293 1
        assert ret[0]['code'] == 200
294
295 1
    def test_batch_move(self):
296 1
        key = 'moveto' + rand_string(8)
297 1
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
298 1
        key2 = key + 'move'
299 1
        ops = build_batch_move(bucket_name, {key: key2}, bucket_name)
300 1
        ret, info = self.bucket.batch(ops)
301 1
        print(info)
302 1
        assert ret[0]['code'] == 200
303 1
        ret, info = self.bucket.delete(bucket_name, key2)
304 1
        print(info)
305 1
        assert ret == {}
306
307 1
    def test_batch_move_force(self):
308 1
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, 'copyfrom', force='true')
309 1
        print(info)
310 1
        assert info.status_code == 200
311 1
        ops = build_batch_move(bucket_name, {'copyfrom': 'copyfrom'}, bucket_name, force='true')
312 1
        ret, info = self.bucket.batch(ops)
313 1
        print(info)
314 1
        assert ret[0]['code'] == 200
315
316 1
    def test_batch_rename(self):
317 1
        key = 'rename' + rand_string(8)
318 1
        self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
319 1
        key2 = key + 'rename'
320 1
        ops = build_batch_move(bucket_name, {key: key2}, bucket_name)
321 1
        ret, info = self.bucket.batch(ops)
322 1
        print(info)
323 1
        assert ret[0]['code'] == 200
324 1
        ret, info = self.bucket.delete(bucket_name, key2)
325 1
        print(info)
326 1
        assert ret == {}
327
328 1
    def test_batch_rename_force(self):
329 1
        ret, info = self.bucket.rename(bucket_name, 'copyfrom', 'copyfrom', force='true')
330 1
        print(info)
331 1
        assert info.status_code == 200
332 1
        ops = build_batch_rename(bucket_name, {'copyfrom': 'copyfrom'}, force='true')
333 1
        ret, info = self.bucket.batch(ops)
334 1
        print(info)
335 1
        assert ret[0]['code'] == 200
336
337 1
    def test_batch_stat(self):
338 1
        ops = build_batch_stat(bucket_name, ['python-sdk.html'])
339 1
        ret, info = self.bucket.batch(ops)
340 1
        print(info)
341 1
        assert ret[0]['code'] == 200
342
343 1
    def test_delete_after_days(self):
344 1
        days = '5'
345 1
        ret, info = self.bucket.delete_after_days(bucket_name, 'invaild.html', days)
346 1
        assert info.status_code == 612
347 1
        key = 'copyto' + rand_string(8)
348 1
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
349 1
        ret, info = self.bucket.delete_after_days(bucket_name, key, days)
350 1
        assert info.status_code == 200
351
352 1
    def test_set_object_lifecycle(self):
353 1
        key = 'test_set_object_lifecycle' + rand_string(8)
354 1
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
355 1
        assert info.status_code == 200
356 1
        ret, info = self.bucket.set_object_lifecycle(
357
            bucket=bucket_name,
358
            key=key,
359
            to_line_after_days=10,
360
            to_archive_ir_after_days=15,
361
            to_archive_after_days=20,
362
            to_deep_archive_after_days=30,
363
            delete_after_days=40
364
        )
365 1
        assert info.status_code == 200
366
367 1
    def test_set_object_lifecycle_with_cond(self):
368 1
        key = 'test_set_object_lifecycle_cond' + rand_string(8)
369 1
        ret, info = self.bucket.copy(bucket_name, 'copyfrom', bucket_name, key)
370 1
        assert info.status_code == 200
371 1
        ret, info = self.bucket.stat(bucket_name, key)
372 1
        assert info.status_code == 200
373 1
        key_hash = ret['hash']
374 1
        ret, info = self.bucket.set_object_lifecycle(
375
            bucket=bucket_name,
376
            key=key,
377
            to_line_after_days=10,
378
            to_archive_ir_after_days=15,
379
            to_archive_after_days=20,
380
            to_deep_archive_after_days=30,
381
            delete_after_days=40,
382
            cond={
383
                'hash': key_hash
384
            }
385
        )
386 1
        assert info.status_code == 200
387
388 1
    def test_list_domains(self):
389 1
        ret, info = self.bucket.list_domains(bucket_name)
390 1
        print(info)
391 1
        assert info.status_code == 200
392 1
        assert isinstance(ret, list)
393
394 1
    @freeze_time("1970-01-01")
395
    def test_invalid_x_qiniu_date(self):
396 1
        ret, info = self.bucket.stat(bucket_name, 'python-sdk.html')
397 1
        assert ret is None
398 1
        assert info.status_code == 403
399
400 1
    @freeze_time("1970-01-01")
401
    def test_invalid_x_qiniu_date_with_disable_date_sign(self):
402 1
        q = Auth(access_key, secret_key, disable_qiniu_timestamp_signature=True)
403 1
        bucket = BucketManager(q)
404 1
        ret, info = bucket.stat(bucket_name, 'python-sdk.html')
405 1
        assert 'hash' in ret
406
407 1
    @freeze_time("1970-01-01")
408
    def test_invalid_x_qiniu_date_env(self):
409 1
        os.environ['DISABLE_QINIU_TIMESTAMP_SIGNATURE'] = 'True'
410 1
        ret, info = self.bucket.stat(bucket_name, 'python-sdk.html')
411 1
        os.unsetenv('DISABLE_QINIU_TIMESTAMP_SIGNATURE')
412 1
        assert 'hash' in ret
413
414 1
    @freeze_time("1970-01-01")
415
    def test_invalid_x_qiniu_date_env_be_ignored(self):
416 1
        os.environ['DISABLE_QINIU_TIMESTAMP_SIGNATURE'] = 'True'
417 1
        q = Auth(access_key, secret_key, disable_qiniu_timestamp_signature=False)
418 1
        bucket = BucketManager(q)
419 1
        ret, info = bucket.stat(bucket_name, 'python-sdk.html')
420 1
        os.unsetenv('DISABLE_QINIU_TIMESTAMP_SIGNATURE')
421 1
        assert ret is None
422 1
        assert info.status_code == 403
423
424 1
class DownloadTestCase(unittest.TestCase):
425 1
    q = Auth(access_key, secret_key)
426
427 1
    def test_private_url(self):
428 1
        private_bucket_domain = 'private-sdk.peterpy.cn'
429 1
        private_key = 'gogopher.jpg'
430 1
        base_url = 'http://%s/%s' % (private_bucket_domain, private_key)
431 1
        private_url = self.q.private_download_url(base_url, expires=3600)
432 1
        print(private_url)
433 1
        r = requests.get(private_url)
434 1
        assert r.status_code == 200
435
436
437 1
class EtagTestCase(unittest.TestCase):
438 1
    def test_zero_size(self):
439 1
        open("x", 'a').close()
440 1
        hash = etag("x")
441 1
        assert hash == 'Fto5o-5ea0sNMlW_75VgGJCv2AcJ'
442 1
        remove_temp_file("x")
443
444 1
    def test_small_size(self):
445 1
        localfile = create_temp_file(1024 * 1024)
446 1
        hash = etag(localfile)
447 1
        assert hash == 'FnlAdmDasGTQOIgrU1QIZaGDv_1D'
448 1
        remove_temp_file(localfile)
449
450 1
    def test_large_size(self):
451 1
        localfile = create_temp_file(4 * 1024 * 1024 + 1)
452 1
        hash = etag(localfile)
453 1
        assert hash == 'ljF323utglY3GI6AvLgawSJ4_dgk'
454 1
        remove_temp_file(localfile)
455
456
457 1
class CdnTestCase(unittest.TestCase):
458 1
    q = Auth(access_key, secret_key)
459 1
    domain_manager = DomainManager(q)
460
461 1
    def test_get_domain(self):
462 1
        ret, info = self.domain_manager.get_domain('pythonsdk.qiniu.io')
463 1
        print(info)
464 1
        assert info.status_code == 200
465
466
467 1
class ReadWithoutSeek(object):
468 1
    def __init__(self, str):
469
        self.str = str
470
        pass
471
472 1
    def read(self):
473
        print(self.str)
474
475
476 1
if __name__ == '__main__':
477
    unittest.main()
478