Test Failed
Pull Request — master (#39)
by Aleksandr
09:31
created

OldSoundRabbitMqExtension   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 436
Duplicated Lines 0 %

Test Coverage

Coverage 68.46%

Importance

Changes 24
Bugs 4 Features 2
Metric Value
eloc 200
c 24
b 4
f 2
dl 0
loc 436
rs 3.6
ccs 191
cts 279
cp 0.6846
wmc 60

20 Methods

Rating   Name   Duplication   Size   Complexity  
A getChannelFromConnection() 0 3 1
A createBindingDef() 0 15 4
B load() 0 43 7
A loadExchanges() 0 13 2
A loadConnections() 0 29 4
A getConfiguration() 0 3 1
A loadQueues() 0 13 2
A loadBindings() 0 8 2
A injectLogger() 0 9 1
A injectTraceableChannel() 0 13 1
A addDequeuerAwareCall() 0 10 3
B loadProducers() 0 42 8
A normalizeArgumentKeys() 0 19 5
A createChannelReference() 0 3 1
A getDefaultQueueOptions() 0 5 1
A injectConnection() 0 3 1
D loadConsumers() 0 80 11
A getDefaultExchangeOptions() 0 7 1
A argumentsStringAsArray() 0 15 3
A getAlias() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like OldSoundRabbitMqExtension often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use OldSoundRabbitMqExtension, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\Consumer\ConsumersRegistry;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Consumer\ConsumersRegistry was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
6
use OldSound\RabbitMqBundle\Declarations\DeclarationsRegistry;
7
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
8
use OldSound\RabbitMqBundle\Declarations\BindingDeclaration;
9
use OldSound\RabbitMqBundle\Declarations\ExchangeDeclaration;
10
use OldSound\RabbitMqBundle\Declarations\QueueDeclaration;
11
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\SimpleExecuteCallbackStrategy;
12
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Connection\AbstractConnection;
15
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
16
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
17
use Symfony\Component\DependencyInjection\ContainerInterface;
18
use Symfony\Component\DependencyInjection\Extension\Extension;
19
use Symfony\Component\DependencyInjection\ContainerBuilder;
20
use Symfony\Component\DependencyInjection\Definition;
21
use Symfony\Component\DependencyInjection\Reference;
22
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
23
use Symfony\Component\Config\FileLocator;
24
use Symfony\Component\HttpKernel\Log\Logger;
25
use Symfony\Component\Serializer\SerializerInterface;
26
27
/**+
28
 * OldSoundRabbitMqExtension.
29
 *
30
 * @author Alvaro Videla
31
 * @author Marc Weistroff <[email protected]>
32
 */
33
class OldSoundRabbitMqExtension extends Extension
34
{
35
    /**
36 31
     * @var ContainerBuilder
37
     */
38 31
    private $container;
39
40 31
    /**
41 31
     * @var Boolean Whether the data collector is enabled
42
     */
43 31
    private $collectorEnabled;
44 31
45
    private $channelIds = [];
46 31
    private $groups = [];
0 ignored issues
show
introduced by
The private property $groups is not used, and could be removed.
Loading history...
47
48 31
    private $config = [];
49 31
50 31
    public function load(array $configs, ContainerBuilder $container)
51 31
    {
52 31
        $this->container = $container;
53 31
54 31
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
55 31
        $loader->load('rabbitmq.xml');
56 31
57 31
        $configuration = $this->getConfiguration($configs, $container);
58
        $this->config = $this->processConfiguration($configuration, $configs);
59 31
60 28
        $this->collectorEnabled = $this->config['enable_collector'];
61 28
62 28
        $this->loadConnections();
63
64
        $declarationRegistryDef = new Definition(DeclarationsRegistry::class);
65 28
        $declarationRegistryDef->setPublic(true);
66 28
        $declarationRegistryDef->setAutowired(true);
67
        $this->container->setDefinition('old_sound_rabbit_mq.declaration_registry', $declarationRegistryDef);
68 3
69
        # declarations
70 31
        foreach ($this->loadExchanges($this->config['declarations']['exchanges']) as $exchange) {
71
            $declarationRegistryDef->addMethodCall('addExchange', [$exchange]);
72 31
        };
73
        foreach ($this->loadQueues($this->config['declarations']['queues']) as $queue) {
74 31
            $declarationRegistryDef->addMethodCall('addQueue', [$queue]);
75
        };
76
        foreach ($this->loadBindings($this->config['declarations']['bindings']) as $binding) {
77 31
            $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
78
        };
79 31
80 31
        $this->loadProducers();
81
        $this->loadConsumers();
82 31
83 22
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
84 31
            $channels = [];
85
            foreach (array_unique($this->channelIds) as $id) {
86 31
                $channels[] = new Reference($id);
87 31
            }
88
89 31
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
90
            $definition->replaceArgument(0, $channels);
91
        } else {
92
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
93 31
        }
94 31
    }
95 31
96
    public function getConfiguration(array $config, ContainerBuilder $container)
97 31
    {
98 31
        return new Configuration($this->getAlias());
99
    }
100 31
101
    /**
102
     * @return Definition[]
103
     */
104
    protected function loadExchanges($exchanges): array
105
    {
106 31
        return array_map(function ($exchange) {
107 31
            $exchangeDeclaration = new Definition(ExchangeDeclaration::class);
108
            $exchangeDeclaration->setProperties($exchange);
109 31
110
            foreach($this->loadBindings($exchange['bindings'], $exchange['name'], null) as $binding) {
111 31
                $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
112
            }
113 31
114
            $this->container->setDefinition('old_sound_rabbit_mq.exchange.'.$exchange['name'], $exchangeDeclaration);
115 31
            return $exchangeDeclaration;
116
        }, $exchanges);
117
    }
118 31
119 22
    /**
120 22
     * @return Definition[]
121 22
     */
122 22
    protected function loadQueues($queues): array
123 22
    {
124 22
        return array_map(function ($queue, $key) use ($queues) {
0 ignored issues
show
Unused Code introduced by
The import $queues is not used and could be removed.

This check looks for imports that have been defined, but are not used in the scope.

Loading history...
125 22
            $queue['name'] = $queue['name'] ?? $key;
126 22
            $queueDeclaration = new Definition(QueueDeclaration::class);
127 22
            $queueDeclaration->setProperties($queue);
128 22
129 22
            foreach ($this->loadBindings($queue['bindings'], null, $queue['name'], false) as $binding) {
130 22
                $this->container->getDefinition('old_sound_rabbit_mq.declaration_registry')->addMethodCall('addBinding', [$binding]);
131
            }
132 22
133
            return $queueDeclaration;
134 22
        }, $queues, array_keys($queues));
135
    }
136 31
137
    protected function createBindingDef($binding, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): Definition
138 31
    {
139
        $routingKeys = $binding['routing_keys'] ?? [$binding['routing_key']];
140 31
141 31
        $definition = new Definition(BindingDeclaration::class);
142 27
        $definition->setProperties([
143 27
            'exchange' => $exchange ? $exchange : $binding['exchange'],
144 27
            'destinationIsExchange' => isset($destinationIsExchange) ? $destinationIsExchange : $binding['destination_is_exchange'],
145 27
            'destination' => $destination ? $destination : $binding['destination'],
146
            'routingKeys' => $routingKeys,
147 27
            // TODO 'arguments' => $binding['arguments'],
148 1
            //'nowait' => $binding['nowait'],
149
        ]);
150 27
151
        return $definition;
152 27
    }
153 27
154
    protected function loadBindings($bindings, string $exchange = null, string $destination = null, bool $destinationIsExchange = null): array
155 27
    {
156 27
        $definitions = [];
157 27
        foreach ($bindings as $binding) {
158 25
            $definitions[] = $this->createBindingDef($binding, $exchange, $destination, $destinationIsExchange);
159
        }
160 27
161
        return $definitions;
162
    }
163
164 27
    protected function loadConnections()
165
    {
166
        $connFactoryDer = new Definition('%old_sound_rabbit_mq.connection_factory.class%');
167
168 27
        foreach ($this->config['connections'] as $key => $connection) {
169
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
170 27
            $classParam =
171 27
                $connection['lazy']
172 22
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
173
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
174
175
            $definition = new Definition($classParam);
176
            $definition->setPublic(false);
177
178
            $definition->setFactory([$connFactoryDer, 'createConnection']);
179
            $definition->setArguments([$classParam, $connection]);
180
181 31
            $definition->addTag('old_sound_rabbit_mq.connection');
182
            $definition->setPublic(true);
183 31
184
            $connectionAliase = sprintf('old_sound_rabbit_mq.connection.%s', $key);
185 31
            $this->container->setDefinition($connectionAliase, $definition);
186 26
187 26
            $channelDef = new Definition(AMQPChannel::class, [
188 26
                new Reference($connectionAliase)
189 26
            ]);
190
            $channelDef->setFactory([self::class, 'getChannelFromConnection']);
191 26
            $channelDef->setPublic(true);
192
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.channel.%s', $key), $channelDef);
193
        }
194 26
    }
195
196 26
    public static function getChannelFromConnection(AbstractConnection $connection)
197
    {
198
        return $connection->channel();
199 26
    }
200 26
201
    protected function loadProducers()
202 26
    {
203 22
        if ($this->config['sandbox']) {
204 22
            foreach ($this->config['producers'] as $key => $producer) {
205 22
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
206 22
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.producer.%s', $key), $definition);
207
            }
208
            return;
209
        }
210 26
211
        $defaultAutoDeclare = $this->container->getParameter('kernel.environment') !== 'prod';
212
        foreach ($this->config['producers'] as $producerName => $producer) {
213 26
            $alias = sprintf('old_sound_rabbit_mq.producer.%s', $producerName);
214
215
            $definition = new Definition($producer['class']);
216 26
            $definition->setPublic(true);
217
            $definition->addTag('old_sound_rabbit_mq.producer', ['name' => $producerName]);
218
            //this producer doesn't define an exchange -> using AMQP Default
219
            if (!isset($producer['exchange_options'])) {
220
                $producer['exchange_options'] = $this->getDefaultExchangeOptions();
221
            }
222
            //$definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
223
            //this producer doesn't define a queue -> using AMQP Default
224
            if (!isset($producer['queue_options'])) {
225
                $producer['queue_options'] = $this->getDefaultQueueOptions();
226 26
            }
227
            //$definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
228
229
            $definition->addArgument($this->createChannelReference($producer['connection']));
230 26
            $definition->addArgument($producer['exchange']);
231 26
            //$this->injectConnection($definition, $producer['connection']);
232 25
            //if ($this->collectorEnabled) {
233
            //    $this->injectTraceableChannel($definition, $key, $producer['connection']);
234
            //}
235 26
236 1
            if (isset($producer['auto_declare'])) {
237
                $definition->setProperty('autoDeclare', $producer['auto_declare'] ?? $defaultAutoDeclare);
238
            }
239 26
240 26
            $this->container->setDefinition($alias, $definition);
241 26
            if ($producer['logging']) {
242
                $this->injectLogger($alias);
243 31
            }
244
        }
245 31
    }
246
247 31
    private function createChannelReference($connectionName): Reference
248 22
    {
249 22
        return new Reference(sprintf('old_sound_rabbit_mq.channel.%s', $connectionName));
250
    }
251 22
252
    protected function loadConsumers()
253
    {
254
        $simpleExecuteCallbackStrategyAlias = 'old_sound_rabbit_mq.execute_callback_strategy.simple';
255
        $this->container->setDefinition($simpleExecuteCallbackStrategyAlias, new Definition(SimpleExecuteCallbackStrategy::class));
256
257
        foreach ($this->config['consumers'] as $consumerName => $consumer) {
258 22
            $alias = sprintf('old_sound_rabbit_mq.consumer.%s', $consumerName);
259 22
            $serializerAlias = sprintf('old_sound_rabbit_mq.consumer.%s.serializer', $consumerName);// TODO
0 ignored issues
show
Unused Code introduced by
The assignment to $serializerAlias is dead and can be removed.
Loading history...
260 22
261 22
            $connectionName = isset($consumer['connection']) ? $consumer['connection'] : 'default';
262
263
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%', [
264 22
                $this->createChannelReference($connectionName)
265
            ]);
266 22
            $definition->setPublic(true);
267 22
            $definition->addTag('old_sound_rabbit_mq.consumer', ['name' => $consumerName]);
268 22
            // TODO $this->container->setAlias($serializerAlias, SerializerInterface::class);
269 22
            // $definition->addMethodCall('setSerializer', [new Reference($serializerAlias)]);}
270 22
            foreach($consumer['consumeQueues'] as $index => $consumeQueue) {
271
                $queueConsumingDef = new Definition(QueueConsuming::class);
272 22
                $queueConsumingDef->setProperties([
273 22
                    'queueName' => $consumeQueue['queue'],
274 22
                    'callback' => new Reference($consumeQueue['callback']),
275 22
                    //'qosPrefetchSize' => $consumeQueue['qos_prefetch_size'],
276
                    'qosPrefetchCount' => $consumeQueue['qos_prefetch_count'],
277
                    //'consumerTag' => $consumeQueue['consumer_tag'],
278
                    //'noLocal' => $consumeQueue['no_local'],
279 22
                ]);
280
281
                $executeCallbackStrategyRef = isset($consumeQueue['batch_count']) ?
282
                    new Definition(BatchExecuteCallbackStrategy::class, [$consumeQueue['batch_count']]) :
283
                    new Reference($simpleExecuteCallbackStrategyAlias);
284
285
                $definition->addMethodCall('consumeQueue', [
286
                    $queueConsumingDef,
287 22
                    $executeCallbackStrategyRef
288
                ]);
289
            }
290 22
291
            $definition->addMethodCall('setEventDispatcher', [
292
                new Reference('event_dispatcher', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)
293 22
            ]);
294
295
            /* TODO if (array_key_exists('qos_options', $consumer)) {
296
                $definition->addMethodCall('setQosOptions', array(
297
                    $consumer['qos_options']['prefetch_size'],
298
                    $consumer['qos_options']['prefetch_count'],
299
                    $consumer['qos_options']['global']
300
                ));
301
            }*/
302
303 22
            if (isset($consumer['idle_timeout'])) {
304
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
305
            }
306
            if (isset($consumer['idle_timeout_exit_code'])) {
307 22
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
308 22
            }
309 22
            if (isset($consumer['timeout_wait'])) {
310
                $definition->setProperty('timeoutWait', [$consumer['timeout_wait']]);
311
            }
312 22
            if (isset($consumer['graceful_max_execution'])) {
313
                $definition->addMethodCall(
314
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
315
                    array($consumer['graceful_max_execution']['timeout'])
316 22
                );
317 22
                $definition->addMethodCall(
318 22
                    'setGracefulMaxExecutionTimeoutExitCode',
319 22
                    array($consumer['graceful_max_execution']['exit_code'])
320
                );
321 22
            }
322 22
323
            $this->injectConnection($definition, $consumer['connection']);
324
            if ($this->collectorEnabled) {
325 31
                $this->injectTraceableChannel($definition, $consumerName, $consumer['connection']);
326
            }
327 31
328
            $this->container->setDefinition($alias, $definition);
329 31
330
            if ($consumer['logging']) {
331 22
                $this->injectLogger($alias);
332
            }
333
        }
