Passed
Push — master ( 5a8d35...554209 )
by Mihai
07:44
created

OldSoundRabbitMqExtension   F

Complexity

Total Complexity 106

Size/Duplication

Total Lines 655
Duplicated Lines 0 %

Test Coverage

Coverage 71.16%

Importance

Changes 27
Bugs 5 Features 2
Metric Value
eloc 370
c 27
b 5
f 2
dl 0
loc 655
ccs 269
cts 378
cp 0.7116
rs 2
wmc 106

21 Methods

Rating   Name   Duplication   Size   Complexity  
A getConfiguration() 0 3 1
A loadBindings() 0 22 4
A load() 0 33 4
B loadConnections() 0 33 6
B loadProducers() 0 41 10
A loadRpcClients() 0 21 5
C loadBatchConsumers() 0 58 10
B loadRpcServers() 0 31 7
A injectLoggedChannel() 0 13 1
F loadMultipleConsumers() 0 81 16
A injectLogger() 0 6 1
A addDequeuerAwareCall() 0 10 3
A normalizeArgumentKeys() 0 19 5
C loadDynamicConsumers() 0 69 11
A getDefaultQueueOptions() 0 5 1
A injectConnection() 0 3 1
F loadConsumers() 0 62 12
A getDefaultExchangeOptions() 0 7 1
A argumentsStringAsArray() 0 15 3
A getAlias() 0 3 1
A loadAnonConsumers() 0 18 3

How to fix   Complexity   

Complex Class

Complex classes like OldSoundRabbitMqExtension often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use OldSoundRabbitMqExtension, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\ContainerInterface;
7
use Symfony\Component\DependencyInjection\Extension\Extension;
8
use Symfony\Component\DependencyInjection\ContainerBuilder;
9
use Symfony\Component\DependencyInjection\Definition;
10
use Symfony\Component\DependencyInjection\Reference;
11
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
12
use Symfony\Component\Config\FileLocator;
13
14
/**
15
 * OldSoundRabbitMqExtension.
16
 *
17
 * @author Alvaro Videla
18
 * @author Marc Weistroff <[email protected]>
19
 */
20
class OldSoundRabbitMqExtension extends Extension
21
{
22
    /**
23
     * @var ContainerBuilder
24
     */
25
    private $container;
26
27
    /**
28
     * @var Boolean Whether the data collector is enabled
29
     */
30
    private $collectorEnabled;
31
32
    private $channelIds = array();
33
34
    private $config = array();
35
36 31
    public function load(array $configs, ContainerBuilder $container)
37
    {
38 31
        $this->container = $container;
39
40 31
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
41 31
        $loader->load('rabbitmq.xml');
42
43 31
        $configuration = $this->getConfiguration($configs, $container);
44 31
        $this->config = $this->processConfiguration($configuration, $configs);
45
46 31
        $this->collectorEnabled = $this->config['enable_collector'];
47
48 31
        $this->loadConnections();
49 31
        $this->loadBindings();
50 31
        $this->loadProducers();
51 31
        $this->loadConsumers();
52 31
        $this->loadMultipleConsumers();
53 31
        $this->loadDynamicConsumers();
54 31
        $this->loadBatchConsumers();
55 31
        $this->loadAnonConsumers();
56 31
        $this->loadRpcClients();
57 31
        $this->loadRpcServers();
58
59 31
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
60 28
            $channels = array();
61 28
            foreach (array_unique($this->channelIds) as $id) {
62 28
                $channels[] = new Reference($id);
63
            }
64
65 28
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
66 28
            $definition->replaceArgument(0, $channels);
67
        } else {
68 3
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
69
        }
70 31
    }
71
72 31
    public function getConfiguration(array $config, ContainerBuilder $container)
73
    {
74 31
        return new Configuration($this->getAlias());
75
    }
76
77 31
    protected function loadConnections()
78
    {
79 31
        foreach ($this->config['connections'] as $key => $connection) {
80 31
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
81
            $classParam =
82 31
                $connection['lazy']
83 22
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
84 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
85
86 31
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
87 31
                $classParam, $connection,
88
            ));
89 31
            if (isset($connection['connection_parameters_provider'])) {
90
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
91
                unset($connection['connection_parameters_provider']);
92
            }
93 31
            $definition->setPublic(false);
94 31
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
95 31
            $this->container->setDefinition($factoryName, $definition);
96
97 31
            $definition = new Definition($classParam);
