@@ -64,8 +64,8 @@ discard block |
||
| 64 | 64 | $this->initialize(); |
| 65 | 65 | try { |
| 66 | 66 | $messagesWithDelay = []; |
| 67 | - foreach($messages as $message) { |
|
| 68 | - if($message->getDelay() > 0) { |
|
| 67 | + foreach ($messages as $message) { |
|
| 68 | + if ($message->getDelay() > 0) { |
|
| 69 | 69 | $messagesWithDelay[$message->getDelay()][] = $message; |
| 70 | 70 | continue; |
| 71 | 71 | } |
@@ -75,8 +75,8 @@ discard block |
||
| 75 | 75 | $this->channel->batch_basic_publish($msg, $this->exchangeConfig->getName(), $message->getName()); |
| 76 | 76 | } |
| 77 | 77 | $this->channel->publish_batch(); |
| 78 | - foreach($messagesWithDelay as $delay => $delayedMessages) { |
|
| 79 | - if(!isset($this->delayedQueueWriterRegistry[$delay])) { |
|
| 78 | + foreach ($messagesWithDelay as $delay => $delayedMessages) { |
|
| 79 | + if (!isset($this->delayedQueueWriterRegistry[$delay])) { |
|
| 80 | 80 | $this->delayedQueueWriterRegistry[$delay] = new DelayedQueueWriter( |
| 81 | 81 | $this->exchangeConfig->getName(), |
| 82 | 82 | $delay, |
@@ -86,8 +86,8 @@ discard block |
||
| 86 | 86 | } |
| 87 | 87 | $this->delayedQueueWriterRegistry[$delay]->write($delayedMessages); |
| 88 | 88 | } |
| 89 | - } catch(\Exception $exception) { |
|
| 90 | - $this->logger->error('Error writing messages: '.$exception->getMessage()); |
|
| 89 | + } catch (\Exception $exception) { |
|
| 90 | + $this->logger->error('Error writing messages: ' . $exception->getMessage()); |
|
| 91 | 91 | throw new WriterException($exception->getMessage(), $exception->getCode()); |
| 92 | 92 | } |
| 93 | 93 | } |
@@ -97,7 +97,7 @@ discard block |
||
| 97 | 97 | */ |
| 98 | 98 | protected function initialize() |
| 99 | 99 | { |
| 100 | - if($this->channel) { |
|
| 100 | + if ($this->channel) { |
|
| 101 | 101 | return; |
| 102 | 102 | } |
| 103 | 103 | $this->logger->info('Connecting to RabbitMQ'); |