334
    }
335
336
    /**
337
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
338 22
     * parameter. So we revert the change for right configurations.
339
     *
340 22
     * @param array $config
341 22
     *
342 22
     * @return array
343 22
     */
344 22
    private function normalizeArgumentKeys(array $config)
0 ignored issues
show
Unused Code introduced by
The method normalizeArgumentKeys() is not used, and could be removed.

This check looks for private methods that have been defined, but are not used inside the class.

Loading history...
345 22
    {
346
        if (isset($config['arguments'])) {
347 22
            $arguments = $config['arguments'];
348
            // support for old configuration
349
            if (is_string($arguments)) {
350
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

350
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
351
            }
352
353
            $newArguments = [];
354
            foreach ($arguments as $key => $value) {
355 22
                if (strstr($key, '_')) {
356 22
                    $key = str_replace('_', '-', $key);
357 22
                }
358
                $newArguments[$key] = $value;
359
            }
360 22
            $config['arguments'] = $newArguments;
361
        }
362
        return $config;
363 22
    }
364
365
    /**
366 22
     * Support for arguments provided as string. Support for old configuration files.
367
     *
368
     * @deprecated
369
     * @param string $arguments
370
     * @return array
371
     */
372
    private function argumentsStringAsArray($arguments)
373
    {
374
        $argumentsArray = [];
375
376 22
        $argumentPairs = explode(',', $arguments);
377
        foreach ($argumentPairs as $argument) {
378
            $argumentPair = explode(':', $argument);
379
            $type = 'S';
380 22
            if (isset($argumentPair[2])) {
381 22
                $type = $argumentPair[2];
382 22
            }
383
            $argumentsArray[$argumentPair[0]] = [$type, $argumentPair[1]];
384
        }
385 22
386
        return $argumentsArray;
387
    }