98 31
            if (method_exists($definition, 'setFactory')) {
99
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
100 31
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
101
            } else {
102
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
103
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

103
                $definition->/** @scrutinizer ignore-call */ 
104
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
104
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

104
                $definition->/** @scrutinizer ignore-call */ 
105
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
105
            }
106 31
            $definition->addTag('old_sound_rabbit_mq.connection');
107 31
            $definition->setPublic(true);
108
109 31
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
110
        }
111 31
    }
112
113 31
    protected function loadBindings()
114
    {
115 31
        if ($this->config['sandbox']) {
116
            return;
117
        }
118 31
        foreach ($this->config['bindings'] as $binding) {
119 22
            ksort($binding);
120 22
            $definition = new Definition($binding['class']);
121 22
            $definition->addTag('old_sound_rabbit_mq.binding');
122 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
123 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
124 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
125 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
126 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
127 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
128 22
            $this->injectConnection($definition, $binding['connection']);
129 22
            $key = md5(json_encode($binding));
130 22
            if ($this->collectorEnabled) {
131
                // in the context of a binding, I don't thing logged channels are needed?
132 22
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
133
            }
134 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
135
        }
136 31
    }
137
138 31
    protected function loadProducers()
139
    {
140 31
        if ($this->config['sandbox'] == false) {
141 31
            foreach ($this->config['producers'] as $key => $producer) {
142 27
                $definition = new Definition($producer['class']);
143 27
                $definition->setPublic(true);
144 27
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
145 27
                $definition->addTag('old_sound_rabbit_mq.producer');
146
                //this producer doesn't define an exchange -> using AMQP Default
147 27
                if (!isset($producer['exchange_options'])) {
148 1
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
149
                }
150 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
151
                //this producer doesn't define a queue -> using AMQP Default
152 27
                if (!isset($producer['queue_options'])) {
153 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
154
                }
155 27
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
156 27
                $this->injectConnection($definition, $producer['connection']);
157 27
                if ($this->collectorEnabled) {
158 25
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
159
                }
160 27
                if (!$producer['auto_setup_fabric']) {
161
                    $definition->addMethodCall('disableAutoSetupFabric');
162
                }
163
164 27
                if ($producer['enable_logger']) {
165
                    $this->injectLogger($definition);
166
                }
167
168 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
169
170 27
                $this->container->setDefinition($producerServiceName, $definition);
171 27
                if (null !== $producer['service_alias']) {
172 22
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
173
                }
174
            }
175
        } else {
176
            foreach ($this->config['producers'] as $key => $producer) {
177
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
178
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
179
            }
180
        }
181 31
    }
182
183 31
    protected function loadConsumers()
184
    {
185 31
        foreach ($this->config['consumers'] as $key => $consumer) {
186 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
187 26
            $definition->setPublic(true);
188 26
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
189 26
            $definition->addTag('old_sound_rabbit_mq.consumer');
190
            //this consumer doesn't define an exchange -> using AMQP Default
191 26
            if (!isset($consumer['exchange_options'])) {
192
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
193
            }
194 26
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
195
            //this consumer doesn't define a queue -> using AMQP Default
196 26
            if (!isset($consumer['queue_options'])) {
197
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
198
            }
199 26
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
200 26
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
201
202 26
            if (array_key_exists('qos_options', $consumer)) {
203 22
                $definition->addMethodCall('setQosOptions', array(
204 22
                    $consumer['qos_options']['prefetch_size'],
205 22
                    $consumer['qos_options']['prefetch_count'],
206 22
                    $consumer['qos_options']['global']
207
                ));
208
            }
209
210 26
            if (isset($consumer['idle_timeout'])) {
211
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
212
            }
213 26
            if (isset($consumer['idle_timeout_exit_code'])) {
214
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
215
            }
216 26
            if (isset($consumer['timeout_wait'])) {
217
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
218
            }
219
            if (isset($consumer['graceful_max_execution'])) {
220
                $definition->addMethodCall(
221
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
222
                    array($consumer['graceful_max_execution']['timeout'])
223
                );
224
                $definition->addMethodCall(
225
                    'setGracefulMaxExecutionTimeoutExitCode',
226 26
                    array($consumer['graceful_max_execution']['exit_code'])
227
                );
228
            }
229
            if (!$consumer['auto_setup_fabric']) {
230 26
                $definition->addMethodCall('disableAutoSetupFabric');
231 26
            }
232 25
233
            $this->injectConnection($definition, $consumer['connection']);
234
            if ($this->collectorEnabled) {
235 26
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
236 1
            }
237
238
            if ($consumer['enable_logger']) {
239 26
                $this->injectLogger($definition);
240 26
            }
241 26
242
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
243 31
            $this->container->setDefinition($name, $definition);
244
            $this->addDequeuerAwareCall($consumer['callback'], $name);
245 31
        }
246
    }
