Test Failed
Pull Request — master (#38)
by Aleksandr
07:22
created

OldSoundRabbitMqExtension   F

Complexity

Total Complexity 115

Size/Duplication

Total Lines 704
Duplicated Lines 0 %

Test Coverage

Coverage 71.16%

Importance

Changes 27
Bugs 5 Features 2
Metric Value
eloc 398
c 27
b 5
f 2
dl 0
loc 704
ccs 269
cts 378
cp 0.7116
rs 2
wmc 115

22 Methods

Rating   Name   Duplication   Size   Complexity  
A load() 0 34 4
B loadConnections() 0 33 6
B loadProducers() 0 41 10
A getConfiguration() 0 3 1
A loadBindings() 0 22 4
A loadRpcClients() 0 21 5
F loadBatchConsumers() 0 62 11
B loadRpcServers() 0 31 7
A injectLoggedChannel() 0 13 1
F loadMultipleConsumers() 0 85 17
A injectLogger() 0 6 1
A addDequeuerAwareCall() 0 10 3
A normalizeArgumentKeys() 0 19 5
F loadDynamicConsumers() 0 73 12
A getDefaultQueueOptions() 0 5 1
A injectConnection() 0 3 1
F loadConsumers() 0 65 13
A getDefaultExchangeOptions() 0 7 1
A argumentsStringAsArray() 0 15 3
A getAlias() 0 3 1
A loadGroupConsumers() 0 26 4
A loadAnonConsumers() 0 22 4

How to fix   Complexity   

Complex Class

Complex classes like OldSoundRabbitMqExtension often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use OldSoundRabbitMqExtension, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var Boolean Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
35
    private $config = array();
36 31
37
    public function load(array $configs, ContainerBuilder $container)
38 31
    {
39
        $this->container = $container;
40 31
41 31
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
42
        $loader->load('rabbitmq.xml');
43 31
44 31
        $configuration = $this->getConfiguration($configs, $container);
45
        $this->config = $this->processConfiguration($configuration, $configs);
46 31
47
        $this->collectorEnabled = $this->config['enable_collector'];
48 31
49 31
        $this->loadConnections();
50 31
        $this->loadBindings();
51 31
        $this->loadProducers();
52 31
        $this->loadConsumers();
53 31
        $this->loadMultipleConsumers();
54 31
        $this->loadDynamicConsumers();
55 31
        $this->loadBatchConsumers();
56 31
        $this->loadAnonConsumers();
57 31
        $this->loadGroupConsumers();
58
        $this->loadRpcClients();
59 31
        $this->loadRpcServers();
60 28
61 28
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
62 28
            $channels = array();
63
            foreach (array_unique($this->channelIds) as $id) {
64
                $channels[] = new Reference($id);
65 28
            }
66 28
67
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
68 3
            $definition->replaceArgument(0, $channels);
69
        } else {
70 31
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
71
        }
72 31
    }
73
74 31
    public function getConfiguration(array $config, ContainerBuilder $container)
75
    {
76
        return new Configuration($this->getAlias());
77 31
    }
78
79 31
    protected function loadConnections()
80 31
    {
81
        foreach ($this->config['connections'] as $key => $connection) {
82 31
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
83 22
            $classParam =
84 31
                $connection['lazy']
85
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
86 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
87 31
88
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
89 31
                $classParam, $connection,
90
            ));
91
            if (isset($connection['connection_parameters_provider'])) {
92
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
93 31
                unset($connection['connection_parameters_provider']);
94 31
            }
95 31
            $definition->setPublic(false);
96
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
97 31
            $this->container->setDefinition($factoryName, $definition);
98 31
99
            $definition = new Definition($classParam);
100 31
            if (method_exists($definition, 'setFactory')) {
101
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
102
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
103
            } else {
104
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
105
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

105
                $definition->/** @scrutinizer ignore-call */ 
106
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
106 31
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107 31
            }
108
            $definition->addTag('old_sound_rabbit_mq.connection');
109 31
            $definition->setPublic(true);
110
111 31
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
112
        }
113 31
    }
114
115 31
    protected function loadBindings()
116
    {
117
        if ($this->config['sandbox']) {
118 31
            return;
119 22
        }
120 22
        foreach ($this->config['bindings'] as $binding) {
121 22
            ksort($binding);
122 22
            $definition = new Definition($binding['class']);
123 22
            $definition->addTag('old_sound_rabbit_mq.binding');
124 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
125 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
126 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
127 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
128 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
129 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
130 22
            $this->injectConnection($definition, $binding['connection']);
131
            $key = md5(json_encode($binding));
132 22
            if ($this->collectorEnabled) {
133
                // in the context of a binding, I don't thing logged channels are needed?
134 22
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
135
            }
136 31
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
137
        }
138 31
    }
