1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Abacaphiliac\AwsSdk\ClaimCheck\Sqs; |
4
|
|
|
|
5
|
|
|
use Abacaphiliac\AwsSdk\ClaimCheck\ClaimCheck; |
6
|
|
|
use Abacaphiliac\AwsSdk\ClaimCheck\Exception\ExceptionInterface; |
7
|
|
|
use Abacaphiliac\AwsSdk\ClaimCheck\Exception\RuntimeException; |
8
|
|
|
use Aws\Result; |
9
|
|
|
use Aws\Sqs\SqsClient; |
10
|
|
|
use GuzzleHttp\Promise\Promise; |
11
|
|
|
use Psr\Http\Message\StreamInterface; |
12
|
|
|
|
13
|
|
|
/**
 * SQS client wrapper that applies a claim-check scheme to message bodies.
 *
 * On send, the message body is written to S3 and replaced with a serialized
 * claim check. On receive, bodies that unserialize to a claim check are
 * replaced with the payload fetched from S3. On delete, the S3 object is
 * removed as well when the configuration asks for it.
 *
 * Operations are intercepted through __call() and forwarded to the wrapped
 * client; the parent constructor is intentionally not invoked here.
 * NOTE(review): inherited non-magic SqsClient methods may therefore rely on
 * parent state that is never initialised — confirm before calling them.
 */
class SqsExtendedClient extends SqsClient
{
    /** @var SqsClient Wrapped client that performs the actual SQS operations. */
    private $sqsClient;

    /** @var SqsExtendedClientConfiguration Supplies the S3 client, bucket name, claim-check factory and serializer. */
    private $configuration;

    /**
     * SqsExtendedClient constructor.
     *
     * @param SqsClient $sqsClient Client every SQS operation is delegated to.
     * @param SqsExtendedClientConfiguration $configuration Claim-check settings and collaborators.
     */
    public function __construct(SqsClient $sqsClient, SqsExtendedClientConfiguration $configuration)
    {
        $this->sqsClient = $sqsClient;
        $this->configuration = $configuration;
    }

    /**
     * Intercepts sendMessage, receiveMessage and deleteMessage; every other
     * operation is forwarded unchanged to the wrapped client.
     *
     * Only the first element of $args is used as the operation parameters;
     * an empty array is used when it is absent.
     *
     * @param string $name Operation name as invoked on this client.
     * @param mixed[] $args Call arguments.
     * @return Result|Promise
     * @throws ExceptionInterface
     */
    public function __call($name, array $args)
    {
        $params = array_key_exists(0, $args) ? $args[0] : [];

        switch ($name) {
            case 'sendMessage':
                return $this->sendSqsMessage($params);
            case 'receiveMessage':
                return $this->receiveSqsMessage($params);
            case 'deleteMessage':
                return $this->deleteSqsMessage($params);
            default:
                return $this->sqsClient->{$name}($params);
        }
    }

    /**
     * Stores the message body in S3 and sends the serialized claim check in its place.
     *
     * A missing 'MessageBody' is treated as an empty string.
     *
     * @param mixed[] $args sendMessage parameters.
     * @return Result
     * @throws ExceptionInterface
     */
    private function sendSqsMessage(array $args = [])
    {
        $body = array_key_exists('MessageBody', $args) ? $args['MessageBody'] : '';

        $claimCheck = $this->storeMessageInS3($body);

        $args['MessageBody'] = $this->configuration->getClaimCheckSerializer()->serialize($claimCheck);

        return $this->sqsClient->sendMessage($args);
    }

    /**
     * Receives messages and resolves any claim-check bodies to their S3 payload.
     *
     * @param mixed[] $args receiveMessage parameters.
     * @return Result The wrapped client's result with 'Messages' replaced by the decoded list.
     * @throws ExceptionInterface
     */
    private function receiveSqsMessage(array $args = [])
    {
        $result = $this->sqsClient->receiveMessage($args);

        // The search yields null when the response carries no messages (an
        // ordinary empty poll); normalise so the loop never iterates over null.
        $received = $result->search('Messages[]');
        if (!is_array($received)) {
            $received = [];
        }

        $messages = [];
        foreach ($received as $i => $message) {
            $messages[$i] = $this->decodeSqsMessage($message);
        }

        $result->offsetSet('Messages', $messages);

        return $result;
    }

