Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
1 | <?php |
||
44 | final class SqsAdapter implements AdapterInterface, NamedInterface |
||
45 | { |
||
46 | const BATCHSIZE_DELETE = 10; |
||
47 | const BATCHSIZE_RECEIVE = 10; |
||
48 | const BATCHSIZE_SEND = 10; |
||
49 | |||
50 | /** @var SqsClient */ |
||
51 | protected $client; |
||
52 | |||
53 | /** @var array */ |
||
54 | protected $options; |
||
55 | |||
56 | /** @var string */ |
||
57 | protected $name; |
||
58 | |||
59 | /** @var string */ |
||
60 | protected $url; |
||
61 | |||
62 | /** |
||
63 | * @param SqsClient $client |
||
64 | * @param string $name |
||
65 | * @param array $options - DelaySeconds <integer> The time in seconds that the delivery of all |
||
66 | * messages in the queue will be delayed. |
||
67 | * - MaximumMessageSize <integer> The limit of how many bytes a message |
||
68 | * can contain before Amazon SQS rejects it. |
||
69 | * - MessageRetentionPeriod <integer> The number of seconds Amazon SQS |
||
70 | * retains a message. |
||
71 | * - Policy <string> The queue's policy. A valid form-url-encoded policy. |
||
72 | * - ReceiveMessageWaitTimeSeconds <integer> The time for which a |
||
73 | * ReceiveMessage call will wait for a message to arrive. |
||
74 | * - VisibilityTimeout <integer> The visibility timeout for the queue. |
||
75 | */ |
||
76 | 18 | public function __construct(SqsClient $client, $name, array $options = []) |
|
82 | |||
83 | /** |
||
84 | * @param MessageInterface[] $messages |
||
85 | */ |
||
86 | 5 | View Code Duplication | public function acknowledge(array $messages) |
109 | |||
110 | /** |
||
111 | * @param MessageInterface[] $messages |
||
112 | * |
||
113 | * @throws FailedAcknowledgementException|void |
||
114 | */ |
||
115 | 1 | View Code Duplication | public function reject(array $messages) |
116 | { |
||
117 | 1 | $url = $this->getQueueUrl(); |
|
118 | 1 | $failed = []; |
|
119 | 1 | $batches = array_chunk($this->createRejectEntries($messages), self::BATCHSIZE_DELETE); |
|
120 | |||
121 | 1 | foreach ($batches as $batch) { |
|
122 | 1 | $results = $this->client->changeMessageVisibilityBatch([ |
|
123 | 1 | 'QueueUrl' => $url, |
|
124 | 1 | 'Entries' => $batch, |
|
125 | ]); |
||
126 | |||
127 | $map = function ($result) use ($messages) { |
||
128 | return $messages[$result['Id']]; |
||
129 | 1 | }; |
|
130 | |||
131 | 1 | $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
|
132 | } |
||
133 | |||
134 | 1 | if (!empty($failed)) { |
|
135 | throw new FailedAcknowledgementException($this, $failed); |
||
136 | } |
||
137 | 1 | } |
|
138 | |||
139 | /** |
||
140 | * @param MessageFactoryInterface $factory |
||
141 | * @param int $limit |
||
142 | * |
||
143 | * @return \Generator |
||
144 | */ |
||
145 | 8 | public function dequeue(MessageFactoryInterface $factory, $limit) |
|
146 | { |
||
147 | 8 | $remaining = $limit ?: 0; |
|
148 | |||
149 | 8 | while (null === $limit || $remaining > 0) { |
|
150 | /** |
||
151 | * If a limit has been specified, set {@see $size} so that we don't return more |
||
152 | * than the requested number of messages if it's less than the batch size. |
||
153 | */ |
||
154 | 8 | $size = ($limit !== null) ? min($remaining, self::BATCHSIZE_RECEIVE) : self::BATCHSIZE_RECEIVE; |
|
155 | |||
156 | 8 | $timestamp = time() + $this->getQueueVisibilityTimeout(); |
|
157 | $validator = function () use ($timestamp) { |
||
158 | 4 | return time() < $timestamp; |
|
159 | 8 | }; |
|
160 | |||
161 | 8 | $results = $this->client->receiveMessage(array_filter([ |
|
162 | 8 | 'QueueUrl' => $this->getQueueUrl(), |
|
163 | 'AttributeNames' => ['All'], |
||
164 | 8 | 'MaxNumberOfMessages' => $size, |
|
165 | 8 | 'VisibilityTimeout' => $this->getOption('VisibilityTimeout'), |
|
166 | 8 | 'WaitTimeSeconds' => $this->getOption('ReceiveMessageWaitTimeSeconds'), |
|
167 | ])); |
||
168 | |||
169 | 8 | $messages = $results->get('Messages') ?: []; |
|
170 | |||
171 | 8 | if (count($messages) === 0) { |
|
172 | 1 | break; |
|
173 | } |
||
174 | |||
175 | 7 | foreach ($messages as $result) { |
|
176 | 7 | yield $factory->createMessage( |
|
177 | 7 | $result['Body'], |
|
178 | [ |
||
179 | 7 | 'metadata' => $this->createMessageMetadata($result), |
|
180 | 7 | 'validator' => $validator, |
|
181 | ] |
||
182 | ); |
||
183 | } |
||
184 | |||
185 | // Decrement the number of messages remaining. |
||
186 | 5 | $remaining -= count($messages); |
|
187 | } |
||
188 | 6 | } |
|
189 | |||
190 | /** |
||
191 | * @param MessageInterface[] $messages |
||
192 | */ |
||
193 | 4 | View Code Duplication | public function enqueue(array $messages) |
194 | { |
||
195 | 4 | $url = $this->getQueueUrl(); |
|
196 | 4 | $failed = []; |
|
197 | 4 | $batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND); |
|
198 | |||
199 | 4 | foreach ($batches as $batch) { |
|
200 | 4 | $results = $this->client->sendMessageBatch([ |
|
201 | 4 | 'QueueUrl' => $url, |
|
202 | 4 | 'Entries' => $batch, |
|
203 | ]); |
||
204 | |||
205 | $map = function ($result) use ($messages) { |
||
206 | return $messages[$result['Id']]; |
||
207 | 4 | }; |
|
208 | |||
209 | 4 | $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
|
210 | } |
||
211 | |||
212 | 4 | if (!empty($failed)) { |
|
213 | throw new FailedEnqueueException($this, $failed); |
||
214 | } |
||
215 | 4 | } |
|
216 | |||
217 | /** |
||
218 | * {@inheritdoc} |
||
219 | */ |
||
220 | 2 | public function purge() |
|
224 | |||
225 | /** |
||
226 | * {@inheritdoc} |
||
227 | */ |
||
228 | 2 | public function delete() |
|
232 | |||
233 | /** |
||
234 | * @param MessageInterface[] $messages |
||
235 | * |
||
236 | * @return array |
||
237 | */ |
||
238 | 5 | View Code Duplication | protected function createDeleteEntries(array $messages) |
239 | { |
||
240 | 5 | array_walk( |
|
241 | 5 | $messages, |
|
242 | function (MessageInterface &$message, $id) { |
||
243 | 5 | $metadata = $message->getMetadata(); |
|
244 | $message = [ |
||
245 | 5 | 'Id' => $id, |
|
246 | 5 | 'ReceiptHandle' => $metadata->get('ReceiptHandle'), |
|
247 | ]; |
||
248 | 5 | } |
|
249 | ); |
||
250 | |||
251 | 5 | return $messages; |
|
252 | } |
||
253 | |||
254 | /** |
||
255 | * @param MessageInterface[] $messages |
||
256 | * |
||
257 | * @return array |
||
258 | */ |
||
259 | 1 | View Code Duplication | protected function createRejectEntries(array $messages) |
260 | { |
||
261 | 1 | array_walk( |
|
262 | 1 | $messages, |
|
263 | function (MessageInterface &$message, $id) { |
||
264 | 1 | $metadata = $message->getMetadata(); |
|
265 | $message = [ |
||
266 | 1 | 'Id' => $id, |
|
267 | 1 | 'ReceiptHandle' => $metadata->get('ReceiptHandle'), |
|
268 | 1 | 'VisibilityTimeout' => 0, |
|
269 | ]; |
||
270 | 1 | } |
|
271 | ); |
||
272 | |||
273 | 1 | return $messages; |
|
274 | } |
||
275 | |||
276 | /** |
||
277 | * @param MessageInterface[] $messages |
||
278 | * |
||
279 | * @return array |
||
280 | */ |
||
281 | 4 | protected function createEnqueueEntries(array $messages) |
|
282 | { |
||
283 | 4 | array_walk( |
|
284 | 4 | $messages, |
|
285 | 4 | function (MessageInterface &$message, $id) { |
|
286 | 4 | $metadata = $message->getMetadata(); |
|
287 | $message = [ |
||
288 | 4 | 'Id' => $id, |
|
289 | 4 | 'MessageBody' => $message->getBody(), |
|
290 | 4 | 'MessageAttributes' => $metadata->get('MessageAttributes') ?: [], |
|
291 | ]; |
||
292 | 4 | if (!is_null($metadata->get('DelaySeconds'))) { |
|
293 | 2 | $message['DelaySeconds'] = $metadata->get('DelaySeconds'); |
|
294 | } |
||
295 | 4 | } |
|
296 | ); |
||
297 | |||
298 | 4 | return $messages; |
|
299 | } |
||
300 | |||
301 | /** |
||
302 | * @param array $result |
||
303 | * |
||
304 | * @return array |
||
305 | */ |
||
306 | 7 | protected function createMessageMetadata(array $result) |
|
307 | { |
||
308 | 7 | return array_intersect_key( |
|
309 | 7 | $result, |
|
310 | [ |
||
311 | 7 | 'Attributes' => [], |
|
312 | 'MessageAttributes' => [], |
||
313 | 'MessageId' => null, |
||
314 | 'ReceiptHandle' => null, |
||
315 | ] |
||
316 | ); |
||
317 | } |
||
318 | |||
319 | /** |
||
320 | * @param string $name |
||
321 | * @param mixed $default |
||
322 | * |
||
323 | * @return mixed |
||
324 | */ |
||
325 | 8 | protected function getOption($name, $default = null) |
|
329 | |||
330 | /** |
||
331 | * @return string |
||
332 | */ |
||
333 | 17 | protected function getQueueUrl() |
|
334 | { |
||
335 | 17 | if (!$this->url) { |
|
336 | 17 | $result = $this->client->createQueue([ |
|
337 | 17 | 'QueueName' => $this->name, |
|
338 | 17 | 'Attributes' => $this->options, |
|
339 | ]); |
||
340 | |||
341 | 17 | $this->url = $result->get('QueueUrl'); |
|
342 | } |
||
343 | |||
344 | 17 | return $this->url; |
|
345 | } |
||
346 | |||
347 | /** |
||
348 | * @return int |
||
349 | */ |
||
350 | 8 | protected function getQueueVisibilityTimeout() |
|
351 | { |
||
352 | 8 | if (!isset($this->options['VisibilityTimeout'])) { |
|
353 | 8 | $result = $this->client->getQueueAttributes([ |
|
354 | 8 | 'QueueUrl' => $this->getQueueUrl(), |
|
355 | 'AttributeNames' => ['VisibilityTimeout'], |
||
356 | ]); |
||
357 | |||
358 | 8 | $attributes = $result->get('Attributes'); |
|
359 | 8 | $this->options['VisibilityTimeout'] = $attributes['VisibilityTimeout']; |
|
360 | } |
||
361 | |||
362 | 8 | return $this->options['VisibilityTimeout']; |
|
363 | } |
||
364 | |||
365 | /** |
||
366 | * @return string |
||
367 | */ |
||
368 | public function getQueueName() |
||
372 | } |
||
373 |
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.