139
140 31
    protected function loadProducers()
141 31
    {
142 27
        if ($this->config['sandbox'] == false) {
143 27
            foreach ($this->config['producers'] as $key => $producer) {
144 27
                $definition = new Definition($producer['class']);
145 27
                $definition->setPublic(true);
146
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
147 27
                $definition->addTag('old_sound_rabbit_mq.producer');
148 1
                //this producer doesn't define an exchange -> using AMQP Default
149
                if (!isset($producer['exchange_options'])) {
150 27
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
151
                }
152 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
153 27
                //this producer doesn't define a queue -> using AMQP Default
154
                if (!isset($producer['queue_options'])) {
155 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
156 27
                }
157 27
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
158 25
                $this->injectConnection($definition, $producer['connection']);
159
                if ($this->collectorEnabled) {
160 27
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
161
                }
162
                if (!$producer['auto_setup_fabric']) {
163
                    $definition->addMethodCall('disableAutoSetupFabric');
164 27
                }
165
166
                if ($producer['enable_logger']) {
167
                    $this->injectLogger($definition);
168 27
                }
169
170 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
171 27
172 22
                $this->container->setDefinition($producerServiceName, $definition);
173
                if (null !== $producer['service_alias']) {
174
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
175
                }
176
            }
177
        } else {
178
            foreach ($this->config['producers'] as $key => $producer) {
179
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
180
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
181 31
            }
182
        }
183 31
    }
184
185 31
    protected function loadConsumers()
186 26
    {
187 26
        foreach ($this->config['consumers'] as $key => $consumer) {
188 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
189 26
            $definition->setPublic(true);
190
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
191 26
            $definition->addTag('old_sound_rabbit_mq.consumer');
192
            foreach ($consumer['groups'] as $group) {
193
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
194 26
            }
195
            //this consumer doesn't define an exchange -> using AMQP Default
196 26
            if (!isset($consumer['exchange_options'])) {
197
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
198
            }
199 26
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
200 26
            //this consumer doesn't define a queue -> using AMQP Default
201
            if (!isset($consumer['queue_options'])) {
202 26
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
203 22
            }
204 22
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
205 22
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
206 22
207
            if (array_key_exists('qos_options', $consumer)) {
208
                $definition->addMethodCall('setQosOptions', array(
209
                    $consumer['qos_options']['prefetch_size'],
210 26
                    $consumer['qos_options']['prefetch_count'],
211
                    $consumer['qos_options']['global']
212
                ));
213 26
            }
214
215
            if (isset($consumer['idle_timeout'])) {
216 26
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
217
            }
218
            if (isset($consumer['idle_timeout_exit_code'])) {
219
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
220
            }
221
            if (isset($consumer['timeout_wait'])) {
222
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
223
            }
224
            if (isset($consumer['graceful_max_execution'])) {
225
                $definition->addMethodCall(
226 26
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
227
                    array($consumer['graceful_max_execution']['timeout'])
228
                );
229
                $definition->addMethodCall(
230 26
                    'setGracefulMaxExecutionTimeoutExitCode',
231 26
                    array($consumer['graceful_max_execution']['exit_code'])
232 25
                );
233
            }
234
            if (!$consumer['auto_setup_fabric']) {
235 26
                $definition->addMethodCall('disableAutoSetupFabric');
236 1
            }
237
238
            $this->injectConnection($definition, $consumer['connection']);
239 26
            if ($this->collectorEnabled) {
240 26
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
241 26
            }
242
243 31
            if ($consumer['enable_logger']) {
244
                $this->injectLogger($definition);
245 31
            }
246
247 31
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
248 22
            $this->container->setDefinition($name, $definition);
249 22
            $this->addDequeuerAwareCall($consumer['callback'], $name);
250
        }
251 22
    }
252
253
    protected function loadMultipleConsumers()
254
    {
255
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
256
            $queues = array();
257
            $callbacks = array();
258 22
259 22
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
260 22
                throw new InvalidConfigurationException(
261 22
                    "Error on loading $key multiple consumer. " .
262
                    "Either 'queues' or 'queues_provider' parameters should be defined."
263
                );
264 22
            }
265
266 22
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
267 22
                $queues[$queueOptions['name']] = $queueOptions;
268 22
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
269 22
                $callbacks[] = $queueOptions['callback'];
270 22
            }
271
272 22
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
273 22
            $definition
274 22
                ->setPublic(true)
