@@ 86-108 (lines=23) @@ | ||
/**
 * Acknowledges the given messages by deleting them from the queue in batches.
 *
 * The delete entries are split into chunks of at most self::BATCHSIZE_DELETE
 * and each chunk is submitted with one deleteMessageBatch call. Every entry
 * the client reports under "Failed" is mapped back to its message through the
 * entry's "Id", which is used as the key into $messages.
 *
 * @param MessageInterface[] $messages
 *
 * @throws FailedAcknowledgementException when at least one entry was reported as failed
 */
public function acknowledge(array $messages)
{
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createDeleteEntries($messages);

    // Resolves a failed result entry to the message it was created from.
    // NOTE(review): relies on createDeleteEntries() using the $messages key as "Id" - confirm.
    $toMessage = function ($entry) use ($messages) {
        return $messages[$entry['Id']];
    };

    $unacknowledged = [];

    foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
        $response = $this->client->deleteMessageBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A missing or empty "Failed" key is treated as "nothing failed".
        $failures = $response->get('Failed') ?: [];
        $unacknowledged = array_merge($unacknowledged, array_map($toMessage, $failures));
    }

    if (empty($unacknowledged)) {
        return;
    }

    throw new FailedAcknowledgementException($this, $unacknowledged);
}
|
109 | ||
110 | /** |
|
111 | * @param MessageFactoryInterface $factory |
|
@@ 161-183 (lines=23) @@ | ||
/**
 * Sends the given messages to the queue in batches.
 *
 * The enqueue entries are split into chunks of at most self::BATCHSIZE_SEND
 * and each chunk is submitted with one sendMessageBatch call. Every entry the
 * client reports under "Failed" is mapped back to its message through the
 * entry's "Id", which is used as the key into $messages.
 *
 * @param MessageInterface[] $messages
 *
 * @throws FailedEnqueueException when at least one entry was reported as failed
 */
public function enqueue(array $messages)
{
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createEnqueueEntries($messages);

    // Resolves a failed result entry to the message it was created from.
    // NOTE(review): relies on createEnqueueEntries() using the $messages key as "Id" - confirm.
    $toMessage = function ($entry) use ($messages) {
        return $messages[$entry['Id']];
    };

    $unsent = [];

    foreach (array_chunk($entries, self::BATCHSIZE_SEND) as $chunk) {
        $response = $this->client->sendMessageBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A missing or empty "Failed" key is treated as "nothing failed".
        $failures = $response->get('Failed') ?: [];
        $unsent = array_merge($unsent, array_map($toMessage, $failures));
    }

    if (empty($unsent)) {
        return;
    }

    throw new FailedEnqueueException($this, $unsent);
}
|
184 | ||
185 | /** |
|
186 | * {@inheritdoc} |