Completed
Pull Request — master (#39)
by Aleksandr
06:46
created

OldSoundRabbitMqExtension::loadMultipleConsumers()   F

Complexity

Conditions 16
Paths 4098

Size

Total Lines 81
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 34
CRAP Score 25.4786

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 16
eloc 53
c 2
b 0
f 0
nc 4098
nop 0
dl 0
loc 81
ccs 34
cts 51
cp 0.6667
crap 25.4786
rs 1.4

1 Method

Rating   Name   Duplication   Size   Complexity  
A OldSoundRabbitMqExtension::createChannelReference() 0 3 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\Consumer\ConsumersRegistry;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Consumer\ConsumersRegistry was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
6
use OldSound\RabbitMqBundle\Declarations\DeclarationsRegistry;
7
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
8
use OldSound\RabbitMqBundle\Declarations\BindingDeclaration;
9
use OldSound\RabbitMqBundle\Declarations\ExchangeDeclaration;
10
use OldSound\RabbitMqBundle\Declarations\QueueDeclaration;
11
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\SimpleExecuteCallbackStrategy;
12
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Connection\AbstractConnection;
15
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
16
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
17
use Symfony\Component\DependencyInjection\ContainerInterface;
18
use Symfony\Component\DependencyInjection\Extension\Extension;
19
use Symfony\Component\DependencyInjection\ContainerBuilder;
20
use Symfony\Component\DependencyInjection\Definition;
21
use Symfony\Component\DependencyInjection\Reference;
22
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
23
use Symfony\Component\Config\FileLocator;
24
use Symfony\Component\HttpKernel\Log\Logger;
25
use Symfony\Component\Serializer\SerializerInterface;
26
27
/**+
28
 * OldSoundRabbitMqExtension.
29
 *
30
 * @author Alvaro Videla
31
 * @author Marc Weistroff <[email protected]>
32
 */
33
class OldSoundRabbitMqExtension extends Extension
34
{
35
    /**
36 31
     * @var ContainerBuilder
37
     */
38 31
    private $container;
39
40 31
    /**
41 31
     * @var Boolean Whether the data collector is enabled
42
     */
43 31
    private $collectorEnabled;
44 31
45
    private $channelIds = [];
46 31
    private $groups = [];
0 ignored issues
show
introduced by
The private property $groups is not used, and could be removed.
Loading history...
47
48 31
    private $config = [];
49 31
50 31
    public function load(array $configs, ContainerBuilder $container)
51 31
    {
52 31
        $this->container = $container;
53 31
54 31
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
55 31
        $loader->load('rabbitmq.xml');
56 31
57 31
        $configuration = $this->getConfiguration($configs, $container);
58
        $this->config = $this->processConfiguration($configuration, $configs);
59 31
60 28
        $this->collectorEnabled = $this->config['enable_collector'];
61 28
62 28
        $this->loadConnections();
63
64
        $declarationRegistryDef = new Definition(DeclarationsRegistry::class);
65 28
        $declarationRegistryDef->setPublic(true);
66 28
        $declarationRegistryDef->setAutowired(true);
67
        $this->container->setDefinition('old_sound_rabbit_mq.declaration_registry', $declarationRegistryDef);
68 3
69
        # declarations
70 31
        foreach ($this->loadExchanges($this->config['declarations']['exchanges']) as $exchange) {
71
            $declarationRegistryDef->addMethodCall('addExchange', [$exchange]);
72 31
        };
73
        foreach ($this->loadQueues($this->config['declarations']['queues']) as $queue) {
74 31
            $declarationRegistryDef->addMethodCall('addQueue', [$queue]);
75
        };
76
        foreach ($this->loadBindings($this->config['declarations']['bindings']) as $binding) {
77 31
            $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
78
        };
79 31
80 31
        $this->loadProducers();
81
        $this->loadConsumers();
82 31
83 22
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
84 31
            $channels = [];
85
            foreach (array_unique($this->channelIds) as $id) {
86 31
                $channels[] = new Reference($id);
87 31
            }
88
89 31
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
90
            $definition->replaceArgument(0, $channels);
91
        } else {
92
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
93 31
        }
94 31
    }
95 31
96
    public function getConfiguration(array $config, ContainerBuilder $container)
97 31
    {
98 31
        return new Configuration($this->getAlias());
99
    }
100 31
101
    /**
102
     * @return Definition[]
103
     */
104
    protected function loadExchanges($exchanges): array
105
    {
106 31
        return array_map(function ($exchange) {
107 31
            $exchangeDeclaration = new Definition(ExchangeDeclaration::class);
108
            $exchangeDeclaration->setProperties($exchange);
109 31
110
            foreach($this->loadBindings($exchange['bindings'], $exchange['name'], null) as $binding) {
111 31
                $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
112
            }
113 31
114
            $this->container->setDefinition('old_sound_rabbit_mq.exchange.'.$exchange['name'], $exchangeDeclaration);
115 31
            return $exchangeDeclaration;
116
        }, $exchanges);
117
    }
118 31
119 22
    /**
120 22
     * @return Definition[]
121 22
     */
122 22
    protected function loadQueues($queues): array
123 22
    {
124 22
        return array_map(function ($queue, $key) use ($queues) {
0 ignored issues
show
Unused Code introduced by
The import $queues is not used and could be removed.

This check looks for imports that have been defined, but are not used in the scope.

Loading history...
125 22
            $queue['name'] = $queue['name'] ?? $key;
126 22
            $queueDeclaration = new Definition(QueueDeclaration::class);
127 22
            $queueDeclaration->setProperties($queue);
128 22
129 22
            foreach ($this->loadBindings($queue['bindings'], null, $queue['name'], false) as $binding) {
130 22
                $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
131
            }
132 22
133
            return $queueDeclaration;
134 22
        }, $queues, array_keys($queues));
135
    }
136 31
137
    protected function createBindingDef($binding, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): Definition
138 31
    {
139
        $routingKeys = $binding['routing_keys'] ?? [$binding['routing_key']];
140 31
141 31
        $definition = new Definition(BindingDeclaration::class);
142 27
        $definition->setProperties([
143 27
            'exchange' => $exchange ? $exchange : $binding['exchange'],
144 27
            'destinationIsExchange' => isset($destinationIsExchange) ? $destinationIsExchange : $binding['destination_is_exchange'],
145 27
            'destination' => $destination ? $destination : $binding['destination'],
146
            'routingKeys' => $routingKeys,
147 27
            // TODO 'arguments' => $binding['arguments'],
148 1
            //'nowait' => $binding['nowait'],
149
        ]);
150 27
151
        return $definition;
152 27
    }
153 27
154
    protected function loadBindings($bindings, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): array
155 27
    {
156 27
        $definitions = [];
157 27
        foreach ($bindings as $binding) {
158 25
            $definitions[] = $this->createBindingDef($binding, $exchange, $destination, $destinationIsExchange);
159
        }
160 27
161
        return $definitions;
162
    }
163
164 27
    protected function loadConnections()
165
    {
166
        $connFactoryDer = new Definition('%old_sound_rabbit_mq.connection_factory.class%');
167
168 27
        foreach ($this->config['connections'] as $key => $connection) {
169
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
170 27
            $classParam =
171 27
                $connection['lazy']
172 22
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
173
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
174
175
            $definition = new Definition($classParam);
176
            $definition->setPublic(false);
177
178
            $definition->setFactory([$connFactoryDer, 'createConnection']);
179
            $definition->setArguments([$classParam, $connection]);
180
181 31
            $definition->addTag('old_sound_rabbit_mq.connection');
182
            $definition->setPublic(true);
183 31
184
            $connectionAliase = sprintf('old_sound_rabbit_mq.connection.%s', $key);
185 31
            $this->container->setDefinition($connectionAliase, $definition);
186 26
187 26
            $channelDef = new Definition(AMQPChannel::class, [
188 26
                new Reference($connectionAliase)
189 26
            ]);
190
            $channelDef->setFactory([self::class, 'getChannelFromConnection']);
191 26
            $channelDef->setPublic(true);
192
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.channel.%s', $key), $channelDef);
193
        }
194 26
    }
195
196 26
    public static function getChannelFromConnection(AbstractConnection $connection)
197
    {
198
        return $connection->channel();
199 26
    }
200 26
201
    protected function loadProducers()
202 26
    {
203 22
        if ($this->config['sandbox']) {
204 22
            foreach ($this->config['producers'] as $key => $producer) {
205 22
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
206 22
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.producer.%s', $key), $definition);
207
            }
208
            return;
209
        }
210 26
211
        $defaultAutoDeclare = $this->container->getParameter('kernel.environment') !== 'prod';
212
        foreach ($this->config['producers'] as $producerName => $producer) {
213 26
            $alias = sprintf('old_sound_rabbit_mq.producer.%s', $producerName);
214
215
            $definition = new Definition($producer['class']);
216 26
            $definition->setPublic(true);
217
            $definition->addTag('old_sound_rabbit_mq.producer', ['name' => $producerName]);
218
            //this producer doesn't define an exchange -> using AMQP Default
219
            if (!isset($producer['exchange_options'])) {
220
                $producer['exchange_options'] = $this->getDefaultExchangeOptions();
221
            }
222
            //$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
223
            //this producer doesn't define a queue -> using AMQP Default
224
            if (!isset($producer['queue_options'])) {
225
                $producer['queue_options'] = $this->getDefaultQueueOptions();
226 26
            }
227
            //$definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
228
229
            $definition->addArgument($this->createChannelReference($producer['connection']));
230 26
            $definition->addArgument($producer['exchange']);
231 26
            //$this->injectConnection($definition, $producer['connection']);
232 25
            //if ($this->collectorEnabled) {
233
            //    $this->injectTraceableChannel($definition, $key, $producer['connection']);
234
            //}
235 26
236 1
            //if (!$producer['auto_setup_fabric']) {
237
            //    $definition->addMethodCall('disableAutoSetupFabric');
238
            //}
239 26
240 26
            if (isset($producer['auto_declare'])) {
241 26
                $definition->setProperty('autoDeclare', $producer['auto_declare'] ?? $defaultAutoDeclare);
242
            }
243 31
244
            $this->container->setDefinition($alias, $definition);
245 31
            if ($producer['logging']) {
246
                $this->injectLogger($alias);
247 31
            }
248 22
        }
249 22
    }
250
251 22
    private function createChannelReference($connectionName): Reference
252
    {
253
        return new Reference(sprintf('old_sound_rabbit_mq.channel.%s', $connectionName));
254
    }
255
256
    protected function loadConsumers()
257
    {
258 22
        $simpleExecuteCallbackStrategyAlias = 'old_sound_rabbit_mq.execute_callback_strategy.simple';
259 22
        $this->container->setDefinition($simpleExecuteCallbackStrategyAlias, new Definition(SimpleExecuteCallbackStrategy::class));
260 22
261 22
        foreach ($this->config['consumers'] as $consumerName => $consumer) {
262
            $alias = sprintf('old_sound_rabbit_mq.consumer.%s', $consumerName);
263
            $serializerAlias = sprintf('old_sound_rabbit_mq.consumer.%s.serializer', $consumerName);// TODO
0 ignored issues
show
Unused Code introduced by
The assignment to $serializerAlias is dead and can be removed.
Loading history...
264 22
265
            $connectionName = isset($consumer['connection']) ? $consumer['connection'] : 'default';
266 22
267 22
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%', [
268 22
                $this->createChannelReference($connectionName)
269 22
            ]);
270 22
            $definition->setPublic(true);
271
            $definition->addTag('old_sound_rabbit_mq.consumer', ['name' => $consumerName]);
272 22
            // TODO $this->container->setAlias($serializerAlias, SerializerInterface::class);
273 22
            // $definition->addMethodCall('setSerializer', [new Reference($serializerAlias)]);}
274 22
            foreach($consumer['consumeQueues'] as $index => $consumeQueue) {
275 22
                $queueConsumingDef = new Definition(QueueConsuming::class);
276
                $queueConsumingDef->setProperties([
277
                    'queueName' => $consumeQueue['queue'],
278
                    'callback' => new Reference($consumeQueue['callback']),
279 22
                    //'qosPrefetchSize' => $consumeQueue['qos_prefetch_size'],
280
                    'qosPrefetchCount' => $consumeQueue['qos_prefetch_count'],
281
                    //'consumerTag' => $consumeQueue['consumer_tag'],
282
                    //'noLocal' => $consumeQueue['no_local'],
283
                ]);
284
285
                $executeCallbackStrategyRef = isset($consumeQueue['batch_count']) ?
286
                    new Definition(BatchExecuteCallbackStrategy::class, [$consumeQueue['batch_count']]) :
287 22
                    new Reference($simpleExecuteCallbackStrategyAlias);
288
289
                $definition->addMethodCall('consumeQueue', [
290 22
                    $queueConsumingDef,
291
                    $executeCallbackStrategyRef
292
                ]);
293 22
            }
294
295
            if ($this->container->has('event_dispatcher')) {
296
                $definition->addMethodCall('setEventDispatcher', [
297
                    new Reference(
298
                        'event_dispatcher',
299
                        ContainerInterface::IGNORE_ON_INVALID_REFERENCE
300
                    )
301
                ]);
302
            }
303 22
304
            /* TODO if (array_key_exists('qos_options', $consumer)) {
305
                $definition->addMethodCall('setQosOptions', array(
306
                    $consumer['qos_options']['prefetch_size'],
307 22
                    $consumer['qos_options']['prefetch_count'],
308 22
                    $consumer['qos_options']['global']
309 22
                ));
310
            }*/
311
312 22
            if (isset($consumer['idle_timeout'])) {
313
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
314
            }
315
            if (isset($consumer['idle_timeout_exit_code'])) {
316 22
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
317 22
            }
318 22
            if (isset($consumer['timeout_wait'])) {
319 22
                $definition->setProperty('timeoutWait', [$consumer['timeout_wait']]);
320
            }
321 22
            if (isset($consumer['graceful_max_execution'])) {
322 22
                $definition->addMethodCall(
323
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
324
                    array($consumer['graceful_max_execution']['timeout'])
325 31
                );
326
                $definition->addMethodCall(
327 31
                    'setGracefulMaxExecutionTimeoutExitCode',
328
                    array($consumer['graceful_max_execution']['exit_code'])
329 31
                );
330
            }
331 22
332
            $this->injectConnection($definition, $consumer['connection']);
333
            if ($this->collectorEnabled) {
334
                $this->injectTraceableChannel($definition, $consumerName, $consumer['connection']);
335
            }
336
337
            $this->container->setDefinition($alias, $definition);
338 22
339
            if ($consumer['logging']) {
340 22
                $this->injectLogger($alias);
341 22
            }
342 22
        }