275 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
276
                ->addTag('old_sound_rabbit_mq.multi_consumer')
277
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
278
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
279 22
280
            foreach ($consumer['groups'] as $group) {
281
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
282
            }
283
284
            if ($consumer['queues_provider']) {
285
                $definition->addMethodCall(
286
                    'setQueuesProvider',
287 22
                    array(new Reference($consumer['queues_provider']))
288
                );
289
            }
290 22
291
            if (array_key_exists('qos_options', $consumer)) {
292
                $definition->addMethodCall('setQosOptions', array(
293 22
                    $consumer['qos_options']['prefetch_size'],
294
                    $consumer['qos_options']['prefetch_count'],
295
                    $consumer['qos_options']['global']
296
                ));
297
            }
298
299
            if (isset($consumer['idle_timeout'])) {
300
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
301
            }
302
            if (isset($consumer['idle_timeout_exit_code'])) {
303 22
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
304
            }
305
            if (isset($consumer['timeout_wait'])) {
306
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
307 22
            }
308 22
            if (isset($consumer['graceful_max_execution'])) {
309 22
                $definition->addMethodCall(
310
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
311
                    array($consumer['graceful_max_execution']['timeout'])
312 22
                );
313
                $definition->addMethodCall(
314
                    'setGracefulMaxExecutionTimeoutExitCode',
315
                    array($consumer['graceful_max_execution']['exit_code'])
316 22
                );
317 22
            }
318 22
            if (!$consumer['auto_setup_fabric']) {
319 22
                $definition->addMethodCall('disableAutoSetupFabric');
320
            }
321 22
322 22
            $this->injectConnection($definition, $consumer['connection']);
323
            if ($this->collectorEnabled) {
324
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
325 31
            }
326
327 31
            if ($consumer['enable_logger']) {
328
                $this->injectLogger($definition);
329 31
            }
330
331 22
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
332
            $this->container->setDefinition($name, $definition);
333
            if ($consumer['queues_provider']) {
334
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
335
            }
336
            foreach ($callbacks as $callback) {
337
                $this->addDequeuerAwareCall($callback, $name);
338 22
            }
339
        }
340 22
    }
341 22
342 22
    protected function loadDynamicConsumers()
343 22
    {
344 22
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
345 22
346
            if (empty($consumer['queue_options_provider'])) {
347 22
                throw new InvalidConfigurationException(
348
                    "Error on loading $key dynamic consumer. " .
349
                    "'queue_provider' parameter should be defined."
350
                );
351
            }
352
353
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
354
            $definition
355 22
                ->setPublic(true)
356 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
357 22
                ->addTag('old_sound_rabbit_mq.consumer')
358
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
359
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
360 22
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
361
362
            foreach ($consumer['groups'] as $group) {
363 22
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
364
            }
365
366 22
            if (array_key_exists('qos_options', $consumer)) {
367
                $definition->addMethodCall('setQosOptions', array(
368
                    $consumer['qos_options']['prefetch_size'],
369
                    $consumer['qos_options']['prefetch_count'],
370
                    $consumer['qos_options']['global']
371
                ));
372
            }
373
374
            $definition->addMethodCall(
375
                'setQueueOptionsProvider',
376 22
                array(new Reference($consumer['queue_options_provider']))
377
            );
378
379
            if (isset($consumer['idle_timeout'])) {
380 22
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
381 22
            }
382 22
            if (isset($consumer['idle_timeout_exit_code'])) {
383
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
384
            }
385 22
            if (isset($consumer['timeout_wait'])) {
386
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
387
            }
388
            if (isset($consumer['graceful_max_execution'])) {
389 22
                $definition->addMethodCall(
390 22
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
391 22
                    array($consumer['graceful_max_execution']['timeout'])
392 22
                );
393
                $definition->addMethodCall(
394 31
                    'setGracefulMaxExecutionTimeoutExitCode',
395
                    array($consumer['graceful_max_execution']['exit_code'])
396 31
                );
397
            }
398 31
            if (!$consumer['auto_setup_fabric']) {
399
                $definition->addMethodCall('disableAutoSetupFabric');
400
            }
401
402
            $this->injectConnection($definition, $consumer['connection']);
403
            if ($this->collectorEnabled) {
404
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
405
            }
406
407
            if ($consumer['enable_logger']) {
408
                $this->injectLogger($definition);
409
            }
410
411
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
412
            $this->container->setDefinition($name, $definition);
413
            $this->addDequeuerAwareCall($consumer['callback'], $name);
414
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
415
        }
416
    }
417
418
    protected function loadBatchConsumers()
