Completed
Push — master ( 367a58...ea98e4 )
by
unknown
27:38 queued 14s
created
lib/private/Files/ObjectStore/S3ObjectTrait.php 1 patch
Indentation   +236 added lines, -236 removed lines patch added patch discarded remove patch
@@ -16,240 +16,240 @@
 block discarded – undo
16 16
 use Psr\Http\Message\StreamInterface;
17 17
 
18 18
 trait S3ObjectTrait {
19
-	use S3ConfigTrait;
20
-
21
-	/**
22
-	 * Returns the connection
23
-	 *
24
-	 * @return S3Client connected client
25
-	 * @throws \Exception if connection could not be made
26
-	 */
27
-	abstract protected function getConnection();
28
-
29
-	abstract protected function getCertificateBundlePath(): ?string;
30
-	abstract protected function getSSECParameters(bool $copy = false): array;
31
-
32
-	/**
33
-	 * @param string $urn the unified resource name used to identify the object
34
-	 *
35
-	 * @return resource stream with the read data
36
-	 * @throws \Exception when something goes wrong, message will be logged
37
-	 * @since 7.0.0
38
-	 */
39
-	public function readObject($urn) {
40
-		$fh = SeekableHttpStream::open(function ($range) use ($urn) {
41
-			$command = $this->getConnection()->getCommand('GetObject', [
42
-				'Bucket' => $this->bucket,
43
-				'Key' => $urn,
44
-				'Range' => 'bytes=' . $range,
45
-			] + $this->getSSECParameters());
46
-			$request = \Aws\serialize($command);
47
-			$headers = [];
48
-			foreach ($request->getHeaders() as $key => $values) {
49
-				foreach ($values as $value) {
50
-					$headers[] = "$key: $value";
51
-				}
52
-			}
53
-			$opts = [
54
-				'http' => [
55
-					'protocol_version' => $request->getProtocolVersion(),
56
-					'header' => $headers,
57
-				]
58
-			];
59
-			$bundle = $this->getCertificateBundlePath();
60
-			if ($bundle) {
61
-				$opts['ssl'] = [
62
-					'cafile' => $bundle
63
-				];
64
-			}
65
-
66
-			if ($this->getProxy()) {
67
-				$opts['http']['proxy'] = $this->getProxy();
68
-				$opts['http']['request_fulluri'] = true;
69
-			}
70
-
71
-			$context = stream_context_create($opts);
72
-			return fopen($request->getUri(), 'r', false, $context);
73
-		});
74
-		if (!$fh) {
75
-			throw new \Exception("Failed to read object $urn");
76
-		}
77
-		return $fh;
78
-	}
79
-
80
-	private function buildS3Metadata(array $metadata): array {
81
-		$result = [];
82
-		foreach ($metadata as $key => $value) {
83
-			$result['x-amz-meta-' . $key] = $value;
84
-		}
85
-		return $result;
86
-	}
87
-
88
-	/**
89
-	 * Single object put helper
90
-	 *
91
-	 * @param string $urn the unified resource name used to identify the object
92
-	 * @param StreamInterface $stream stream with the data to write
93
-	 * @param array $metaData the metadata to set for the object
94
-	 * @throws \Exception when something goes wrong, message will be logged
95
-	 */
96
-	protected function writeSingle(string $urn, StreamInterface $stream, array $metaData): void {
97
-		$mimetype = $metaData['mimetype'] ?? null;
98
-		unset($metaData['mimetype']);
99
-		$this->getConnection()->putObject([
100
-			'Bucket' => $this->bucket,
101
-			'Key' => $urn,
102
-			'Body' => $stream,
103
-			'ACL' => 'private',
104
-			'ContentType' => $mimetype,
105
-			'Metadata' => $this->buildS3Metadata($metaData),
106
-			'StorageClass' => $this->storageClass,
107
-		] + $this->getSSECParameters());
108
-	}
109
-
110
-
111
-	/**
112
-	 * Multipart upload helper that tries to avoid orphaned fragments in S3
113
-	 *
114
-	 * @param string $urn the unified resource name used to identify the object
115
-	 * @param StreamInterface $stream stream with the data to write
116
-	 * @param array $metaData the metadata to set for the object
117
-	 * @throws \Exception when something goes wrong, message will be logged
118
-	 */
119
-	protected function writeMultiPart(string $urn, StreamInterface $stream, array $metaData): void {
120
-		$mimetype = $metaData['mimetype'] ?? null;
121
-		unset($metaData['mimetype']);
122
-
123
-		$attempts = 0;
124
-		$uploaded = false;
125
-		$concurrency = $this->concurrency;
126
-		$exception = null;
127
-		$state = null;
128
-
129
-		// retry multipart upload once with concurrency at half on failure
130
-		while (!$uploaded && $attempts <= 1) {
131
-			$uploader = new MultipartUploader($this->getConnection(), $stream, [
132
-				'bucket' => $this->bucket,
133
-				'concurrency' => $concurrency,
134
-				'key' => $urn,
135
-				'part_size' => $this->uploadPartSize,
136
-				'state' => $state,
137
-				'params' => [
138
-					'ContentType' => $mimetype,
139
-					'Metadata' => $this->buildS3Metadata($metaData),
140
-					'StorageClass' => $this->storageClass,
141
-				] + $this->getSSECParameters(),
142
-			]);
143
-
144
-			try {
145
-				$uploader->upload();
146
-				$uploaded = true;
147
-			} catch (S3MultipartUploadException $e) {
148
-				$exception = $e;
149
-				$attempts++;
150
-
151
-				if ($concurrency > 1) {
152
-					$concurrency = round($concurrency / 2);
153
-				}
154
-
155
-				if ($stream->isSeekable()) {
156
-					$stream->rewind();
157
-				}
158
-			}
159
-		}
160
-
161
-		if (!$uploaded) {
162
-			// if anything goes wrong with multipart, make sure that you don´t poison and
163
-			// slow down s3 bucket with orphaned fragments
164
-			$uploadInfo = $exception->getState()->getId();
165
-			if ($exception->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) {
166
-				$this->getConnection()->abortMultipartUpload($uploadInfo);
167
-			}
168
-
169
-			throw new \OCA\DAV\Connector\Sabre\Exception\BadGateway('Error while uploading to S3 bucket', 0, $exception);
170
-		}
171
-	}
172
-
173
-	public function writeObject($urn, $stream, ?string $mimetype = null) {
174
-		$metaData = [];
175
-		if ($mimetype) {
176
-			$metaData['mimetype'] = $mimetype;
177
-		}
178
-		$this->writeObjectWithMetaData($urn, $stream, $metaData);
179
-	}
180
-
181
-	public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void {
182
-		$canSeek = fseek($stream, 0, SEEK_CUR) === 0;
183
-		$psrStream = Utils::streamFor($stream);
184
-
185
-
186
-		$size = $psrStream->getSize();
187
-		if ($size === null || !$canSeek) {
188
-			// The s3 single-part upload requires the size to be known for the stream.
189
-			// So for input streams that don't have a known size, we need to copy (part of)
190
-			// the input into a temporary stream so the size can be determined
191
-			$buffer = new Psr7\Stream(fopen('php://temp', 'rw+'));
192
-			Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
193
-			$buffer->seek(0);
194
-			if ($buffer->getSize() < $this->putSizeLimit) {
195
-				// buffer is fully seekable, so use it directly for the small upload
196
-				$this->writeSingle($urn, $buffer, $metaData);
197
-			} else {
198
-				$loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
199
-				$this->writeMultiPart($urn, $loadStream, $metaData);
200
-			}
201
-		} else {
202
-			if ($size < $this->putSizeLimit) {
203
-				$this->writeSingle($urn, $psrStream, $metaData);
204
-			} else {
205
-				$this->writeMultiPart($urn, $psrStream, $metaData);
206
-			}
207
-		}
208
-		$psrStream->close();
209
-	}
210
-
211
-	/**
212
-	 * @param string $urn the unified resource name used to identify the object
213
-	 * @return void
214
-	 * @throws \Exception when something goes wrong, message will be logged
215
-	 * @since 7.0.0
216
-	 */
217
-	public function deleteObject($urn) {
218
-		$this->getConnection()->deleteObject([
219
-			'Bucket' => $this->bucket,
220
-			'Key' => $urn,
221
-		]);
222
-	}
223
-
224
-	public function objectExists($urn) {
225
-		return $this->getConnection()->doesObjectExist($this->bucket, $urn, $this->getSSECParameters());
226
-	}
227
-
228
-	public function copyObject($from, $to, array $options = []) {
229
-		$sourceMetadata = $this->getConnection()->headObject([
230
-			'Bucket' => $this->getBucket(),
231
-			'Key' => $from,
232
-		] + $this->getSSECParameters());
233
-
234
-		$size = (int)($sourceMetadata->get('Size') ?? $sourceMetadata->get('ContentLength'));
235
-
236
-		if ($this->useMultipartCopy && $size > $this->copySizeLimit) {
237
-			$copy = new MultipartCopy($this->getConnection(), [
238
-				'source_bucket' => $this->getBucket(),
239
-				'source_key' => $from
240
-			], array_merge([
241
-				'bucket' => $this->getBucket(),
242
-				'key' => $to,
243
-				'acl' => 'private',
244
-				'params' => $this->getSSECParameters() + $this->getSSECParameters(true),
245
-				'source_metadata' => $sourceMetadata
246
-			], $options));
247
-			$copy->copy();
248
-		} else {
249
-			$this->getConnection()->copy($this->getBucket(), $from, $this->getBucket(), $to, 'private', array_merge([
250
-				'params' => $this->getSSECParameters() + $this->getSSECParameters(true),
251
-				'mup_threshold' => PHP_INT_MAX,
252
-			], $options));
253
-		}
254
-	}
19
+    use S3ConfigTrait;
20
+
21
+    /**
22
+     * Returns the connection
23
+     *
24
+     * @return S3Client connected client
25
+     * @throws \Exception if connection could not be made
26
+     */
27
+    abstract protected function getConnection();
28
+
29
+    abstract protected function getCertificateBundlePath(): ?string;
30
+    abstract protected function getSSECParameters(bool $copy = false): array;
31
+
32
+    /**
33
+     * @param string $urn the unified resource name used to identify the object
34
+     *
35
+     * @return resource stream with the read data
36
+     * @throws \Exception when something goes wrong, message will be logged
37
+     * @since 7.0.0
38
+     */
39
+    public function readObject($urn) {
40
+        $fh = SeekableHttpStream::open(function ($range) use ($urn) {
41
+            $command = $this->getConnection()->getCommand('GetObject', [
42
+                'Bucket' => $this->bucket,
43
+                'Key' => $urn,
44
+                'Range' => 'bytes=' . $range,
45
+            ] + $this->getSSECParameters());
46
+            $request = \Aws\serialize($command);
47
+            $headers = [];
48
+            foreach ($request->getHeaders() as $key => $values) {
49
+                foreach ($values as $value) {
50
+                    $headers[] = "$key: $value";
51
+                }
52
+            }
53
+            $opts = [
54
+                'http' => [
55
+                    'protocol_version' => $request->getProtocolVersion(),
56
+                    'header' => $headers,
57
+                ]
58
+            ];
59
+            $bundle = $this->getCertificateBundlePath();
60
+            if ($bundle) {
61
+                $opts['ssl'] = [
62
+                    'cafile' => $bundle
63
+                ];
64
+            }
65
+
66
+            if ($this->getProxy()) {
67
+                $opts['http']['proxy'] = $this->getProxy();
68
+                $opts['http']['request_fulluri'] = true;
69
+            }
70
+
71
+            $context = stream_context_create($opts);
72
+            return fopen($request->getUri(), 'r', false, $context);
73
+        });
74
+        if (!$fh) {
75
+            throw new \Exception("Failed to read object $urn");
76
+        }
77
+        return $fh;
78
+    }
79
+
80
+    private function buildS3Metadata(array $metadata): array {
81
+        $result = [];
82
+        foreach ($metadata as $key => $value) {
83
+            $result['x-amz-meta-' . $key] = $value;
84
+        }
85
+        return $result;
86
+    }
87
+
88
+    /**
89
+     * Single object put helper
90
+     *
91
+     * @param string $urn the unified resource name used to identify the object
92
+     * @param StreamInterface $stream stream with the data to write
93
+     * @param array $metaData the metadata to set for the object
94
+     * @throws \Exception when something goes wrong, message will be logged
95
+     */
96
+    protected function writeSingle(string $urn, StreamInterface $stream, array $metaData): void {
97
+        $mimetype = $metaData['mimetype'] ?? null;
98
+        unset($metaData['mimetype']);
99
+        $this->getConnection()->putObject([
100
+            'Bucket' => $this->bucket,
101
+            'Key' => $urn,
102
+            'Body' => $stream,
103
+            'ACL' => 'private',
104
+            'ContentType' => $mimetype,
105
+            'Metadata' => $this->buildS3Metadata($metaData),
106
+            'StorageClass' => $this->storageClass,
107
+        ] + $this->getSSECParameters());
108
+    }
109
+
110
+
111
+    /**
112
+     * Multipart upload helper that tries to avoid orphaned fragments in S3
113
+     *
114
+     * @param string $urn the unified resource name used to identify the object
115
+     * @param StreamInterface $stream stream with the data to write
116
+     * @param array $metaData the metadata to set for the object
117
+     * @throws \Exception when something goes wrong, message will be logged
118
+     */
119
+    protected function writeMultiPart(string $urn, StreamInterface $stream, array $metaData): void {
120
+        $mimetype = $metaData['mimetype'] ?? null;
121
+        unset($metaData['mimetype']);
122
+
123
+        $attempts = 0;
124
+        $uploaded = false;
125
+        $concurrency = $this->concurrency;
126
+        $exception = null;
127
+        $state = null;
128
+
129
+        // retry multipart upload once with concurrency at half on failure
130
+        while (!$uploaded && $attempts <= 1) {
131
+            $uploader = new MultipartUploader($this->getConnection(), $stream, [
132
+                'bucket' => $this->bucket,
133
+                'concurrency' => $concurrency,
134
+                'key' => $urn,
135
+                'part_size' => $this->uploadPartSize,
136
+                'state' => $state,
137
+                'params' => [
138
+                    'ContentType' => $mimetype,
139
+                    'Metadata' => $this->buildS3Metadata($metaData),
140
+                    'StorageClass' => $this->storageClass,
141
+                ] + $this->getSSECParameters(),
142
+            ]);
143
+
144
+            try {
145
+                $uploader->upload();
146
+                $uploaded = true;
147
+            } catch (S3MultipartUploadException $e) {
148
+                $exception = $e;
149
+                $attempts++;
150
+
151
+                if ($concurrency > 1) {
152
+                    $concurrency = round($concurrency / 2);
153
+                }
154
+
155
+                if ($stream->isSeekable()) {
156
+                    $stream->rewind();
157
+                }
158
+            }
159
+        }
160
+
161
+        if (!$uploaded) {
162
+            // if anything goes wrong with multipart, make sure that you don´t poison and
163
+            // slow down s3 bucket with orphaned fragments
164
+            $uploadInfo = $exception->getState()->getId();
165
+            if ($exception->getState()->isInitiated() && (array_key_exists('UploadId', $uploadInfo))) {
166
+                $this->getConnection()->abortMultipartUpload($uploadInfo);
167
+            }
168
+
169
+            throw new \OCA\DAV\Connector\Sabre\Exception\BadGateway('Error while uploading to S3 bucket', 0, $exception);
170
+        }
171
+    }
172
+
173
+    public function writeObject($urn, $stream, ?string $mimetype = null) {
174
+        $metaData = [];
175
+        if ($mimetype) {
176
+            $metaData['mimetype'] = $mimetype;
177
+        }
178
+        $this->writeObjectWithMetaData($urn, $stream, $metaData);
179
+    }
180
+
181
+    public function writeObjectWithMetaData(string $urn, $stream, array $metaData): void {
182
+        $canSeek = fseek($stream, 0, SEEK_CUR) === 0;
183
+        $psrStream = Utils::streamFor($stream);
184
+
185
+
186
+        $size = $psrStream->getSize();
187
+        if ($size === null || !$canSeek) {
188
+            // The s3 single-part upload requires the size to be known for the stream.
189
+            // So for input streams that don't have a known size, we need to copy (part of)
190
+            // the input into a temporary stream so the size can be determined
191
+            $buffer = new Psr7\Stream(fopen('php://temp', 'rw+'));
192
+            Utils::copyToStream($psrStream, $buffer, $this->putSizeLimit);
193
+            $buffer->seek(0);
194
+            if ($buffer->getSize() < $this->putSizeLimit) {
195
+                // buffer is fully seekable, so use it directly for the small upload
196
+                $this->writeSingle($urn, $buffer, $metaData);
197
+            } else {
198
+                $loadStream = new Psr7\AppendStream([$buffer, $psrStream]);
199
+                $this->writeMultiPart($urn, $loadStream, $metaData);
200
+            }
201
+        } else {
202
+            if ($size < $this->putSizeLimit) {
203
+                $this->writeSingle($urn, $psrStream, $metaData);
204
+            } else {
205
+                $this->writeMultiPart($urn, $psrStream, $metaData);
206
+            }
207
+        }
208
+        $psrStream->close();
209
+    }
210
+
211
+    /**
212
+     * @param string $urn the unified resource name used to identify the object
213
+     * @return void
214
+     * @throws \Exception when something goes wrong, message will be logged
215
+     * @since 7.0.0
216
+     */
217
+    public function deleteObject($urn) {
218
+        $this->getConnection()->deleteObject([
219
+            'Bucket' => $this->bucket,
220
+            'Key' => $urn,
221
+        ]);
222
+    }
223
+
224
+    public function objectExists($urn) {
225
+        return $this->getConnection()->doesObjectExist($this->bucket, $urn, $this->getSSECParameters());
226
+    }
227
+
228
+    public function copyObject($from, $to, array $options = []) {
229
+        $sourceMetadata = $this->getConnection()->headObject([
230
+            'Bucket' => $this->getBucket(),
231
+            'Key' => $from,
232
+        ] + $this->getSSECParameters());
233
+
234
+        $size = (int)($sourceMetadata->get('Size') ?? $sourceMetadata->get('ContentLength'));
235
+
236
+        if ($this->useMultipartCopy && $size > $this->copySizeLimit) {
237
+            $copy = new MultipartCopy($this->getConnection(), [
238
+                'source_bucket' => $this->getBucket(),
239
+                'source_key' => $from
240
+            ], array_merge([
241
+                'bucket' => $this->getBucket(),
242
+                'key' => $to,
243
+                'acl' => 'private',
244
+                'params' => $this->getSSECParameters() + $this->getSSECParameters(true),
245
+                'source_metadata' => $sourceMetadata
246
+            ], $options));
247
+            $copy->copy();
248
+        } else {
249
+            $this->getConnection()->copy($this->getBucket(), $from, $this->getBucket(), $to, 'private', array_merge([
250
+                'params' => $this->getSSECParameters() + $this->getSSECParameters(true),
251
+                'mup_threshold' => PHP_INT_MAX,
252
+            ], $options));
253
+        }
254
+    }
255 255
 }
Please login to merge, or discard this patch.