| @@ 86-108 (lines=23) @@ | ||
| 83 | /** |
|
| 84 | * @param MessageInterface[] $messages |
|
| 85 | */ |
|
| 86 | public function acknowledge(array $messages) |
|
| 87 | { |
|
| 88 | $url = $this->getQueueUrl(); |
|
| 89 | $failed = []; |
|
| 90 | $batches = array_chunk($this->createDeleteEntries($messages), self::BATCHSIZE_DELETE); |
|
| 91 | ||
| 92 | foreach ($batches as $batch) { |
|
| 93 | $results = $this->client->deleteMessageBatch([ |
|
| 94 | 'QueueUrl' => $url, |
|
| 95 | 'Entries' => $batch, |
|
| 96 | ]); |
|
| 97 | ||
| 98 | $map = function ($result) use ($messages) { |
|
| 99 | return $messages[$result['Id']]; |
|
| 100 | }; |
|
| 101 | ||
| 102 | $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
|
| 103 | } |
|
| 104 | ||
| 105 | if (!empty($failed)) { |
|
| 106 | throw new FailedAcknowledgementException($this, $failed); |
|
| 107 | } |
|
| 108 | } |
|
| 109 | ||
| 110 | /** |
|
| 111 | * @param MessageFactoryInterface $factory |
|
| @@ 161-183 (lines=23) @@ | ||
| 158 | /** |
|
| 159 | * @param MessageInterface[] $messages |
|
| 160 | */ |
|
| 161 | public function enqueue(array $messages) |
|
| 162 | { |
|
| 163 | $url = $this->getQueueUrl(); |
|
| 164 | $failed = []; |
|
| 165 | $batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND); |
|
| 166 | ||
| 167 | foreach ($batches as $batch) { |
|
| 168 | $results = $this->client->sendMessageBatch([ |
|
| 169 | 'QueueUrl' => $url, |
|
| 170 | 'Entries' => $batch, |
|
| 171 | ]); |
|
| 172 | ||
| 173 | $map = function ($result) use ($messages) { |
|
| 174 | return $messages[$result['Id']]; |
|
| 175 | }; |
|
| 176 | ||
| 177 | $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
|
| 178 | } |
|
| 179 | ||
| 180 | if (!empty($failed)) { |
|
| 181 | throw new FailedEnqueueException($this, $failed); |
|
| 182 | } |
|
| 183 | } |
|
| 184 | ||
| 185 | /** |
|
| 186 | * {@inheritdoc} |
|