419
    {
420
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
421
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
422
423
            if (!isset($consumer['exchange_options'])) {
424
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
425
            }
426
427
            $definition
428
                ->setPublic(true)
429
                ->addTag('old_sound_rabbit_mq.base_amqp')
430
                ->addTag('old_sound_rabbit_mq.batch_consumer')
431
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
432
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
433
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
434
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
435
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
436
                ->addMethodCall('setQosOptions', array(
437
                    $consumer['qos_options']['prefetch_size'],
438
                    $consumer['qos_options']['prefetch_count'],
439
                    $consumer['qos_options']['global']
440
                ))
441
            ;
442
443
            foreach ($consumer['groups'] as $group) {
444
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
445
            }
446
447
            if (isset($consumer['idle_timeout_exit_code'])) {
448
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
449
            }
450
451
            if (isset($consumer['idle_timeout'])) {
452
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
453
            }
454
455 31
            if (isset($consumer['graceful_max_execution'])) {
456
                $definition->addMethodCall(
457 31
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
458
                    array($consumer['graceful_max_execution']['timeout'])
459 31
                );
460 22
            }
461
462 22
            if (!$consumer['auto_setup_fabric']) {
463 22
                $definition->addMethodCall('disableAutoSetupFabric');
464 22
            }
465 22
466 22
            if ($consumer['keep_alive']) {
467 22
                $definition->addMethodCall('keepAlive');
468 22
            }
469 22
470
            $this->injectConnection($definition, $consumer['connection']);
471
            if ($this->collectorEnabled) {
472 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
473 22
            }
474 22
475
            if ($consumer['enable_logger']) {
476 31
                $this->injectLogger($definition);
477
            }
478
479
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
480
        }
481
    }
482
483
    protected function loadAnonConsumers()
484
    {
485
        foreach ($this->config['anon_consumers'] as $key => $anon) {
486 27
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
487
            $definition
488 27
                ->setPublic(true)
489 1
                ->addTag('old_sound_rabbit_mq.base_amqp')
490
                ->addTag('old_sound_rabbit_mq.anon_consumer')
491 1
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
492
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
493
            $this->injectConnection($definition, $anon['connection']);
494
            if ($this->collectorEnabled) {
495 1
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
496 1
            }
497 1
498
            foreach ($anon['groups'] as $group) {
499
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
500 1
            }
501
502 1
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
503
            $this->container->setDefinition($name, $definition);
504 27
            $this->addDequeuerAwareCall($anon['callback'], $name);
505
        }
506
    }
507
    
508
    protected function loadGroupConsumers()
509
    {
510
        foreach (array_merge_recursive(['default' => ['connection' => 'default']], $this->config['group_consumers']) as $key => $group) {
511
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
512
            $definition
513
                ->setPublic(true)
514
                ->addTag('old_sound_rabbit_mq.base_amqp')
515
                ->addTag('old_sound_rabbit_mq.group_consumer')
516
            ;
517
            $this->injectConnection($definition, $group['connection']);
518
            if ($this->collectorEnabled) {
519
                $this->injectLoggedChannel($definition, $key, $group['connection']);
520
            }
521
522
            $definition->addMethodCall('addConsumers', [new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key)]);
523
524
            if (array_key_exists('qos_options', $group)) {
525
                $definition->addMethodCall('setQosOptions', array(
526
                    $consumer['qos_options']['prefetch_size'],
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $consumer seems to be never defined.
Loading history...
527
                    $consumer['qos_options']['prefetch_count'],
528
                    $consumer['qos_options']['global']
529
                ));
530
            }
531 31
532
            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
533 31
            $this->container->setDefinition($name, $definition);
534 25
        }
535 25
    }
536
537 25
    /**
538 25
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
539 25
     * parameter. So we revert the change for right configurations.
540 25
     *
541 25
     * @param array $config
542
     *
543 25
     * @return array
544 25
     */
545
    private function normalizeArgumentKeys(array $config)
546 25
    {
547 25
        if (isset($config['arguments'])) {
548
            $arguments = $config['arguments'];
549 25
            // support for old configuration
550
            if (is_string($arguments)) {
551 25
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

551
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
552
            }
553 31
554
            $newArguments = array();
555 31
            foreach ($arguments as $key => $value) {
556
                if (strstr($key, '_')) {
557 31
                    $key = str_replace('_', '-', $key);
558 22
                }
559
                $newArguments[$key] = $value;
560 22
            }
561 22
            $config['arguments'] = $newArguments;
562 22
        }
563 22
        return $config;
564 22
    }