    /**
     * Deletes a message from SQS and, when enabled, its payload from S3.
     *
     * When S3 deletion is enabled, the receipt handle is expected to be the
     * JSON document produced by embedS3PointerInReceiptHandle(). Handles that
     * are not strings, are not valid JSON, or lack any of the expected keys
     * are passed to SQS untouched and no S3 call is made.
     *
     * @param mixed[] $args deleteMessage parameters.
     * @return Result
     * @throws ExceptionInterface
     */
    private function deleteSqsMessage(array $args = [])
    {
        if (array_key_exists('ReceiptHandle', $args)
            && $this->configuration->getDeleteFromS3()
            && is_string($args['ReceiptHandle'])
        ) {
            // Split receipt handle into S3 and SQS information.
            $pointer = json_decode($args['ReceiptHandle'], true);

            // A handle can be valid JSON without being our pointer document
            // (e.g. a purely numeric string), so verify its shape before use.
            if (is_array($pointer)
                && isset($pointer['s3_bucket_name'], $pointer['s3_key'], $pointer['original_receipt_handle'])
            ) {
                // Delete from S3.
                $this->configuration->getS3Client()->deleteObject([
                    'Bucket' => $pointer['s3_bucket_name'],
                    'Key' => $pointer['s3_key'],
                ]);

                // Adjust SQS args.
                $args['ReceiptHandle'] = $pointer['original_receipt_handle'];
            }
        }

        return $this->sqsClient->deleteMessage($args);
    }

    /**
     * Replaces a claim-check body with the payload stored in S3.
     *
     * The message is returned unchanged when it has no 'Body', when the body
     * does not unserialize to a ClaimCheck, or when fetching from S3 fails
     * with a library exception. When S3 deletion is enabled and the message
     * has a 'ReceiptHandle', the handle is rewritten to also carry the S3
     * bucket and key.
     *
     * @param mixed[] $message A single entry of the receiveMessage result.
     * @return mixed[] The message, decoded where possible.
     */
    private function decodeSqsMessage(array $message)
    {
        if (!array_key_exists('Body', $message)) {
            // Unknown message body. Skip processing.
            return $message;
        }

        try {
            $claimCheck = $this->configuration->getClaimCheckSerializer()->unserialize($message['Body']);
        } catch (ExceptionInterface $e) {
            // Unknown message body. Skip processing.
            return $message;
        }

        if (!$claimCheck instanceof ClaimCheck) {
            // Unknown message body. Skip processing.
            return $message;
        }

        try {
            $message['Body'] = $this->fetchClaimCheckFromS3($claimCheck);
        } catch (ExceptionInterface $e) {
            // Payload could not be fetched. Leave the claim check in place.
            return $message;
        }

        if (array_key_exists('ReceiptHandle', $message) && $this->configuration->getDeleteFromS3()) {
            // Embed S3 information in the receipt handle so deleteMessage can find the object.
            $message['ReceiptHandle'] = $this->embedS3PointerInReceiptHandle(
                $message['ReceiptHandle'],
                $claimCheck->getS3BucketName(),
                $claimCheck->getS3Key()
            );
        }

        return $message;
    }

    /**
     * Creates a claim check and uploads the message under its key to the configured bucket.
     *
     * @param string $message Message body to store.
     * @return ClaimCheck The claim check whose key the payload was stored under.
     */
    private function storeMessageInS3($message)
    {
        $claimCheck = $this->configuration->getClaimCheckFactory()->create();

        $this->configuration->getS3Client()->putObject([
            'Bucket' => $this->configuration->getS3BucketName(),
            'Key' => $claimCheck->getS3Key(),
            'Body' => $message,
        ]);

        return $claimCheck;
    }

    /**
     * Downloads the payload referenced by a claim check.
     *
     * @param ClaimCheck $claimCheck Pointer to the S3 object.
     * @return string|mixed The stream contents when the body is a stream; otherwise the raw 'Body' value.
     * @throws ExceptionInterface When reading the body stream fails.
     */
    private function fetchClaimCheckFromS3(ClaimCheck $claimCheck)
    {
        $result = $this->configuration->getS3Client()->getObject([
            'Bucket' => $claimCheck->getS3BucketName(),
            'Key' => $claimCheck->getS3Key(),
        ]);

        $body = $result->get('Body');

        if (!$body instanceof StreamInterface) {
            return $body;
        }

        // Unpack the message, translating stream failures into the library's own exception type.
        try {
            return $body->getContents();
        } catch (\RuntimeException $e) {
            throw new RuntimeException($e->getMessage(), 0, $e);
        }
    }

    /**
     * Packs the SQS receipt handle together with the S3 location into one JSON string.
     *
     * @param string $receiptHandle Receipt handle issued by SQS.
     * @param string $s3MsgBucketName Bucket holding the payload.
     * @param string $s3MsgKey Key of the payload object.
     * @return string JSON document with keys original_receipt_handle, s3_bucket_name and s3_key.
     */
    private function embedS3PointerInReceiptHandle($receiptHandle, $s3MsgBucketName, $s3MsgKey)
    {
        return json_encode([
            'original_receipt_handle' => $receiptHandle,
            's3_bucket_name' => $s3MsgBucketName,
            's3_key' => $s3MsgKey,
        ]);
    }
}
232
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.