343 22
    }
344 22
345 22
    /**
346
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
347 22
     * parameter. So we revert the change for right configurations.
348
     *
349
     * @param array $config
350
     *
351
     * @return array
352
     */
353
    private function normalizeArgumentKeys(array $config)
0 ignored issues
show
Unused Code introduced by
The method normalizeArgumentKeys() is not used, and could be removed.

This check looks for private methods that have been defined, but are not used inside the class.

Loading history...
354
    {
355 22
        if (isset($config['arguments'])) {
356 22
            $arguments = $config['arguments'];
357 22
            // support for old configuration
358
            if (is_string($arguments)) {
359
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

359
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
360 22
            }
361
362
            $newArguments = [];
363 22
            foreach ($arguments as $key => $value) {
364
                if (strstr($key, '_')) {
365
                    $key = str_replace('_', '-', $key);
366 22
                }
367
                $newArguments[$key] = $value;
368
            }
369
            $config['arguments'] = $newArguments;
370
        }
371
        return $config;
372
    }
373
374
    /**
375
     * Support for arguments provided as string. Support for old configuration files.
376 22
     *
377
     * @deprecated
378
     * @param string $arguments
379
     * @return array
380 22
     */
381 22
    private function argumentsStringAsArray($arguments)
382 22
    {
383
        $argumentsArray = [];
384
385 22
        $argumentPairs = explode(',', $arguments);
386
        foreach ($argumentPairs as $argument) {
387
            $argumentPair = explode(':', $argument);
388
            $type = 'S';
389 22
            if (isset($argumentPair[2])) {
390 22
                $type = $argumentPair[2];
391 22
            }
392 22
            $argumentsArray[$argumentPair[0]] = [$type, $argumentPair[1]];
393
        }
394 31
395
        return $argumentsArray;
396 31
    }