565 22
566 22
    /**
567 22
     * Support for arguments provided as string. Support for old configuration files.
568
     *
569 22
     * @deprecated
570
     * @param string $arguments
571
     * @return array
572
     */
573
    private function argumentsStringAsArray($arguments)
574
    {
575
        $argumentsArray = array();
576 22
577 22
        $argumentPairs = explode(',', $arguments);
578
        foreach ($argumentPairs as $argument) {
579 22
            $argumentPair = explode(':', $argument);
580 22
            $type = 'S';
581
            if (isset($argumentPair[2])) {
582 22
                $type = $argumentPair[2];
583 22
            }
584
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
585 22
        }
586
587 31
        return $argumentsArray;
588
    }
589 28
590
    protected function loadRpcClients()
591 28
    {
592 28
        foreach ($this->config['rpc_clients'] as $key => $client) {
593
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
594 28
            $definition->setLazy($client['lazy']);
595 28
            $definition
596 28
                ->addTag('old_sound_rabbit_mq.rpc_client')
597
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
598 28
            $this->injectConnection($definition, $client['connection']);
599
            if ($this->collectorEnabled) {
600 28
                $this->injectLoggedChannel($definition, $key, $client['connection']);
601 28
            }
602 28
            if (array_key_exists('unserializer', $client)) {
603
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
604 30
            }
605
            if (array_key_exists('direct_reply_to', $client)) {
606 30
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
607 30
            }
608
            $definition->setPublic(true);
609 31
610
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
611 31
        }
612
    }
613
614
    protected function loadRpcServers()
615
    {
616
        foreach ($this->config['rpc_servers'] as $key => $server) {
617
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
618
            $definition
619
                ->setPublic(true)
620 26
                ->addTag('old_sound_rabbit_mq.base_amqp')
621
                ->addTag('old_sound_rabbit_mq.rpc_server')
622 26
                ->addMethodCall('initServer', array($key))
623 26
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
624
            $this->injectConnection($definition, $server['connection']);
625
            if ($this->collectorEnabled) {
626
                $this->injectLoggedChannel($definition, $key, $server['connection']);
627
            }
628
            if (array_key_exists('qos_options', $server)) {
629
                $definition->addMethodCall('setQosOptions', array(
630
                    $server['qos_options']['prefetch_size'],
631
                    $server['qos_options']['prefetch_count'],
632
                    $server['qos_options']['global']
633 1
                ));
634
            }
635 1
            if (array_key_exists('exchange_options', $server)) {
636 1
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
637
            }
638 1
            if (array_key_exists('queue_options', $server)) {
639 1
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
640
            }
641
            if (array_key_exists('serializer', $server)) {
642
                $definition->addMethodCall('setSerializer', array($server['serializer']));
643
            }
644
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
645
        }
646 1
    }
647
648
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
649 1
    {
650
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
651
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
652
        $channel
653
            ->setPublic(false)
654
            ->addTag('old_sound_rabbit_mq.logged_channel');
655
        $this->injectConnection($channel, $connectionName);
656
657
        $this->container->setDefinition($id, $channel);
658
659
        $this->channelIds[] = $id;
660
        $definition->addArgument(new Reference($id));
661 27
    }
662
663
    protected function injectConnection(Definition $definition, $connectionName)
664 27
    {
665
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
666
    }
667
668
    public function getAlias()
669
    {
670
        return 'old_sound_rabbit_mq';
671
    }
672
673
    /**
674
     * Add proper dequeuer aware call
675
     *
676
     * @param string $callback
677
     * @param string $name
678
     */
679
    protected function addDequeuerAwareCall($callback, $name)
680
    {
681
        if (!$this->container->has($callback)) {
682
            return;
683
        }
684
685
        $callbackDefinition = $this->container->findDefinition($callback);
686
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
687
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
688
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
689
        }
690
    }
691
692
    private function injectLogger(Definition $definition)
693
    {
694
        $definition->addTag('monolog.logger', array(
695
            'channel' => 'phpamqplib'
696
        ));
697
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
698
    }
699
700
    /**
701
     * Get default AMQP exchange options
702
     *
703
     * @return array
704
     */
705
    protected function getDefaultExchangeOptions()
706
    {
707
        return array(
708
            'name' => '',
709
            'type' => 'direct',
710
            'passive' => true,
711
            'declare' => false
712
        );
713
    }
714
715
    /**
716
     * Get default AMQP queue options
717
     *
718
     * @return array
719
     */
720
    protected function getDefaultQueueOptions()
721
    {
722
        return array(
723
            'name' => '',
724
            'declare' => false
725
        );
726
    }
727
}
728