@@ 198-222 (lines=25) @@ | ||
195 | * @return mixed|void |
|
196 | * @throws AMQPProtocolChannelException |
|
197 | */ |
|
198 | public function publish(string $message, string $routingKey = '') |
|
199 | { |
|
200 | if ($this->attributes['auto_create'] === true) { |
|
201 | $this->create(); |
|
202 | $this->bind(); |
|
203 | } |
|
204 | try { |
|
205 | $this->getChannel()->basic_publish( |
|
206 | new AMQPMessage($message), |
|
207 | $this->attributes['name'], |
|
208 | $routingKey, |
|
209 | true |
|
210 | ); |
|
211 | $this->retryCount = 0; |
|
212 | } catch (AMQPChannelClosedException $exception) { |
|
213 | $this->retryCount++; |
|
214 | // Retry publishing with re-connect |
|
215 | if ($this->retryCount < self::MAX_RETRIES) { |
|
216 | $this->getConnection()->reconnect(); |
|
217 | $this->publish($message, $routingKey); |
|
218 | return; |
|
219 | } |
|
220 | throw $exception; |
|
221 | } |
|
222 | } |
|
223 | } |
|
224 |
@@ 254-280 (lines=27) @@ | ||
251 | * @return mixed|void |
|
252 | * @throws AMQPProtocolChannelException |
|
253 | */ |
|
254 | public function publish(string $message, string $routingKey = '') |
|
255 | { |
|
256 | if ($this->attributes['auto_create'] === true) { |
|
257 | $this->create(); |
|
258 | $this->bind(); |
|
259 | } |
|
260 | ||
261 | try { |
|
262 | $this->getChannel() |
|
263 | ->basic_publish( |
|
264 | new AMQPMessage($message), |
|
265 | '', |
|
266 | $this->attributes['name'], |
|
267 | true |
|
268 | ); |
|
269 | $this->retryCount = 0; |
|
270 | } catch (AMQPChannelClosedException $exception) { |
|
271 | $this->retryCount++; |
|
272 | // Retry publishing with re-connect |
|
273 | if ($this->retryCount < self::MAX_RETRIES) { |
|
274 | $this->getConnection()->reconnect(); |
|
275 | $this->publish($message, $routingKey); |
|
276 | return; |
|
277 | } |
|
278 | throw $exception; |
|
279 | } |
|
280 | } |
|
281 | ||
282 | /** |
|
283 | * {@inheritdoc} |