Completed
Pull Request — master (#39)
by Aleksandr
06:46
created
EventListener/PcntlSignalDispatchSubscriber.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -21,7 +21,7 @@
 block discarded – undo
21 21
 
22 22
     public function __construct(Consumer $consumer)
23 23
     {
24
-        $stopConsumer = function () use ($consumer) {
24
+        $stopConsumer = function() use ($consumer) {
25 25
             // Process current message, then halt consumer
26 26
             $consumer->forceStopConsumer();
27 27
             // Halt consumer if waiting for a new message from the queue
Please login to merge, or discard this patch.
RabbitMq/AMQPConnectionFactory.php 1 patch
Spacing   +5 added lines, -5 removed lines patch added patch discarded remove patch
@@ -31,7 +31,7 @@  discard block
 block discarded – undo
31 31
     {
32 32
         $parameters = $this->parseUrl(array_merge($this->defaultParameters, $parameters));
33 33
         if (is_array($parameters['ssl_context'])) {
34
-            $parameters['ssl_context'] = ! empty($parameters['ssl_context'])
34
+            $parameters['ssl_context'] = !empty($parameters['ssl_context'])
35 35
                 ? stream_context_create(array('ssl' => $parameters['ssl_context']))
36 36
                 : null;
37 37
         }
@@ -46,10 +46,10 @@  discard block
 block discarded – undo
46 46
             $parameters['user'],
47 47
             $parameters['password'],
48 48
             $parameters['vhost'],
49
-            false,      // insist
49
+            false, // insist
50 50
             'AMQPLAIN', // login_method
51
-            null,       // login_response
52
-            'en_US',    // locale
51
+            null, // login_response
52
+            'en_US', // locale
53 53
         ];
54 54
 
55 55
         $isSocketConnection = $class == \PhpAmqpLib\Connection\AMQPSocketConnection::class || is_subclass_of($class, \PhpAmqpLib\Connection\AMQPSocketConnection::class);
@@ -93,7 +93,7 @@  discard block
 block discarded – undo
93 93
             $parameters['host'] = urldecode($url['host']);
94 94
         }
95 95
         if (isset($url['port'])) {
96
-            $parameters['port'] = (int)$url['port'];
96
+            $parameters['port'] = (int) $url['port'];
97 97
         }
98 98
         if (isset($url['user'])) {
99 99
             $parameters['user'] = urldecode($url['user']);
Please login to merge, or discard this patch.
RabbitMq/RpcClient.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -124,7 +124,7 @@
 block discarded – undo
124 124
         }
125 125
 
126 126
         $replices = [];
127
-        foreach($this->messages as $message) {
127
+        foreach ($this->messages as $message) {
128 128
             /** @var AMQPMessage $message */
129 129
             if (!$message->has('correlation_id')) {
130 130
                 $this->logger->error('unexpected message. rpc replies have no correlation_id ');
Please login to merge, or discard this patch.
RabbitMq/Consumer.php 1 patch
Spacing   +12 added lines, -13 removed lines patch added patch discarded remove patch
@@ -95,19 +95,18 @@  discard block
 block discarded – undo
95 95
 
96 96
     protected function setup(): Consumer
97 97
     {
98
-        foreach($this->queueConsumings as $index => $queueConsuming) {
98
+        foreach ($this->queueConsumings as $index => $queueConsuming) {
99 99
             $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
100 100
 
101 101
             $consumerTag = $this->channel->basic_consume(
102 102
                 $queueConsuming->queueName,
103 103
                 $queueConsuming->consumerTag ?
104
-                    $queueConsuming->consumerTag :
105
-                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
104
+                    $queueConsuming->consumerTag : sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
106 105
                 $queueConsuming->noLocal,
107 106
                 $queueConsuming->noAck,
108 107
                 $queueConsuming->exclusive,
109 108
                 $queueConsuming->nowait,
110
-                function (AMQPMessage $message) use ($queueConsuming) {
109
+                function(AMQPMessage $message) use ($queueConsuming) {
111 110
                     $this->getExecuteCallbackStrategy($queueConsuming)->consumeCallback($message);
112 111
                 });
113 112
 
@@ -122,7 +121,7 @@  discard block
 block discarded – undo
122 121
     {
123 122
         $this->queueConsumings[] = $queueConsuming;
124 123
         $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
125
-            (function (array $messages) use ($queueConsuming) {
124
+            (function(array $messages) use ($queueConsuming) {
126 125
                 $logAmqpContext = ['queue' => $queueConsuming->queueName];
127 126
                 if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
128 127
                     $logAmqpContext['messages'] = $messages;
@@ -165,11 +164,11 @@  discard block
 block discarded – undo
165 164
         $canPrecessMultiMessages = $executeCallbackStrategy->canPrecessMultiMessages();
166 165
         if ($canPrecessMultiMessages) {
167 166
             if (!$queueConsuming->callback instanceof BatchConsumerInterface) {
168
-                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
167
+                throw new \InvalidArgumentException('TODO '.$queueConsuming->queueName);
169 168
             }
170 169
         } else {
171 170
             if (!$queueConsuming->callback instanceof ConsumerInterface) {
172
-                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
171
+                throw new \InvalidArgumentException('TODO '.$queueConsuming->queueName);
173 172
             }
174 173
         }
175 174
 
@@ -229,7 +228,7 @@  discard block
 block discarded – undo
229 228
                     break;
230 229
                 }
231 230
             } catch (AMQPTimeoutException $e) {
232
-                foreach($this->executeCallbackStrategies as $executeCallbackStrategy) {
231
+                foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
233 232
                     $executeCallbackStrategy->onCatchTimeout($e);
234 233
                 }
235 234
                 $now = new \DateTime();
@@ -295,7 +294,7 @@  discard block
 block discarded – undo
295 294
 
296 295
         if (!$queueConsuming->noAck) {
297 296
             $messages = array_combine(
298
-                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
297
+                array_map(fn($message) => $message->getDeliveryTag(), $messages),
299 298
                 $messages
300 299
             );
301 300
 
@@ -314,9 +313,9 @@  discard block
 block discarded – undo
314 313
     {
315 314
         $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);
316 315
 
317
-        $ack = !array_search(fn ($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
316
+        $ack = !array_search(fn($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
318 317
         if ($this->multiAck && count($messages) > 1 && $ack) {
319
-            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
318
+            $channels = array_map(fn($message) => $message->getChannel(), $messages);
320 319
             if (count($channels) !== array_unique($channels)) { // all messages have same channel
321 320
                 throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
322 321
             }
@@ -326,7 +325,7 @@  discard block
 block discarded – undo
326 325
             $executeCallbackStrategy->onMessageProcessed($message);
327 326
 
328 327
             return array_combine(
329
-                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
328
+                array_map(fn($message) => $message->getDeliveryTag(), $messages),
330 329
                 array_fill(0, count($messages), ConsumerInterface::MSG_ACK)
331 330
             );
332 331
         } else {
@@ -368,7 +367,7 @@  discard block
 block discarded – undo
368 367
                 'content_type' => 'text/plain',
369 368
                 'correlation_id' => $message->get('correlation_id'),
370 369
             ]);
371
-            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
370
+            $message->getChannel()->basic_publish($replayMessage, '', $message->get('reply_to'));
372 371
         } else {
373 372
             $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
374 373
                 'amqp' => ['message' => $message]
Please login to merge, or discard this patch.
Serializer/JsonMessageBodySerializer.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -33,7 +33,7 @@
 block discarded – undo
33 33
                 'message' => $body->getMessage(),
34 34
             ]);
35 35
         }
36
-        return json_encode($body);// $this->serializer->serialize($body, 'json');
36
+        return json_encode($body); // $this->serializer->serialize($body, 'json');
37 37
     }
38 38
 
39 39
     public function deserialize(string $body)
Please login to merge, or discard this patch.
Command/ConsumerCommand.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -113,7 +113,7 @@
 block discarded – undo
113 113
             new ConsoleLogger($output)
114 114
         );
115 115
         $declarationRegistry = $this->container->get('old_sound_rabbit_mq.declaration_registry');
116
-        foreach($consumer->getQueueConsumings() as $queueConsuming) {
116
+        foreach ($consumer->getQueueConsumings() as $queueConsuming) {
117 117
             $declarator->declareForQueueDeclaration($queueConsuming->queueName, $declarationRegistry);
118 118
         }
119 119
     }
Please login to merge, or discard this patch.
Command/DeclareCommand.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -42,7 +42,7 @@
 block discarded – undo
42 42
 
43 43
         $connection = $input->getArgument('connection');
44 44
         $channelAlias = sprintf('old_sound_rabbit_mq.channel.%s', $connection);
45
-        if(!$this->container->has($channelAlias)) {
45
+        if (!$this->container->has($channelAlias)) {
46 46
             throw new InvalidOptionException('Connection is not exist');
47 47
         };
48 48
 
Please login to merge, or discard this patch.
Declarations/DeclarationsRegistry.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -35,7 +35,7 @@
 block discarded – undo
35 35
      */
36 36
     public function getBindingsByExchange(ExchangeDeclaration $exchange): array
37 37
     {
38
-        return array_filter($this->bindings, function ($binding) use ($exchange) {
38
+        return array_filter($this->bindings, function($binding) use ($exchange) {
39 39
             return $binding->exchange === $exchange->name || ($binding->destinationIsExchange && $binding->destination === $exchange->name);
40 40
         });
41 41
     }
Please login to merge, or discard this patch.
Declarations/QueueDeclaration.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -38,7 +38,7 @@
 block discarded – undo
38 38
      */
39 39
     public function declure() {
40 40
         foreach ($this->queues as $name => $options) {
41
-            list($queueName, ,) = $this->getChannel()->queue_declare($name, $options['passive'],
41
+            list($queueName,,) = $this->getChannel()->queue_declare($name, $options['passive'],
42 42
                 $options['durable'], $options['exclusive'],
43 43
                 $options['auto_delete'], $options['nowait'],
44 44
                 $options['arguments'], $options['ticket']);
Please login to merge, or discard this patch.