Completed
Push — master ( b77c8b...5ec1d2 )
by
unknown
16s queued 13s
created

src/Qiniu/Storage/ResumeUploader.php (11 issues)

1
<?php
2
3
namespace Qiniu\Storage;
4
5
use Qiniu\Config;
6
use Qiniu\Http\Client;
7
use Qiniu\Http\Error;
8
use Qiniu\Enum\SplitUploadVersion;
9
use Qiniu\Http\RequestOptions;
10
11
/**
 * Resumable upload class. It mainly implements the chunked upload step of a
 * resumable upload, together with the corresponding block-creation and
 * file-creation requests.
 *
 * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkblk.html
 * @link http://developer.qiniu.com/docs/v6/api/reference/up/mkfile.html
 */
18
final class ResumeUploader
19
{
20
    // Upload token; sent as "UpToken <token>" in the Authorization header of every request.
    private $upToken;
21
    // Name the uploaded file is stored under; base64-url-encoded into request URLs.
    private $key;
22
    // Stream the upload data is read from with fread().
    private $inputStream;
23
    // Total number of bytes to upload; the upload loop runs until this many bytes are sent.
    private $size;
24
    // Custom variables, iterated as name => value pairs when building the final request.
    private $params;
25
    // MIME type of the uploaded data.
    private $mime;
26
    // v1 state: the 'ctx' value returned for each uploaded block, joined with ',' for mkfile.
    private $contexts;
27
    // v2 state: array with keys 'etags', 'uploadId', 'expiredAt' and 'uploaded'.
    private $finishedEtags;
28
    // Upload host used as the prefix of request URLs; replaced by the backup host after a failed request.
    private $host;
29
    // Bucket name extracted from the upload token.
    private $bucket;
30
    // URL of the most recent request; passed to Error objects on failure.
    private $currentUrl;
31
    // Config object used to resolve the upload host and the backup upload host.
    private $config;
32
    // Path of the file that records upload progress, or null when resuming is disabled.
    private $resumeRecordFile;
33
    // Value produced by SplitUploadVersion::from(); compared against SplitUploadVersion::V1 / ::V2.
    private $version;
34
    // Number of bytes sent per block/part (the last one may be smaller).
    private $partSize;
35
    /**
36
     * @var RequestOptions
37
     */
38
    private $reqOpt;
39
40 6
    /**
     * Prepare a resumable (chunked) upload of a binary stream to Qiniu.
     *
     * @param string $upToken upload token
     * @param string $key name the uploaded file is stored under
     * @param resource $inputStream stream the upload data is read from
     * @param int $size number of bytes to upload from the stream
     * @param array $params custom variables (name => value)
     * @param string $mime MIME type of the uploaded data
     * @param Config $config
     * @param string $resumeRecordFile file that records which parts were already uploaded
     * @param string $version split-upload version; v1 and v2 are supported, default v1
     * @param int $partSize part size (documented as a split-upload v2 field);
     *                      default 4MB, valid range 1 MB - 1 GB
     * @param RequestOptions $reqOpt options passed to every HTTP request;
     *                               a default RequestOptions is created when null
     * @link http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar
     *
     * @throws \Exception when the version is not supported, the upload token
     *                    cannot be parsed, or no upload host can be resolved
     */
    public function __construct(
        $upToken,
        $key,
        $inputStream,
        $size,
        $params,
        $mime,
        $config,
        $resumeRecordFile = null,
        $version = 'v1',
        $partSize = Config::BLOCK_SIZE,
        $reqOpt = null
    ) {

        $this->upToken = $upToken;
        $this->key = $key;
        $this->inputStream = $inputStream;
        $this->size = $size;
        $this->params = $params;
        $this->mime = $mime;
        $this->contexts = array();
        $this->finishedEtags = array("etags" => array(), "uploadId" => "", "expiredAt" => 0, "uploaded" => 0);
        $this->config = $config;
        // Falsy values ('' / 0 / false) are normalised to null, i.e. "no resume record".
        $this->resumeRecordFile = $resumeRecordFile ? $resumeRecordFile : null;
        $this->partSize = $partSize ? $partSize : Config::BLOCK_SIZE;

        if ($reqOpt === null) {
            $reqOpt = new RequestOptions();
        }
        $this->reqOpt = $reqOpt;

        try {
            $this->version = SplitUploadVersion::from($version ? $version : 'v1');
        } catch (\Exception $e) {
            throw new \Exception("only support v1/v2 now!", 0, $e);
        }

        list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($upToken);
        $this->bucket = $bucket;
        if ($err != null) {
            // A constructor cannot hand an (result, error) tuple back to its caller:
            // returning here would silently discard the error and leave the object
            // without a host. Fail loudly, like the host-resolution error below.
            if (is_object($err) && method_exists($err, 'message')) {
                $reason = $err->message();
            } elseif (is_string($err)) {
                $reason = $err;
            } else {
                $reason = 'invalid upToken';
            }
            throw new \Exception($reason, 1);
        }

        list($upHost, $err) = $config->getUpHostV2($accessKey, $bucket);
        if ($err != null) {
            throw new \Exception($err->message(), 1);
        }
        $this->host = $upHost;
    }
107 6
108 6
    /**
     * Run the upload: send the stream block by block, then ask the server to
     * assemble the file.
     *
     * When a resume record file is configured, previously recorded progress is
     * loaded first and the record is rewritten after every successfully
     * uploaded block. v1 blocks are verified with crc32, v2 parts with md5;
     * a block whose response needs a retry or fails verification is re-sent once.
     * After a response with a negative status code the backup upload host is
     * used for subsequent requests.
     *
     * @param string $fname file name sent with the final "create file" request
     *
     * @return array array($ret, $err): the decoded response and null on success,
     *               or null and an error on failure
     *
     * @throws \Exception when reading the input stream fails or the upload
     *                    version is neither v1 nor v2
     */
    public function upload($fname)
    {
        $uploaded = 0;
        // Only meaningful for v2, but initialised up front so that every path
        // below sees defined values.
        $partNumber = 1;
        $encodedObjectName = null;
        if ($this->version == SplitUploadVersion::V2) {
            $encodedObjectName = $this->key ? \Qiniu\base64_urlSafeEncode($this->key) : '~';
        }

        // get upload record from resumeRecordFile
        $blkputRets = null;
        if ($this->resumeRecordFile != null) {
            $blkputRets = $this->readResumeRecord();
        }

        if ($blkputRets) {
            if ($this->version == SplitUploadVersion::V1) {
                if (isset($blkputRets['contexts']) && isset($blkputRets['uploaded']) &&
                    is_array($blkputRets['contexts']) && is_int($blkputRets['uploaded'])) {
                    $this->contexts = $blkputRets['contexts'];
                    $uploaded = $blkputRets['uploaded'];
                }
            } elseif ($this->version == SplitUploadVersion::V2) {
                // The recorded task is reused only if it is complete, well-typed
                // and not yet expired; otherwise a new task is started.
                if (isset($blkputRets["etags"]) && isset($blkputRets["uploadId"]) &&
                    isset($blkputRets["expiredAt"]) && isset($blkputRets["uploaded"]) &&
                    $blkputRets["expiredAt"] > time() && $blkputRets["uploaded"] > 0 &&
                    is_array($blkputRets["etags"]) && is_string($blkputRets["uploadId"]) &&
                    is_int($blkputRets["expiredAt"])) {
                    $this->finishedEtags['etags'] = $blkputRets["etags"];
                    $this->finishedEtags["uploadId"] = $blkputRets["uploadId"];
                    $this->finishedEtags["expiredAt"] = $blkputRets["expiredAt"];
                    $this->finishedEtags["uploaded"] = $blkputRets["uploaded"];
                    $uploaded = $blkputRets["uploaded"];
                    $partNumber = count($this->finishedEtags["etags"]) + 1;
                } else {
                    $this->makeInitReq($encodedObjectName);
                }
            } else {
                throw new \Exception("only support v1/v2 now!");
            }
        } elseif ($this->version == SplitUploadVersion::V2) {
            // No usable record: init a Multipart Upload task if choose v2
            $this->makeInitReq($encodedObjectName);
        }

        // NOTE(review): when resuming ($uploaded > 0) the stream is read from its
        // current position; nothing in this class seeks it to $uploaded. Confirm
        // that the caller positions the stream before resuming.
        while ($uploaded < $this->size) {
            $blockSize = $this->blockSize($uploaded);
            $data = fread($this->inputStream, $blockSize);
            if ($data === false) {
                throw new \Exception("file read failed", 1);
            }
            if ($this->version == SplitUploadVersion::V1) {
                $crc = \Qiniu\crc32_data($data);
                $response = $this->makeBlock($data, $blockSize);
            } elseif ($this->version == SplitUploadVersion::V2) {
                $md5 = md5($data);
                $response = $this->uploadPart(
                    $data,
                    $partNumber,
                    $this->finishedEtags["uploadId"],
                    $encodedObjectName,
                    $md5
                );
            } else {
                throw new \Exception("only support v1/v2 now!");
            }

            $ret = null;
            if ($response->ok() && $response->json() != null) {
                $ret = $response->json();
            }
            if ($response->statusCode < 0) {
                // Negative status code: switch to the backup host before retrying.
                list($accessKey, $bucket, $err) = \Qiniu\explodeUpToken($this->upToken);
                if ($err != null) {
                    return array(null, $err);
                }
                list($upHostBackup, $err) = $this->config->getUpBackupHostV2($accessKey, $bucket);
                if ($err != null) {
                    return array(null, $err);
                }
                $this->host = $upHostBackup;
            }

            if ($this->version == SplitUploadVersion::V1) {
                if ($response->needRetry() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
                    $response = $this->makeBlock($data, $blockSize);
                    $ret = $response->json();
                }

                if (!$response->ok() || !isset($ret['crc32']) || $crc != $ret['crc32']) {
                    return array(null, new Error($this->currentUrl, $response));
                }
                array_push($this->contexts, $ret['ctx']);
            } elseif ($this->version == SplitUploadVersion::V2) {
                if ($response->needRetry() || !isset($ret['md5']) || $md5 != $ret['md5']) {
                    $response = $this->uploadPart(
                        $data,
                        $partNumber,
                        $this->finishedEtags["uploadId"],
                        $encodedObjectName,
                        $md5
                    );
                    $ret = $response->json();
                }

                if (!$response->ok() || !isset($ret['md5']) || $md5 != $ret['md5']) {
                    return array(null, new Error($this->currentUrl, $response));
                }
                $blockStatus = array('etag' => $ret['etag'], 'partNumber' => $partNumber);
                array_push($this->finishedEtags['etags'], $blockStatus);
                $partNumber += 1;
            } else {
                throw new \Exception("only support v1/v2 now!");
            }

            $uploaded += $blockSize;
            if ($this->version == SplitUploadVersion::V2) {
                $this->finishedEtags['uploaded'] = $uploaded;
            }

            if ($this->resumeRecordFile !== null) {
                $this->saveResumeRecord($uploaded);
            }
        }

        if ($this->version == SplitUploadVersion::V1) {
            return $this->makeFile($fname);
        } elseif ($this->version == SplitUploadVersion::V2) {
            return $this->completeParts($fname, $this->finishedEtags['uploadId'], $encodedObjectName);
        } else {
            throw new \Exception("only support v1/v2 now!");
        }
    }

    /**
     * Read and JSON-decode the resume record file.
     *
     * Every failure is reported through error_log() and treated as "no record".
     *
     * @return mixed the decoded record (json_decode with associative arrays),
     *               or null when the file is missing, unreadable, empty or invalid
     */
    private function readResumeRecord()
    {
        if (!file_exists($this->resumeRecordFile)) {
            error_log("resumeFile not exists");
            return null;
        }

        $stream = fopen($this->resumeRecordFile, 'r');
        if (!$stream) {
            error_log("resumeFile open failed");
            return null;
        }

        $streamLen = filesize($this->resumeRecordFile);
        if (!($streamLen > 0)) {
            // Close the handle on this path too; it used to be leaked.
            fclose($stream);
            error_log("resumeFile is empty");
            return null;
        }

        $contents = fread($stream, $streamLen);
        fclose($stream);
        if (!$contents) {
            error_log("read resumeFile failed");
            return null;
        }

        $record = json_decode($contents, true);
        if ($record === null) {
            error_log("resumeFile contents decode error");
        }
        return $record;
    }

    /**
     * Write the current progress to the resume record file.
     *
     * v1 stores the block contexts and the uploaded byte count; v2 stores the
     * whole finishedEtags state. Encoding and write failures are only logged.
     *
     * @param int $uploaded number of bytes uploaded so far
     *
     * @throws \Exception when the upload version is neither v1 nor v2
     */
    private function saveResumeRecord($uploaded)
    {
        if ($this->version == SplitUploadVersion::V1) {
            $recordData = json_encode(array(
                'contexts' => $this->contexts,
                'uploaded' => $uploaded
            ));
        } elseif ($this->version == SplitUploadVersion::V2) {
            $recordData = json_encode($this->finishedEtags);
        } else {
            throw new \Exception("only support v1/v2 now!");
        }

        if (!$recordData) {
            error_log('resumeRecordData encode failed');
            return;
        }

        $isWritten = file_put_contents($this->resumeRecordFile, $recordData);
        if ($isWritten === false) {
            error_log("write resumeRecordFile failed");
        }
    }
287
288
    /**
     * Create a block (v1): POST the block content to "<host>/mkblk/<blockSize>".
     *
     * @param string $block block content sent as the request body
     * @param int $blockSize block size, placed in the URL
     *
     * @return mixed the response returned by post()
     */
    private function makeBlock($block, $blockSize)
    {
        return $this->post($this->host . '/mkblk/' . $blockSize, $block);
    }
296
297
    /**
     * Build the v1 "mkfile" URL: host, total size, encoded MIME type, the
     * encoded key (only when a key is set), the encoded file name and one
     * "/<name>/<encoded value>" segment per custom parameter.
     *
     * @param string $fname file name to encode into the URL
     *
     * @return string the complete mkfile URL
     */
    private function fileUrl($fname)
    {
        $segments = array(
            $this->host,
            '/mkfile/' . $this->size,
            '/mimeType/' . \Qiniu\base64_urlSafeEncode($this->mime)
        );

        if ($this->key != null) {
            $segments[] = '/key/' . \Qiniu\base64_urlSafeEncode($this->key);
        }

        $segments[] = '/fname/' . \Qiniu\base64_urlSafeEncode($fname);

        if (!empty($this->params)) {
            foreach ($this->params as $name => $value) {
                $segments[] = '/' . $name . '/' . \Qiniu\base64_urlSafeEncode($value);
            }
        }

        return implode('', $segments);
    }
313
314
    /**
     * Create the file (v1): POST the comma-joined block contexts to the
     * mkfile URL, retrying once when the response asks for a retry.
     *
     * @param string $fname file name passed on to fileUrl()
     *
     * @return array array($ret, $err): decoded response and null on success,
     *               null and an Error otherwise
     */
    private function makeFile($fname)
    {
        $target = $this->fileUrl($fname);
        $ctxList = implode(',', $this->contexts);

        $response = $this->post($target, $ctxList);
        if ($response->needRetry()) {
            $response = $this->post($target, $ctxList);
        }

        if ($response->ok()) {
            return array($response->json(), null);
        }
        return array(null, new Error($this->currentUrl, $response));
    }
330
331
    /**
     * POST $data to $url with only the upload-token Authorization header.
     *
     * @param string $url request URL (also remembered as the current URL)
     * @param mixed $data request body
     *
     * @return mixed the response returned by Client::post()
     */
    private function post($url, $data)
    {
        $authHeader = array('Authorization' => 'UpToken ' . $this->upToken);
        return $this->postWithHeaders($url, $data, $authHeader);
    }
337
338
    /**
     * Size of the next block: the configured part size, or the remaining
     * byte count when less than one full part is left.
     *
     * @param int $uploaded number of bytes uploaded so far
     *
     * @return int number of bytes to read for the next block
     */
    private function blockSize($uploaded)
    {
        $isLastBlock = $this->size < $uploaded + $this->partSize;

        return $isLastBlock ? $this->size - $uploaded : $this->partSize;
    }
345
346
    /**
     * Start a v2 upload task and remember its id and expiry time.
     *
     * The response field 'expireAt' is stored under the 'expiredAt' key of
     * the finishedEtags state.
     *
     * @param string $encodedObjectName encoded object name passed to initReq()
     */
    private function makeInitReq($encodedObjectName)
    {
        $task = $this->initReq($encodedObjectName);

        $this->finishedEtags['uploadId'] = $task['uploadId'];
        $this->finishedEtags['expiredAt'] = $task['expireAt'];
    }
352
353
    /**
     * Initialise an upload task (v2): POST an empty body to
     * "<host>/buckets/<bucket>/objects/<encodedObjectName>/uploads".
     *
     * @param string $encodedObjectName encoded object name used in the URL
     *
     * @return mixed the decoded JSON body of the response
     */
    private function initReq($encodedObjectName)
    {
        $endpoint = sprintf(
            '%s/buckets/%s/objects/%s/uploads',
            $this->host,
            $this->bucket,
            $encodedObjectName
        );
        $requestHeaders = array(
            'Authorization' => 'UpToken ' . $this->upToken,
            'Content-Type' => 'application/json'
        );

        return $this->postWithHeaders($endpoint, null, $requestHeaders)->json();
    }
366
367
    /**
     * Upload one part (v2): PUT the part content to
     * "<host>/buckets/<bucket>/objects/<encodedObjectName>/uploads/<uploadId>/<partNumber>".
     *
     * @param string $block part content sent as the request body
     * @param int $partNumber number of this part, placed in the URL
     * @param string $uploadId id of the upload task
     * @param string $encodedObjectName encoded object name used in the URL
     * @param string $md5 value sent in the Content-MD5 header
     *
     * @return mixed the response returned by put()
     */
    private function uploadPart($block, $partNumber, $uploadId, $encodedObjectName, $md5)
    {
        $endpoint = sprintf(
            '%s/buckets/%s/objects/%s/uploads/%s/%s',
            $this->host,
            $this->bucket,
            $encodedObjectName,
            $uploadId,
            $partNumber
        );
        $requestHeaders = array(
            'Authorization' => 'UpToken ' . $this->upToken,
            'Content-Type' => 'application/octet-stream',
            'Content-MD5' => $md5
        );

        return $this->put($endpoint, $block, $requestHeaders);
    }
382
383
    /**
     * Complete a v2 upload: POST the part list, sorted by part number,
     * together with the file name, MIME type, metadata and custom variables,
     * retrying once when the response asks for a retry.
     *
     * Parameters whose name starts with "x:" are sent as custom variables,
     * those starting with "x-qn-meta-" as metadata; all others are ignored.
     *
     * @param string $fname file name sent in the request body
     * @param string $uploadId id of the upload task
     * @param string $encodedObjectName encoded object name used in the URL
     *
     * @return array array($ret, $err): decoded response and null on success,
     *               null and an Error otherwise
     */
    private function completeParts($fname, $uploadId, $encodedObjectName)
    {
        $requestHeaders = array(
            'Authorization' => 'UpToken ' . $this->upToken,
            'Content-Type' => 'application/json'
        );

        // Sort a local copy so the recorded state itself is left untouched.
        $parts = $this->finishedEtags['etags'];
        $orderedParts = \Qiniu\arraySort($parts, 'partNumber');

        $meta = array();
        $vars = array();
        if ($this->params) {
            foreach ($this->params as $name => $value) {
                if (strpos($name, 'x:') === 0) {
                    $vars[$name] = $value;
                } elseif (strpos($name, 'x-qn-meta-') === 0) {
                    $meta[$name] = $value;
                }
            }
        }

        // Empty collections are sent as null rather than as empty arrays.
        $payload = json_encode(array(
            'fname' => $fname,
            'mimeType' => $this->mime,
            'metadata' => empty($meta) ? null : $meta,
            'customVars' => empty($vars) ? null : $vars,
            'parts' => $orderedParts
        ));

        $endpoint = sprintf(
            '%s/buckets/%s/objects/%s/uploads/%s',
            $this->host,
            $this->bucket,
            $encodedObjectName,
            $uploadId
        );

        $response = $this->postWithHeaders($endpoint, $payload, $requestHeaders);
        if ($response->needRetry()) {
            $response = $this->postWithHeaders($endpoint, $payload, $requestHeaders);
        }

        if ($response->ok()) {
            return array($response->json(), null);
        }
        return array(null, new Error($this->currentUrl, $response));
    }
426
427
    /**
     * PUT $data to $url with the given headers and the stored request
     * options, remembering the URL as the current one.
     *
     * @param string $url request URL
     * @param mixed $data request body
     * @param array $headers request headers
     *
     * @return mixed the response returned by Client::put()
     */
    private function put($url, $data, $headers)
    {
        $this->currentUrl = $url;

        $response = Client::put($url, $data, $headers, $this->reqOpt);
        return $response;
    }
432
433
    /**
     * POST $data to $url with the given headers and the stored request
     * options, remembering the URL as the current one.
     *
     * @param string $url request URL
     * @param mixed $data request body
     * @param array $headers request headers
     *
     * @return mixed the response returned by Client::post()
     */
    private function postWithHeaders($url, $data, $headers)
    {
        $this->currentUrl = $url;

        $response = Client::post($url, $data, $headers, $this->reqOpt);
        return $response;
    }
438
}
439