247 31
248 22
    protected function loadMultipleConsumers()
249 22
    {
250
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
251 22
            $queues = array();
252
            $callbacks = array();
253
254
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
255
                throw new InvalidConfigurationException(
256
                    "Error on loading $key multiple consumer. " .
257
                    "Either 'queues' or 'queues_provider' parameters should be defined."
258 22
                );
259 22
            }
260 22
261 22
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
262
                $queues[$queueOptions['name']] = $queueOptions;
263
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
264 22
                $callbacks[] = $queueOptions['callback'];
265
            }
266 22
267 22
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
268 22
            $definition
269 22
                ->setPublic(true)
270 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
271
                ->addTag('old_sound_rabbit_mq.multi_consumer')
272 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
273 22
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
274 22
275 22
            if ($consumer['queues_provider']) {
276
                $definition->addMethodCall(
277
                    'setQueuesProvider',
278
                    array(new Reference($consumer['queues_provider']))
279 22
                );
280
            }
281
282
            if (array_key_exists('qos_options', $consumer)) {
283
                $definition->addMethodCall('setQosOptions', array(
284
                    $consumer['qos_options']['prefetch_size'],
285
                    $consumer['qos_options']['prefetch_count'],
286
                    $consumer['qos_options']['global']
287 22
                ));
288
            }
289
290 22
            if (isset($consumer['idle_timeout'])) {
291
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
292
            }
293 22
            if (isset($consumer['idle_timeout_exit_code'])) {
294
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
295
            }
296
            if (isset($consumer['timeout_wait'])) {
297
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
298
            }
299
            if (isset($consumer['graceful_max_execution'])) {
300
                $definition->addMethodCall(
301
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
302
                    array($consumer['graceful_max_execution']['timeout'])
303 22
                );
304
                $definition->addMethodCall(
305
                    'setGracefulMaxExecutionTimeoutExitCode',
306
                    array($consumer['graceful_max_execution']['exit_code'])
307 22
                );
308 22
            }
309 22
            if (!$consumer['auto_setup_fabric']) {
310
                $definition->addMethodCall('disableAutoSetupFabric');
311
            }
312 22
313
            $this->injectConnection($definition, $consumer['connection']);
314
            if ($this->collectorEnabled) {
315
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
316 22
            }
317 22
318 22
            if ($consumer['enable_logger']) {
319 22
                $this->injectLogger($definition);
320
            }
321 22
322 22
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
323
            $this->container->setDefinition($name, $definition);
324
            if ($consumer['queues_provider']) {
325 31
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
326
            }
327 31
            foreach ($callbacks as $callback) {
328
                $this->addDequeuerAwareCall($callback, $name);
329 31
            }
330
        }
331 22
    }
332
333
    protected function loadDynamicConsumers()
334
    {
335
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
336
337
            if (empty($consumer['queue_options_provider'])) {
338 22
                throw new InvalidConfigurationException(
339
                    "Error on loading $key dynamic consumer. " .
340 22
                    "'queue_provider' parameter should be defined."
341 22
                );
342 22
            }
343 22
344 22
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
345 22
            $definition
346
                ->setPublic(true)
347 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
348
                ->addTag('old_sound_rabbit_mq.consumer')
349
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
350
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
351
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
352
353
            if (array_key_exists('qos_options', $consumer)) {
354
                $definition->addMethodCall('setQosOptions', array(
355 22
                    $consumer['qos_options']['prefetch_size'],
356 22
                    $consumer['qos_options']['prefetch_count'],
357 22
                    $consumer['qos_options']['global']
358
                ));
359
            }
360 22
361
            $definition->addMethodCall(
362
                'setQueueOptionsProvider',
363 22
                array(new Reference($consumer['queue_options_provider']))
364
            );
365
366 22
            if (isset($consumer['idle_timeout'])) {
367
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
368
            }
369
            if (isset($consumer['idle_timeout_exit_code'])) {
370
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
371
            }
372
            if (isset($consumer['timeout_wait'])) {
373
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
374
            }
375
            if (isset($consumer['graceful_max_execution'])) {
376 22
                $definition->addMethodCall(
377
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
378
                    array($consumer['graceful_max_execution']['timeout'])
379
                );
380 22
                $definition->addMethodCall(
381 22
                    'setGracefulMaxExecutionTimeoutExitCode',
382 22
                    array($consumer['graceful_max_execution']['exit_code'])
383
                );
384
            }
385 22
            if (!$consumer['auto_setup_fabric']) {
386
                $definition->addMethodCall('disableAutoSetupFabric');
387
            }
388
389 22
            $this->injectConnection($definition, $consumer['connection']);
390 22
            if ($this->collectorEnabled) {
391 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
392 22
            }
393
394 31
            if ($consumer['enable_logger']) {
395
                $this->injectLogger($definition);
396 31
            }
397
398 31
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
399
            $this->container->setDefinition($name, $definition);
400
            $this->addDequeuerAwareCall($consumer['callback'], $name);
401
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
402
        }
