@@ 86-108 (lines=23) @@ | ||
83 | /** |
|
84 | * @param MessageInterface[] $messages |
|
85 | */ |
|
public function acknowledge(array $messages)
{
    // Resolve the queue URL first, then build the delete entries for the given messages.
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createDeleteEntries($messages);

    // Turns one failed batch result back into the message stored under the result's Id.
    $toMessage = function ($failure) use ($messages) {
        return $messages[$failure['Id']];
    };

    $unacknowledged = [];

    // Entries are submitted in chunks of at most BATCHSIZE_DELETE per request.
    foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
        $response = $this->client->deleteMessageBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A missing or empty 'Failed' value is treated as "no failures in this batch".
        $unacknowledged = array_merge(
            $unacknowledged,
            array_map($toMessage, $response->get('Failed') ?: [])
        );
    }

    // Report every message that could not be deleted, across all batches, in one exception.
    if (count($unacknowledged) > 0) {
        throw new FailedAcknowledgementException($this, $unacknowledged);
    }
}
|
109 | ||
110 | /** |
|
111 | * @param MessageInterface[] $messages |
|
@@ 115-137 (lines=23) @@ | ||
112 | * |
|
113 | * @throws FailedAcknowledgementException |
|
114 | */ |
|
public function reject(array $messages)
{
    // Resolve the queue URL first, then build the reject entries for the given messages.
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createRejectEntries($messages);

    // Turns one failed batch result back into the message stored under the result's Id.
    $toMessage = function ($failure) use ($messages) {
        return $messages[$failure['Id']];
    };

    $unprocessed = [];

    // Entries are submitted in chunks of at most BATCHSIZE_DELETE per request.
    foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
        $response = $this->client->changeMessageVisibilityBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A missing or empty 'Failed' value is treated as "no failures in this batch".
        $unprocessed = array_merge(
            $unprocessed,
            array_map($toMessage, $response->get('Failed') ?: [])
        );
    }

    // Report every message whose visibility change failed, across all batches, in one exception.
    if (count($unprocessed) > 0) {
        throw new FailedAcknowledgementException($this, $unprocessed);
    }
}
|
138 | ||
139 | /** |
|
140 | * @param MessageFactoryInterface $factory |
|
@@ 193-215 (lines=23) @@ | ||
190 | /** |
|
191 | * @param MessageInterface[] $messages |
|
192 | */ |
|
public function enqueue(array $messages)
{
    // Resolve the queue URL first, then build the send entries for the given messages.
    $queueUrl = $this->getQueueUrl();
    $entries = $this->createEnqueueEntries($messages);

    // Turns one failed batch result back into the message stored under the result's Id.
    $toMessage = function ($failure) use ($messages) {
        return $messages[$failure['Id']];
    };

    $unsent = [];

    // Entries are submitted in chunks of at most BATCHSIZE_SEND per request.
    foreach (array_chunk($entries, self::BATCHSIZE_SEND) as $chunk) {
        $response = $this->client->sendMessageBatch([
            'QueueUrl' => $queueUrl,
            'Entries' => $chunk,
        ]);

        // A missing or empty 'Failed' value is treated as "no failures in this batch".
        $unsent = array_merge(
            $unsent,
            array_map($toMessage, $response->get('Failed') ?: [])
        );
    }

    // Report every message that could not be sent, across all batches, in one exception.
    if (count($unsent) > 0) {
        throw new FailedEnqueueException($this, $unsent);
    }
}
|
216 | ||
217 | /** |
|
218 | * {@inheritdoc} |