Completed
Pull Request — master (#39)
by Aleksandr
06:46
created
DependencyInjection/Compiler/ConsumersListCompilerPass.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -13,7 +13,7 @@
 block discarded – undo
13 13
     public function process(ContainerBuilder $container)
14 14
     {
15 15
         $taggedConsumers = $container->findTaggedServiceIds('old_sound_rabbit_mq.consumer');
16
-        $consumerNames = array_map(fn ($tags) => $tags[0]['name'], $taggedConsumers);
16
+        $consumerNames = array_map(fn($tags) => $tags[0]['name'], $taggedConsumers);
17 17
         $container->setParameter('old_sound_rabbit_mq.allowed_consumer_names', $consumerNames);
18 18
     }
19 19
 }
20 20
\ No newline at end of file
Please login to merge, or discard this patch.
DependencyInjection/Compiler/ExtraContextLoggerCompilerPass.php 1 patch
Spacing   +2 added lines, -2 removed lines patch added patch discarded remove patch
@@ -22,7 +22,7 @@  discard block
 block discarded – undo
22 22
 
23 23
         foreach ($taggedProducers as $id => $tags) {
24 24
             $producerName = $tags[0]['name'];
25
-            $originLogger = $id . '.logger';
25
+            $originLogger = $id.'.logger';
26 26
 
27 27
             if ($container->hasDefinition($originLogger)) {
28 28
                 $consumerDef = $container->getDefinition($id);
@@ -34,7 +34,7 @@  discard block
 block discarded – undo
34 34
 
35 35
         foreach ($taggedConsumers as $id => $tags) {
36 36
             $consumerName = $tags[0]['name'];
37
-            $originLogger = $id . '.logger';
37
+            $originLogger = $id.'.logger';
38 38
 
39 39
             if ($container->hasDefinition($originLogger)) {
40 40
                 $consumerDef = $container->getDefinition($id);
Please login to merge, or discard this patch.
DependencyInjection/OldSoundRabbitMqExtension.php 1 patch
Spacing   +8 added lines, -9 removed lines patch added patch discarded remove patch
@@ -51,7 +51,7 @@  discard block
 block discarded – undo
51 51
     {
52 52
         $this->container = $container;
53 53
 
54
-        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
54
+        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__.'/../Resources/config')));
55 55
         $loader->load('rabbitmq.xml');
56 56
 
57 57
         $configuration = $this->getConfiguration($configs, $container);
@@ -103,11 +103,11 @@  discard block
 block discarded – undo
103 103
      */
104 104
     protected function loadExchanges($exchanges): array
105 105
     {
106
-        return array_map(function ($exchange) {
106
+        return array_map(function($exchange) {
107 107
             $exchangeDeclaration = new Definition(ExchangeDeclaration::class);
108 108
             $exchangeDeclaration->setProperties($exchange);
109 109
 
110
-            foreach($this->loadBindings($exchange['bindings'], $exchange['name'], null) as $binding) {
110
+            foreach ($this->loadBindings($exchange['bindings'], $exchange['name'], null) as $binding) {
111 111
                 $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
112 112
             }
113 113
 
@@ -121,7 +121,7 @@  discard block
 block discarded – undo
121 121
      */
122 122
     protected function loadQueues($queues): array
123 123
     {
124
-        return array_map(function ($queue, $key) use ($queues) {
124
+        return array_map(function($queue, $key) use ($queues) {
125 125
             $queue['name'] = $queue['name'] ?? $key;
126 126
             $queueDeclaration = new Definition(QueueDeclaration::class);
127 127
             $queueDeclaration->setProperties($queue);
@@ -260,7 +260,7 @@  discard block
 block discarded – undo
260 260
 
261 261
         foreach ($this->config['consumers'] as $consumerName => $consumer) {
262 262
             $alias = sprintf('old_sound_rabbit_mq.consumer.%s', $consumerName);
263
-            $serializerAlias = sprintf('old_sound_rabbit_mq.consumer.%s.serializer', $consumerName);// TODO
263
+            $serializerAlias = sprintf('old_sound_rabbit_mq.consumer.%s.serializer', $consumerName); // TODO
264 264
 
265 265
             $connectionName = isset($consumer['connection']) ? $consumer['connection'] : 'default';
266 266
 
@@ -271,7 +271,7 @@  discard block
 block discarded – undo
271 271
             $definition->addTag('old_sound_rabbit_mq.consumer', ['name' => $consumerName]);
272 272
             // TODO $this->container->setAlias($serializerAlias, SerializerInterface::class);
273 273
             // $definition->addMethodCall('setSerializer', [new Reference($serializerAlias)]);}
274
-            foreach($consumer['consumeQueues'] as $index => $consumeQueue) {
274
+            foreach ($consumer['consumeQueues'] as $index => $consumeQueue) {
275 275
                 $queueConsumingDef = new Definition(QueueConsuming::class);
276 276
                 $queueConsumingDef->setProperties([
277 277
                     'queueName' => $consumeQueue['queue'],
@@ -283,8 +283,7 @@  discard block
 block discarded – undo
283 283
                 ]);
284 284
 
285 285
                 $executeCallbackStrategyRef = isset($consumeQueue['batch_count']) ?
286
-                    new Definition(BatchExecuteCallbackStrategy::class, [$consumeQueue['batch_count']]) :
287
-                    new Reference($simpleExecuteCallbackStrategyAlias);
286
+                    new Definition(BatchExecuteCallbackStrategy::class, [$consumeQueue['batch_count']]) : new Reference($simpleExecuteCallbackStrategyAlias);
288 287
 
289 288
                 $definition->addMethodCall('consumeQueue', [
290 289
                     $queueConsumingDef,
@@ -445,7 +444,7 @@  discard block
 block discarded – undo
445 444
         $definition->addTag('monolog.logger', [
446 445
             'channel' => 'phpamqplib'
447 446
         ]);
448
-        $loggerAlias = $definitionAlias . '.loggeer';
447
+        $loggerAlias = $definitionAlias.'.loggeer';
449 448
         $this->container->setAlias($loggerAlias, 'logger');
450 449
         $definition->addMethodCall('setLogger', [new Reference($loggerAlias, ContainerInterface::IGNORE_ON_INVALID_REFERENCE)]);
451 450
     }
Please login to merge, or discard this patch.
EventListener/PcntlSignalDispatchSubscriber.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -21,7 +21,7 @@
 block discarded – undo
21 21
 
22 22
     public function __construct(Consumer $consumer)
23 23
     {
24
-        $stopConsumer = function () use ($consumer) {
24
+        $stopConsumer = function() use ($consumer) {
25 25
             // Process current message, then halt consumer
26 26
             $consumer->forceStopConsumer();
27 27
             // Halt consumer if waiting for a new message from the queue
Please login to merge, or discard this patch.
RabbitMq/AMQPConnectionFactory.php 1 patch
Spacing   +5 added lines, -5 removed lines patch added patch discarded remove patch
@@ -31,7 +31,7 @@  discard block
 block discarded – undo
31 31
     {
32 32
         $parameters = $this->parseUrl(array_merge($this->defaultParameters, $parameters));
33 33
         if (is_array($parameters['ssl_context'])) {
34
-            $parameters['ssl_context'] = ! empty($parameters['ssl_context'])
34
+            $parameters['ssl_context'] = !empty($parameters['ssl_context'])
35 35
                 ? stream_context_create(array('ssl' => $parameters['ssl_context']))
36 36
                 : null;
37 37
         }
@@ -46,10 +46,10 @@  discard block
 block discarded – undo
46 46
             $parameters['user'],
47 47
             $parameters['password'],
48 48
             $parameters['vhost'],
49
-            false,      // insist
49
+            false, // insist
50 50
             'AMQPLAIN', // login_method
51
-            null,       // login_response
52
-            'en_US',    // locale
51
+            null, // login_response
52
+            'en_US', // locale
53 53
         ];
54 54
 
55 55
         $isSocketConnection = $class == \PhpAmqpLib\Connection\AMQPSocketConnection::class || is_subclass_of($class, \PhpAmqpLib\Connection\AMQPSocketConnection::class);
@@ -93,7 +93,7 @@  discard block
 block discarded – undo
93 93
             $parameters['host'] = urldecode($url['host']);
94 94
         }
95 95
         if (isset($url['port'])) {
96
-            $parameters['port'] = (int)$url['port'];
96
+            $parameters['port'] = (int) $url['port'];
97 97
         }
98 98
         if (isset($url['user'])) {
99 99
             $parameters['user'] = urldecode($url['user']);
Please login to merge, or discard this patch.
RabbitMq/RpcClient.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -124,7 +124,7 @@
 block discarded – undo
124 124
         }
125 125
 
126 126
         $replices = [];
127
-        foreach($this->messages as $message) {
127
+        foreach ($this->messages as $message) {
128 128
             /** @var AMQPMessage $message */
129 129
             if (!$message->has('correlation_id')) {
130 130
                 $this->logger->error('unexpected message. rpc replies have no correlation_id ');
Please login to merge, or discard this patch.
RabbitMq/Consumer.php 1 patch
Spacing   +12 added lines, -13 removed lines patch added patch discarded remove patch
@@ -95,19 +95,18 @@  discard block
 block discarded – undo
95 95
 
96 96
     protected function setup(): Consumer
97 97
     {
98
-        foreach($this->queueConsumings as $index => $queueConsuming) {
98
+        foreach ($this->queueConsumings as $index => $queueConsuming) {
99 99
             $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
100 100
 
101 101
             $consumerTag = $this->channel->basic_consume(
102 102
                 $queueConsuming->queueName,
103 103
                 $queueConsuming->consumerTag ?
104
-                    $queueConsuming->consumerTag :
105
-                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
104
+                    $queueConsuming->consumerTag : sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
106 105
                 $queueConsuming->noLocal,
107 106
                 $queueConsuming->noAck,
108 107
                 $queueConsuming->exclusive,
109 108
                 $queueConsuming->nowait,
110
-                function (AMQPMessage $message) use ($queueConsuming) {
109
+                function(AMQPMessage $message) use ($queueConsuming) {
111 110
                     $this->getExecuteCallbackStrategy($queueConsuming)->consumeCallback($message);
112 111
                 });
113 112
 
@@ -122,7 +121,7 @@  discard block
 block discarded – undo
122 121
     {
123 122
         $this->queueConsumings[] = $queueConsuming;
124 123
         $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
125
-            (function (array $messages) use ($queueConsuming) {
124
+            (function(array $messages) use ($queueConsuming) {
126 125
                 $logAmqpContext = ['queue' => $queueConsuming->queueName];
127 126
                 if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
128 127
                     $logAmqpContext['messages'] = $messages;
@@ -165,11 +164,11 @@  discard block
 block discarded – undo
165 164
         $canPrecessMultiMessages = $executeCallbackStrategy->canPrecessMultiMessages();
166 165
         if ($canPrecessMultiMessages) {
167 166
             if (!$queueConsuming->callback instanceof BatchConsumerInterface) {
168
-                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
167
+                throw new \InvalidArgumentException('TODO '.$queueConsuming->queueName);
169 168
             }
170 169
         } else {
171 170
             if (!$queueConsuming->callback instanceof ConsumerInterface) {
172
-                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
171
+                throw new \InvalidArgumentException('TODO '.$queueConsuming->queueName);
173 172
             }
174 173
         }
175 174
 
@@ -229,7 +228,7 @@  discard block
 block discarded – undo
229 228
                     break;
230 229
                 }
231 230
             } catch (AMQPTimeoutException $e) {
232
-                foreach($this->executeCallbackStrategies as $executeCallbackStrategy) {
231
+                foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
233 232
                     $executeCallbackStrategy->onCatchTimeout($e);
234 233
                 }
235 234
                 $now = new \DateTime();
@@ -295,7 +294,7 @@  discard block
 block discarded – undo
295 294
 
296 295
         if (!$queueConsuming->noAck) {
297 296
             $messages = array_combine(
298
-                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
297
+                array_map(fn($message) => $message->getDeliveryTag(), $messages),
299 298
                 $messages
300 299
             );
301 300
 
@@ -314,9 +313,9 @@  discard block
 block discarded – undo
314 313
     {
315 314
         $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);
316 315
 
317
-        $ack = !array_search(fn ($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
316
+        $ack = !array_search(fn($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
318 317
         if ($this->multiAck && count($messages) > 1 && $ack) {
319
-            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
318
+            $channels = array_map(fn($message) => $message->getChannel(), $messages);
320 319
             if (count($channels) !== array_unique($channels)) { // all messages have same channel
321 320
                 throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
322 321
             }
@@ -326,7 +325,7 @@  discard block
 block discarded – undo
326 325
             $executeCallbackStrategy->onMessageProcessed($message);
327 326
 
328 327
             return array_combine(
329
-                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
328
+                array_map(fn($message) => $message->getDeliveryTag(), $messages),
330 329
                 array_fill(0, count($messages), ConsumerInterface::MSG_ACK)
331 330
             );
332 331
         } else {
@@ -368,7 +367,7 @@  discard block
 block discarded – undo
368 367
                 'content_type' => 'text/plain',
369 368
                 'correlation_id' => $message->get('correlation_id'),
370 369
             ]);
371
-            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
370
+            $message->getChannel()->basic_publish($replayMessage, '', $message->get('reply_to'));
372 371
         } else {
373 372
             $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
374 373
                 'amqp' => ['message' => $message]
Please login to merge, or discard this patch.
Serializer/JsonMessageBodySerializer.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -33,7 +33,7 @@
 block discarded – undo
33 33
                 'message' => $body->getMessage(),
34 34
             ]);
35 35
         }
36
-        return json_encode($body);// $this->serializer->serialize($body, 'json');
36
+        return json_encode($body); // $this->serializer->serialize($body, 'json');
37 37
     }
38 38
 
39 39
     public function deserialize(string $body)
Please login to merge, or discard this patch.
Command/ConsumerCommand.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -113,7 +113,7 @@
 block discarded – undo
113 113
             new ConsoleLogger($output)
114 114
         );
115 115
         $declarationRegistry = $this->container->get('old_sound_rabbit_mq.declaration_registry');
116
-        foreach($consumer->getQueueConsumings() as $queueConsuming) {
116
+        foreach ($consumer->getQueueConsumings() as $queueConsuming) {
117 117
             $declarator->declareForQueueDeclaration($queueConsuming->queueName, $declarationRegistry);
118 118
         }
119 119
     }
Please login to merge, or discard this patch.