403
    }
404
405
    protected function loadBatchConsumers()
406
    {
407
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
408
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
409
410
            if (!isset($consumer['exchange_options'])) {
411
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
412
            }
413
414
            $definition
415
                ->setPublic(true)
416
                ->addTag('old_sound_rabbit_mq.base_amqp')
417
                ->addTag('old_sound_rabbit_mq.batch_consumer')
418
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
419
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
420
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
421
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
422
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
423
                ->addMethodCall('setQosOptions', array(
424
                    $consumer['qos_options']['prefetch_size'],
425
                    $consumer['qos_options']['prefetch_count'],
426
                    $consumer['qos_options']['global']
427
                ))
428
            ;
429
430
            if (isset($consumer['idle_timeout_exit_code'])) {
431
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
432
            }
433
434
            if (isset($consumer['idle_timeout'])) {
435
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
436
            }
437
438
            if (isset($consumer['graceful_max_execution'])) {
439
                $definition->addMethodCall(
440
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
441
                    array($consumer['graceful_max_execution']['timeout'])
442
                );
443
            }
444
445
            if (!$consumer['auto_setup_fabric']) {
446
                $definition->addMethodCall('disableAutoSetupFabric');
447
            }
448
449
            if ($consumer['keep_alive']) {
450
                $definition->addMethodCall('keepAlive');
451
            }
452
453
            $this->injectConnection($definition, $consumer['connection']);
454
            if ($this->collectorEnabled) {
455 31
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
456
            }
457 31
458
            if ($consumer['enable_logger']) {
459 31
                $this->injectLogger($definition);
460 22
            }
461
462 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
463 22
        }
464 22
    }
465 22
466 22
    protected function loadAnonConsumers()
467 22
    {
468 22
        foreach ($this->config['anon_consumers'] as $key => $anon) {
469 22
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
470
            $definition
471
                ->setPublic(true)
472 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
473 22
                ->addTag('old_sound_rabbit_mq.anon_consumer')
474 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
475
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
476 31
            $this->injectConnection($definition, $anon['connection']);
477
            if ($this->collectorEnabled) {
478
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
479
            }
480
481
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
482
            $this->container->setDefinition($name, $definition);
483
            $this->addDequeuerAwareCall($anon['callback'], $name);
484
        }
485
    }
486 27
487
    /**
488 27
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
489 1
     * parameter. So we revert the change for right configurations.
490
     *
491 1
     * @param array $config
492
     *
493
     * @return array
494
     */
495 1
    private function normalizeArgumentKeys(array $config)
496 1
    {
497 1
        if (isset($config['arguments'])) {
498
            $arguments = $config['arguments'];
499
            // support for old configuration
500 1
            if (is_string($arguments)) {
501
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

501
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
502 1
            }
503
504 27
            $newArguments = array();
505
            foreach ($arguments as $key => $value) {
506
                if (strstr($key, '_')) {
507
                    $key = str_replace('_', '-', $key);
508
                }
509
                $newArguments[$key] = $value;
510
            }
511
            $config['arguments'] = $newArguments;
512
        }
513
        return $config;
514
    }
515
516
    /**
517
     * Support for arguments provided as string. Support for old configuration files.
518
     *
519
     * @deprecated
520
     * @param string $arguments
521
     * @return array
522
     */
523
    private function argumentsStringAsArray($arguments)