388
389 22
    protected function injectTraceableChannel(Definition $definition, $name, $connectionName)
390 22
    {
391 22
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
392 22
        $traceableChannel = new Definition('%old_sound_rabbit_mq.traceable.channel.class%');
393
        $traceableChannel
394 31
            ->setPublic(false)
395
            ->addTag('old_sound_rabbit_mq.traceable_channel');
396 31
        $this->injectConnection($traceableChannel, $connectionName);
397
398 31
        $this->container->setDefinition($id, $traceableChannel);
399
400
        $this->channelIds[] = $id;
401
        $definition->addArgument(new Reference($id));
402
    }
403
404
    protected function injectConnection(Definition $definition, $connectionName)
405
    {
406
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
407
    }
408
409
    public function getAlias()
410
    {
411
        return 'old_sound_rabbit_mq';
412
    }
413
414
    /**
415
     * Add proper dequeuer aware call
416
     *
417
     * @param string $callback
418
     * @param string $name
419
     */
420
    protected function addDequeuerAwareCall($callback, $name)
421
    {
422
        if (!$this->container->has($callback)) {
423
            return;
424
        }
425
426
        $callbackDefinition = $this->container->findDefinition($callback);
427
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
428
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
429
            $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]);
430
        }
431
    }
432
433
    private function injectLogger(string $definitionAlias)
434
    {
435
        $definition = $this->container->getDefinition($definitionAlias);
436
        $definition->addTag('monolog.logger', [
437
            'channel' => 'phpamqplib'
438
        ]);
439
        $loggerAlias = $definitionAlias . '.loggeer';
440
        $this->container->setAlias($loggerAlias, 'logger');
441
        $definition->addMethodCall('setLogger', [new Reference($loggerAlias, ContainerInterface::IGNORE_ON_INVALID_REFERENCE)]);
442
    }
443
444
    /**
445
     * Get default AMQP exchange options
446
     * TODO use
447
     * @return array
448
     */
449
    protected function getDefaultExchangeOptions()
450
    {
451
        return array(
452
            'name' => '',
453
            'type' => 'direct',
454
            'passive' => true,
455 31
            'declare' => false
456
        );
457 31
    }
458
459 31
    /**
460 22
     * Get default AMQP queue options
461
     * TODO use
462 22
     * @return array
463 22
     */
464 22
    protected function getDefaultQueueOptions()
465 22
    {
466 22
        return array(
467 22
            'name' => '',
468 22
            'declare' => false
469 22
        );
470
    }
471
}
472