Test Failed
Pull Request — master (#39)
by Aleksandr
06:37
created
RabbitMq/Consumer.php 1 patch
Spacing   +13 added lines, -15 removed lines patch added patch discarded remove patch
@@ -97,19 +97,18 @@  discard block
 block discarded – undo
97 97
 
98 98
     protected function setup(): Consumer
99 99
     {
100
-        foreach($this->queueConsumings as $index => $queueConsuming) {
100
+        foreach ($this->queueConsumings as $index => $queueConsuming) {
101 101
             $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
102 102
 
103 103
             $consumerTag = $this->channel->basic_consume(
104 104
                 $queueConsuming->queueName,
105 105
                 $queueConsuming->consumerTag ?
106
-                    $queueConsuming->consumerTag :
107
-                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
106
+                    $queueConsuming->consumerTag : sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
108 107
                 $queueConsuming->noLocal,
109 108
                 $queueConsuming->noAck,
110 109
                 $queueConsuming->exclusive,
111 110
                 $queueConsuming->nowait,
112
-                function (AMQPMessage $message) use ($queueConsuming) {
111
+                function(AMQPMessage $message) use ($queueConsuming) {
113 112
                     $this->getExecuteCallbackStrategy($queueConsuming)->consumeCallback($message);
114 113
                 });
115 114
 
@@ -135,12 +134,11 @@  discard block
 block discarded – undo
135 134
         $this->queueConsumings[] = $queueConsuming;
136 135
         if (null === $executeCallbackStrategy) {
137 136
             $executeCallbackStrategy = null === $queueConsuming->batchCount ?
138
-                new SimpleExecuteCallbackStrategy() :
139
-                new BatchExecuteCallbackStrategy($queueConsuming->batchCount);
137
+                new SimpleExecuteCallbackStrategy() : new BatchExecuteCallbackStrategy($queueConsuming->batchCount);
140 138
         }
141 139
 
142 140
         $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
143
-            (function (array $messages) use ($queueConsuming) {
141
+            (function(array $messages) use ($queueConsuming) {
144 142
                 $logAmqpContext = ['queue' => $queueConsuming->queueName];
145 143
                 if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
146 144
                     $logAmqpContext['messages'] = $messages;
@@ -183,11 +181,11 @@  discard block
 block discarded – undo
183 181
         $canPrecessMultiMessages = $executeCallbackStrategy->canPrecessMultiMessages();
184 182
         if ($canPrecessMultiMessages) {
185 183
             if (!$queueConsuming->callback instanceof BatchConsumerInterface) {
186
-                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
184
+                throw new \InvalidArgumentException('TODO '.$queueConsuming->queueName);
187 185
             }
188 186
         } else {
189 187
             if (!$queueConsuming->callback instanceof ConsumerInterface) {
190
-                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
188
+                throw new \InvalidArgumentException('TODO '.$queueConsuming->queueName);
191 189
             }
192 190
         }
193 191
 
@@ -247,7 +245,7 @@  discard block
 block discarded – undo
247 245
                     break;
248 246
                 }
249 247
             } catch (AMQPTimeoutException $e) {
250
-                foreach($this->executeCallbackStrategies as $executeCallbackStrategy) {
248
+                foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
251 249
                     $executeCallbackStrategy->onCatchTimeout($e);
252 250
                 }
253 251
                 $now = new \DateTime();
@@ -313,7 +311,7 @@  discard block
 block discarded – undo
313 311
 
314 312
         if (!$queueConsuming->noAck) {
315 313
             $messages = array_combine(
316
-                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
314
+                array_map(fn($message) => $message->getDeliveryTag(), $messages),
317 315
                 $messages
318 316
             );
319 317
 
@@ -332,9 +330,9 @@  discard block
 block discarded – undo
332 330
     {
333 331
         $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);
334 332
 
335
-        $ack = !array_search(fn ($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
333
+        $ack = !array_search(fn($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
336 334
         if ($this->multiAck && count($messages) > 1 && $ack) {
337
-            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
335
+            $channels = array_map(fn($message) => $message->getChannel(), $messages);
338 336
             if (count($channels) !== array_unique($channels)) { // all messages have same channel
339 337
                 throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
340 338
             }
@@ -344,7 +342,7 @@  discard block
 block discarded – undo
344 342
             $executeCallbackStrategy->onMessageProcessed($message);
345 343
 
346 344
             return array_combine(
347
-                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
345
+                array_map(fn($message) => $message->getDeliveryTag(), $messages),
348 346
                 array_fill(0, count($messages), ConsumerInterface::MSG_ACK)
349 347
             );
350 348
         } else {
@@ -386,7 +384,7 @@  discard block
 block discarded – undo
386 384
                 'content_type' => 'text/plain',
387 385
                 'correlation_id' => $message->get('correlation_id'),
388 386
             ]);
389
-            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
387
+            $message->getChannel()->basic_publish($replayMessage, '', $message->get('reply_to'));
390 388
         } else {
391 389
             $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
392 390
                 'amqp' => ['message' => $message]
Please login to merge, or discard this patch.