524
    {
525
        $argumentsArray = array();
526
527
        $argumentPairs = explode(',', $arguments);
528
        foreach ($argumentPairs as $argument) {
529
            $argumentPair = explode(':', $argument);
530
            $type = 'S';
531 31
            if (isset($argumentPair[2])) {
532
                $type = $argumentPair[2];
533 31
            }
534 25
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
535 25
        }
536
537 25
        return $argumentsArray;
538 25
    }
539 25
540 25
    protected function loadRpcClients()
541 25
    {
542
        foreach ($this->config['rpc_clients'] as $key => $client) {
543 25
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
544 25
            $definition->setLazy($client['lazy']);
545
            $definition
546 25
                ->addTag('old_sound_rabbit_mq.rpc_client')
547 25
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
548
            $this->injectConnection($definition, $client['connection']);
549 25
            if ($this->collectorEnabled) {
550
                $this->injectLoggedChannel($definition, $key, $client['connection']);
551 25
            }
552
            if (array_key_exists('unserializer', $client)) {
553 31
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
554
            }
555 31
            if (array_key_exists('direct_reply_to', $client)) {
556
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
557 31
            }
558 22
            $definition->setPublic(true);
559
560 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
561 22
        }
562 22
    }
563 22
564 22
    protected function loadRpcServers()
565 22
    {
566 22
        foreach ($this->config['rpc_servers'] as $key => $server) {
567 22
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
568
            $definition
569 22
                ->setPublic(true)
570
                ->addTag('old_sound_rabbit_mq.base_amqp')
571
                ->addTag('old_sound_rabbit_mq.rpc_server')
572
                ->addMethodCall('initServer', array($key))
573
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
574
            $this->injectConnection($definition, $server['connection']);
575
            if ($this->collectorEnabled) {
576 22
                $this->injectLoggedChannel($definition, $key, $server['connection']);
577 22
            }
578
            if (array_key_exists('qos_options', $server)) {
579 22
                $definition->addMethodCall('setQosOptions', array(
580 22
                    $server['qos_options']['prefetch_size'],
581
                    $server['qos_options']['prefetch_count'],
582 22
                    $server['qos_options']['global']
583 22
                ));
584
            }
585 22
            if (array_key_exists('exchange_options', $server)) {
586
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
587 31
            }
588
            if (array_key_exists('queue_options', $server)) {
589 28
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
590
            }
591 28
            if (array_key_exists('serializer', $server)) {
592 28
                $definition->addMethodCall('setSerializer', array($server['serializer']));
593
            }
594 28
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
595 28
        }
596 28
    }
597
598 28
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
599
    {
600 28
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
601 28
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
602 28
        $channel
603
            ->setPublic(false)
604 30
            ->addTag('old_sound_rabbit_mq.logged_channel');
605
        $this->injectConnection($channel, $connectionName);
606 30
607 30
        $this->container->setDefinition($id, $channel);
608
609 31
        $this->channelIds[] = $id;
610
        $definition->addArgument(new Reference($id));
611 31
    }
612
613
    protected function injectConnection(Definition $definition, $connectionName)
614
    {
615
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
616
    }
617
618
    public function getAlias()
619
    {
620 26
        return 'old_sound_rabbit_mq';
621
    }
622 26
623 26
    /**
624
     * Add proper dequeuer aware call
625
     *
626
     * @param string $callback
627
     * @param string $name
628
     */
629
    protected function addDequeuerAwareCall($callback, $name)
630
    {
631
        if (!$this->container->has($callback)) {
632
            return;
633 1
        }
634
635 1
        $callbackDefinition = $this->container->findDefinition($callback);
636 1
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
637
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
638 1
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
639 1
        }
640
    }
641
642
    private function injectLogger(Definition $definition)
643
    {
644
        $definition->addTag('monolog.logger', array(
645
            'channel' => 'phpamqplib'
646 1
        ));
647
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
648
    }
649 1
650
    /**
651
     * Get default AMQP exchange options
652
     *
653
     * @return array
654
     */
655
    protected function getDefaultExchangeOptions()
656
    {
657
        return array(
658
            'name' => '',
659
            'type' => 'direct',
660
            'passive' => true,
661 27
            'declare' => false
662
        );
663
    }
664 27
665
    /**
666
     * Get default AMQP queue options
667
     *
668
     * @return array
669
     */
670
    protected function getDefaultQueueOptions()
671
    {
672
        return array(
673
            'name' => '',
674
            'declare' => false
675
        );
676
    }
677
}
678