| @@ 86-108 (lines=23) @@ | ||
/**
 * Acknowledges the given messages through the client's deleteMessageBatch() call.
 *
 * The entries produced by createDeleteEntries() are split into chunks of
 * self::BATCHSIZE_DELETE and every chunk is submitted separately. Each entry
 * the client reports under 'Failed' is looked up in $messages by its 'Id'
 * and collected across all chunks.
 *
 * @param MessageInterface[] $messages
 *
 * @throws FailedAcknowledgementException if any entry was reported as failed
 */
public function acknowledge(array $messages)
{
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createDeleteEntries($messages);
    $unacknowledged = [];

    // The lookup does not depend on the chunk, so it is built once up front.
    $toMessage = static function ($failure) use ($messages) {
        return $messages[$failure['Id']];
    };

    foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
        $response = $this->client->deleteMessageBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A falsy 'Failed' value is treated as "nothing failed in this chunk".
        $failures = $response->get('Failed') ?: [];
        $unacknowledged = array_merge($unacknowledged, array_map($toMessage, $failures));
    }

    if (empty($unacknowledged)) {
        return;
    }

    throw new FailedAcknowledgementException($this, $unacknowledged);
}
|
| 109 | ||
/**
 * Rejects the given messages through the client's changeMessageVisibilityBatch() call.
 *
 * The entries produced by createRejectEntries() are split into chunks of
 * self::BATCHSIZE_DELETE (the same chunk size acknowledge() uses) and every
 * chunk is submitted separately. Each entry the client reports under 'Failed'
 * is looked up in $messages by its 'Id' and collected across all chunks.
 *
 * @param MessageInterface[] $messages
 *
 * @throws FailedAcknowledgementException if any entry was reported as failed
 */
public function reject(array $messages)
{
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createRejectEntries($messages);
    $unrejected = [];

    // The lookup does not depend on the chunk, so it is built once up front.
    $toMessage = static function ($failure) use ($messages) {
        return $messages[$failure['Id']];
    };

    foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
        $response = $this->client->changeMessageVisibilityBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A falsy 'Failed' value is treated as "nothing failed in this chunk".
        $failures = $response->get('Failed') ?: [];
        $unrejected = array_merge($unrejected, array_map($toMessage, $failures));
    }

    if (empty($unrejected)) {
        return;
    }

    throw new FailedAcknowledgementException($this, $unrejected);
}
|
| 138 | ||
| 139 | /** |
|
| 140 | * @param MessageFactoryInterface $factory |
|
| @@ 193-215 (lines=23) @@ | ||
/**
 * Enqueues the given messages through the client's sendMessageBatch() call.
 *
 * The entries produced by createEnqueueEntries() are split into chunks of
 * self::BATCHSIZE_SEND and every chunk is submitted separately. Each entry
 * the client reports under 'Failed' is looked up in $messages by its 'Id'
 * and collected across all chunks.
 *
 * @param MessageInterface[] $messages
 *
 * @throws FailedEnqueueException if any entry was reported as failed
 */
public function enqueue(array $messages)
{
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createEnqueueEntries($messages);
    $unsent = [];

    // The lookup does not depend on the chunk, so it is built once up front.
    $toMessage = static function ($failure) use ($messages) {
        return $messages[$failure['Id']];
    };

    foreach (array_chunk($entries, self::BATCHSIZE_SEND) as $chunk) {
        $response = $this->client->sendMessageBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A falsy 'Failed' value is treated as "nothing failed in this chunk".
        $failures = $response->get('Failed') ?: [];
        $unsent = array_merge($unsent, array_map($toMessage, $failures));
    }

    if (empty($unsent)) {
        return;
    }

    throw new FailedEnqueueException($this, $unsent);
}
|
| 216 | ||
| 217 | /** |
|
| 218 | * {@inheritdoc} |
|