397
398 31
    protected function injectTraceableChannel(Definition $definition, $name, $connectionName)
399
    {
400
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
401
        $traceableChannel = new Definition('%old_sound_rabbit_mq.traceable.channel.class%');
402
        $traceableChannel
403
            ->setPublic(false)
404
            ->addTag('old_sound_rabbit_mq.traceable_channel');
405
        $this->injectConnection($traceableChannel, $connectionName);
406
407
        $this->container->setDefinition($id, $traceableChannel);
408
409
        $this->channelIds[] = $id;
410
        $definition->addArgument(new Reference($id));
411
    }
412
413
    protected function injectConnection(Definition $definition, $connectionName)
414
    {
415
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
416
    }
417
418
    public function getAlias()
419
    {
420
        return 'old_sound_rabbit_mq';
421
    }
422
423
    /**
424
     * Add proper dequeuer aware call
425
     *
426
     * @param string $callback
427
     * @param string $name
428
     */
429
    protected function addDequeuerAwareCall($callback, $name)
430
    {
431
        if (!$this->container->has($callback)) {
432
            return;
433
        }
434
435
        $callbackDefinition = $this->container->findDefinition($callback);
436
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
437
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
438
            $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]);
439
        }
440
    }
441
442
    private function injectLogger(string $definitionAlias)
443
    {
444
        $definition = $this->container->getDefinition($definitionAlias);
445
        $definition->addTag('monolog.logger', [
446
            'channel' => 'phpamqplib'
447
        ]);
448
        $loggerAlias = $definitionAlias . '.loggeer';
449
        $this->container->setAlias($loggerAlias, 'logger');
450
        $definition->addMethodCall('setLogger', [new Reference($loggerAlias, ContainerInterface::IGNORE_ON_INVALID_REFERENCE)]);
451
    }
452
453
    /**
454
     * Get default AMQP exchange options
455 31
     * TODO use
456
     * @return array
457 31
     */
458
    protected function getDefaultExchangeOptions()
459 31
    {
460 22
        return array(
461
            'name' => '',
462 22
            'type' => 'direct',
463 22
            'passive' => true,
464 22
            'declare' => false
465 22
        );
466 22
    }
467 22
468 22
    /**
469 22
     * Get default AMQP queue options
470
     * TODO use
471
     * @return array
472 22
     */
473 22
    protected function getDefaultQueueOptions()
474 22
    {
475
        return array(
476 31
            'name' => '',
477
            'declare' => false
478
        );
479
    }
480
}
481