Passed
Push — master ( 2f0dcd...638ea4 )
by Mihai
02:42 queued 12s
created

OldSoundRabbitMqExtension   F

Complexity

Total Complexity 112

Size/Duplication

Total Lines 692
Duplicated Lines 0 %

Test Coverage

Coverage 71.71%

Importance

Changes 22
Bugs 3 Features 2
Metric Value
eloc 395
c 22
b 3
f 2
dl 0
loc 692
ccs 294
cts 410
cp 0.7171
rs 2
wmc 112

21 Methods

Rating   Name   Duplication   Size   Complexity  
A load() 0 33 4
A loadBindings() 0 22 4
A getConfiguration() 0 3 1
C loadDynamicConsumers() 0 69 11
A loadRpcClients() 0 21 5
C loadBatchConsumers() 0 58 10
B loadRpcServers() 0 31 7
A injectLoggedChannel() 0 13 1
F loadMultipleConsumers() 0 81 16
A injectLogger() 0 6 1
A addDequeuerAwareCall() 0 10 3
C loadProducers() 0 65 14
A normalizeArgumentKeys() 0 19 5
A getDefaultQueueOptions() 0 5 1
A injectConnection() 0 3 1
F loadConsumers() 0 73 14
A getDefaultExchangeOptions() 0 7 1
A argumentsStringAsArray() 0 15 3
A getAlias() 0 3 1
A loadAnonConsumers() 0 18 3
B loadConnections() 0 33 6

How to fix   Complexity   

Complex Class

Complex classes like OldSoundRabbitMqExtension often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use OldSoundRabbitMqExtension, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var Boolean Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    public function load(array $configs, ContainerBuilder $container)
39
    {
40 34
        $this->container = $container;
41
42 34
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
43 34
        $loader->load('rabbitmq.xml');
44
45 34
        $configuration = $this->getConfiguration($configs, $container);
46 34
        $this->config = $this->processConfiguration($configuration, $configs);
47
48 34
        $this->collectorEnabled = $this->config['enable_collector'];
49
50 34
        $this->loadConnections();
51 34
        $this->loadBindings();
52 34
        $this->loadProducers();
53 34
        $this->loadConsumers();
54 34
        $this->loadMultipleConsumers();
55 34
        $this->loadDynamicConsumers();
56 34
        $this->loadBatchConsumers();
57 34
        $this->loadAnonConsumers();
58 34
        $this->loadRpcClients();
59 34
        $this->loadRpcServers();
60
61 34
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
62 31
            $channels = array();
63 31
            foreach (array_unique($this->channelIds) as $id) {
64 31
                $channels[] = new Reference($id);
65
            }
66
67 31
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
68 31
            $definition->replaceArgument(0, $channels);
69
        } else {
70 3
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
71
        }
72 34
    }
73
74 34
    public function getConfiguration(array $config, ContainerBuilder $container)
75
    {
76 34
        return new Configuration($this->getAlias());
77
    }
78
79 34
    protected function loadConnections()
80
    {
81 34
        foreach ($this->config['connections'] as $key => $connection) {
82 34
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
83 34
            $classParam =
84 34
                $connection['lazy']
85 25
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
86 34
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
87
88 34
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
89 34
                $classParam, $connection,
90
            ));
91 34
            if (isset($connection['connection_parameters_provider'])) {
92
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
93
                unset($connection['connection_parameters_provider']);
94
            }
95 34
            $definition->setPublic(false);
96 34
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
97 34
            $this->container->setDefinition($factoryName, $definition);
98
99 34
            $definition = new Definition($classParam);
100 34
            if (method_exists($definition, 'setFactory')) {
101
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
102 34
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
103
            } else {
104
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
105
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

105
                $definition->/** @scrutinizer ignore-call */ 
106
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
106
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107
            }
108 34
            $definition->addTag('old_sound_rabbit_mq.connection');
109 34
            $definition->setPublic(true);
110
111 34
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
112
        }
113 34
    }
114
115 34
    protected function loadBindings()
116
    {
117 34
        if ($this->config['sandbox']) {
118
            return;
119
        }
120 34
        foreach ($this->config['bindings'] as $binding) {
121 25
            ksort($binding);
122 25
            $definition = new Definition($binding['class']);
123 25
            $definition->addTag('old_sound_rabbit_mq.binding');
124 25
            $definition->addMethodCall('setArguments', array($binding['arguments']));
125 25
            $definition->addMethodCall('setDestination', array($binding['destination']));
126 25
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
127 25
            $definition->addMethodCall('setExchange', array($binding['exchange']));
128 25
            $definition->addMethodCall('isNowait', array($binding['nowait']));
129 25
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
130 25
            $this->injectConnection($definition, $binding['connection']);
131 25
            $key = md5(json_encode($binding));
132 25
            if ($this->collectorEnabled) {
133
                // in the context of a binding, I don't thing logged channels are needed?
134 25
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
135
            }
136 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
137
        }
138 34
    }
139
140 34
    protected function loadProducers()
141
    {
142 34
        if ($this->config['sandbox'] == false) {
143 34
            foreach ($this->config['producers'] as $key => $producer) {
144 30
                $definition = new Definition($producer['class']);
145 30
                $definition->setPublic(true);
146 30
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
147 30
                $definition->addTag('old_sound_rabbit_mq.producer');
148
                //this producer doesn't define an exchange -> using AMQP Default
149 30
                if (!isset($producer['exchange_options'])) {
150 1
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
151
                }
152 30
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
153
                //this producer doesn't define a queue -> using AMQP Default
154 30
                if (!isset($producer['queue_options'])) {
155 30
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
156
                }
157 30
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
158 30
                $this->injectConnection($definition, $producer['connection']);
159 30
                if ($this->collectorEnabled) {
160 28
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
161
                }
162 30
                if (!$producer['auto_setup_fabric']) {
163
                    $definition->addMethodCall('disableAutoSetupFabric');
164
                }
165
166 30
                if ($producer['enable_logger']) {
167
                    $this->injectLogger($definition);
168
                }
169
170 30
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
171
172 30
                $this->container->setDefinition($producerServiceName, $definition);
173 30
                if (null !== $producer['service_alias']) {
174 25
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
175
                }
176
177
                // register alias for argument auto wiring
178 30
                if (method_exists($this->container, 'registerAliasForArgument')) {
179 30
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
180 30
                    $this->container
181 30
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
182 30
                        ->setPublic(false);
183
184 30
                    $this->container
185 30
                        ->registerAliasForArgument($producerServiceName, $producer['class'], $argName)
186 30
                        ->setPublic(false);
187
                }
188
189 30
                $definition->addMethodCall('setDefaultRoutingKey', array($producer['default_routing_key']));
190 30
                $definition->addMethodCall('setContentType', array($producer['default_content_type']));
191 30
                $definition->addMethodCall('setDeliveryMode', array($producer['default_delivery_mode']));
192
            }
193
        } else {
194
            foreach ($this->config['producers'] as $key => $producer) {
195
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
196
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
197
                $this->container->setDefinition($producerServiceName, $definition);
198
199
                // register alias for argumen auto wiring
200
                if (method_exists($this->container, 'registerAliasForArgument')) {
201
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
202
                    $this->container
203
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
204
                        ->setPublic(false);
205
                }
206
            }
207
        }
208 34
    }
209
210 34
    protected function loadConsumers()
211
    {
212 34
        foreach ($this->config['consumers'] as $key => $consumer) {
213 29
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
214 29
            $definition->setPublic(true);
215 29
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
216 29
            $definition->addTag('old_sound_rabbit_mq.consumer');
217
            //this consumer doesn't define an exchange -> using AMQP Default
218 29
            if (!isset($consumer['exchange_options'])) {
219
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
220
            }
221 29
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
222
            //this consumer doesn't define a queue -> using AMQP Default
223 29
            if (!isset($consumer['queue_options'])) {
224
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
225
            }
226 29
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
227 29
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
228
229 29
            if (array_key_exists('qos_options', $consumer)) {
230 25
                $definition->addMethodCall('setQosOptions', array(
231 25
                    $consumer['qos_options']['prefetch_size'],
232 25
                    $consumer['qos_options']['prefetch_count'],
233 25
                    $consumer['qos_options']['global']
234
                ));
235
            }
236
237 29
            if (isset($consumer['idle_timeout'])) {
238
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
239
            }
240 29
            if (isset($consumer['idle_timeout_exit_code'])) {
241
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
242
            }
243 29
            if (isset($consumer['timeout_wait'])) {
244 25
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
245
            }
246 29
            if (isset($consumer['graceful_max_execution'])) {
247
                $definition->addMethodCall(
248
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
249
                    array($consumer['graceful_max_execution']['timeout'])
250
                );
251
                $definition->addMethodCall(
252
                    'setGracefulMaxExecutionTimeoutExitCode',
253
                    array($consumer['graceful_max_execution']['exit_code'])
254
                );
255
            }
256 29
            if (!$consumer['auto_setup_fabric']) {
257
                $definition->addMethodCall('disableAutoSetupFabric');
258
            }
259
260 29
            $this->injectConnection($definition, $consumer['connection']);
261 29
            if ($this->collectorEnabled) {
262 28
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
263
            }
264
265 29
            if ($consumer['enable_logger']) {
266 1
                $this->injectLogger($definition);
267
            }
268
269 29
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
270 29
            $this->container->setDefinition($name, $definition);
271 29
            $this->addDequeuerAwareCall($consumer['callback'], $name);
272
273
            // register alias for argument auto wiring
274 29
            if (method_exists($this->container, 'registerAliasForArgument')) {
275 29
                $argName = !str_ends_with(strtolower($key), 'consumer') ? sprintf('%sConsumer', $key) : $key;
276 29
                $this->container
277 29
                    ->registerAliasForArgument($name, ConsumerInterface::class, $argName)
278 29
                    ->setPublic(false);
279
280 29
                $this->container
281 29
                    ->registerAliasForArgument($name, '%old_sound_rabbit_mq.consumer.class%', $argName)
282 29
                    ->setPublic(false);
283
                }
284
        }
285 34
    }
286
287 34
    protected function loadMultipleConsumers()
288
    {
289 34
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
290 25
            $queues = array();
291 25
            $callbacks = array();
292
293 25
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
294
                throw new InvalidConfigurationException(
295
                    "Error on loading $key multiple consumer. " .
296
                    "Either 'queues' or 'queues_provider' parameters should be defined."
297
                );
298
            }
299
300 25
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
301 25
                $queues[$queueOptions['name']] = $queueOptions;
302 25
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
303 25
                $callbacks[] = $queueOptions['callback'];
304
            }
305
306 25
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
307
            $definition
308 25
                ->setPublic(true)
309 25
                ->addTag('old_sound_rabbit_mq.base_amqp')
310 25
                ->addTag('old_sound_rabbit_mq.multi_consumer')
311 25
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
312 25
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
313
314 25
            if ($consumer['queues_provider']) {
315 25
                $definition->addMethodCall(
316 25
                    'setQueuesProvider',
317 25
                    array(new Reference($consumer['queues_provider']))
318
                );
319
            }
320
321 25
            if (array_key_exists('qos_options', $consumer)) {
322
                $definition->addMethodCall('setQosOptions', array(
323
                    $consumer['qos_options']['prefetch_size'],
324
                    $consumer['qos_options']['prefetch_count'],
325
                    $consumer['qos_options']['global']
326
                ));
327
            }
328
329 25
            if (isset($consumer['idle_timeout'])) {
330
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
331
            }
332 25
            if (isset($consumer['idle_timeout_exit_code'])) {
333
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
334
            }
335 25
            if (isset($consumer['timeout_wait'])) {
336 25
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
337
            }
338 25
            if (isset($consumer['graceful_max_execution'])) {
339
                $definition->addMethodCall(
340
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
341
                    array($consumer['graceful_max_execution']['timeout'])
342
                );
343
                $definition->addMethodCall(
344
                    'setGracefulMaxExecutionTimeoutExitCode',
345
                    array($consumer['graceful_max_execution']['exit_code'])
346
                );
347
            }
348 25
            if (!$consumer['auto_setup_fabric']) {
349
                $definition->addMethodCall('disableAutoSetupFabric');
350
            }
351
352 25
            $this->injectConnection($definition, $consumer['connection']);
353 25
            if ($this->collectorEnabled) {
354 25
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
355
            }
356
357 25
            if ($consumer['enable_logger']) {
358
                $this->injectLogger($definition);
359
            }
360
361 25
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
362 25
            $this->container->setDefinition($name, $definition);
363 25
            if ($consumer['queues_provider']) {
364 25
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
365
            }
366 25
            foreach ($callbacks as $callback) {
367 25
                $this->addDequeuerAwareCall($callback, $name);
368
            }
369
        }
370 34
    }
371
372 34
    protected function loadDynamicConsumers()
373
    {
374 34
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
375
376 25
            if (empty($consumer['queue_options_provider'])) {
377
                throw new InvalidConfigurationException(
378
                    "Error on loading $key dynamic consumer. " .
379
                    "'queue_provider' parameter should be defined."
380
                );
381
            }
382
383 25
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
384
            $definition
385 25
                ->setPublic(true)
386 25
                ->addTag('old_sound_rabbit_mq.base_amqp')
387 25
                ->addTag('old_sound_rabbit_mq.consumer')
388 25
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
389 25
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
390 25
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
391
392 25
            if (array_key_exists('qos_options', $consumer)) {
393
                $definition->addMethodCall('setQosOptions', array(
394
                    $consumer['qos_options']['prefetch_size'],
395
                    $consumer['qos_options']['prefetch_count'],
396
                    $consumer['qos_options']['global']
397
                ));
398
            }
399
400 25
            $definition->addMethodCall(
401 25
                'setQueueOptionsProvider',
402 25
                array(new Reference($consumer['queue_options_provider']))
403
            );
404
405 25
            if (isset($consumer['idle_timeout'])) {
406
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
407
            }
408 25
            if (isset($consumer['idle_timeout_exit_code'])) {
409
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
410
            }
411 25
            if (isset($consumer['timeout_wait'])) {
412
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
413
            }
414 25
            if (isset($consumer['graceful_max_execution'])) {
415
                $definition->addMethodCall(
416
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
417
                    array($consumer['graceful_max_execution']['timeout'])
418
                );
419
                $definition->addMethodCall(
420
                    'setGracefulMaxExecutionTimeoutExitCode',
421
                    array($consumer['graceful_max_execution']['exit_code'])
422
                );
423
            }
424 25
            if (!$consumer['auto_setup_fabric']) {
425
                $definition->addMethodCall('disableAutoSetupFabric');
426
            }
427
428 25
            $this->injectConnection($definition, $consumer['connection']);
429 25
            if ($this->collectorEnabled) {
430 25
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
431
            }
432
433 25
            if ($consumer['enable_logger']) {
434
                $this->injectLogger($definition);
435
            }
436
437 25
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
438 25
            $this->container->setDefinition($name, $definition);
439 25
            $this->addDequeuerAwareCall($consumer['callback'], $name);
440 25
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
441
        }
442 34
    }
443
444 34
    protected function loadBatchConsumers()
445
    {
446 34
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
447
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
448
449
            if (!isset($consumer['exchange_options'])) {
450
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
451
            }
452
453
            $definition
454
                ->setPublic(true)
455
                ->addTag('old_sound_rabbit_mq.base_amqp')
456
                ->addTag('old_sound_rabbit_mq.batch_consumer')
457
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
458
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
459
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
460
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
461
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
462
                ->addMethodCall('setQosOptions', array(
463
                    $consumer['qos_options']['prefetch_size'],
464
                    $consumer['qos_options']['prefetch_count'],
465
                    $consumer['qos_options']['global']
466
                ))
467
            ;
468
469
            if (isset($consumer['idle_timeout_exit_code'])) {
470
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
471
            }
472
473
            if (isset($consumer['idle_timeout'])) {
474
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
475
            }
476
477
            if (isset($consumer['graceful_max_execution'])) {
478
                $definition->addMethodCall(
479
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
480
                    array($consumer['graceful_max_execution']['timeout'])
481
                );
482
            }
483
484
            if (!$consumer['auto_setup_fabric']) {
485
                $definition->addMethodCall('disableAutoSetupFabric');
486
            }
487
488
            if ($consumer['keep_alive']) {
489
                $definition->addMethodCall('keepAlive');
490
            }
491
492
            $this->injectConnection($definition, $consumer['connection']);
493
            if ($this->collectorEnabled) {
494
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
495
            }
496
497
            if ($consumer['enable_logger']) {
498
                $this->injectLogger($definition);
499
            }
500
501
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
502
        }
503 34
    }
504
505 34
    protected function loadAnonConsumers()
506
    {
507 34
        foreach ($this->config['anon_consumers'] as $key => $anon) {
508 25
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
509
            $definition
510 25
                ->setPublic(true)
511 25
                ->addTag('old_sound_rabbit_mq.base_amqp')
512 25
                ->addTag('old_sound_rabbit_mq.anon_consumer')
513 25
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
514 25
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
515 25
            $this->injectConnection($definition, $anon['connection']);
516 25
            if ($this->collectorEnabled) {
517 25
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
518
            }
519
520 25
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
521 25
            $this->container->setDefinition($name, $definition);
522 25
            $this->addDequeuerAwareCall($anon['callback'], $name);
523
        }
524 34
    }
525
526
    /**
527
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
528
     * parameter. So we revert the change for right configurations.
529
     *
530
     * @param array $config
531
     *
532
     * @return array
533
     */
534 30
    private function normalizeArgumentKeys(array $config)
535
    {
536 30
        if (isset($config['arguments'])) {
537 1
            $arguments = $config['arguments'];
538
            // support for old configuration
539 1
            if (is_string($arguments)) {
540
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

540
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
541
            }
542
543 1
            $newArguments = array();
544 1
            foreach ($arguments as $key => $value) {
545 1
                if (strstr($key, '_')) {
546
                    $key = str_replace('_', '-', $key);
547
                }
548 1
                $newArguments[$key] = $value;
549
            }
550 1
            $config['arguments'] = $newArguments;
551
        }
552 30
        return $config;
553
    }
554
555
    /**
556
     * Support for arguments provided as string. Support for old configuration files.
557
     *
558
     * @deprecated
559
     * @param string $arguments
560
     * @return array
561
     */
562
    private function argumentsStringAsArray($arguments)
563
    {
564
        $argumentsArray = array();
565
566
        $argumentPairs = explode(',', $arguments);
567
        foreach ($argumentPairs as $argument) {
568
            $argumentPair = explode(':', $argument);
569
            $type = 'S';
570
            if (isset($argumentPair[2])) {
571
                $type = $argumentPair[2];
572
            }
573
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
574
        }
575
576
        return $argumentsArray;
577
    }
578
579 34
    protected function loadRpcClients()
580
    {
581 34
        foreach ($this->config['rpc_clients'] as $key => $client) {
582 28
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
583 28
            $definition->setLazy($client['lazy']);
584
            $definition
585 28
                ->addTag('old_sound_rabbit_mq.rpc_client')
586 28
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
587 28
            $this->injectConnection($definition, $client['connection']);
588 28
            if ($this->collectorEnabled) {
589 28
                $this->injectLoggedChannel($definition, $key, $client['connection']);
590
            }
591 28
            if (array_key_exists('unserializer', $client)) {
592 28
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
593
            }
594 28
            if (array_key_exists('direct_reply_to', $client)) {
595 28
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
596
            }
597 28
            $definition->setPublic(true);
598
599 28
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
600
        }
601 34
    }
602
603 34
    protected function loadRpcServers()
604
    {
605 34
        foreach ($this->config['rpc_servers'] as $key => $server) {
606 25
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
607
            $definition
608 25
                ->setPublic(true)
609 25
                ->addTag('old_sound_rabbit_mq.base_amqp')
610 25
                ->addTag('old_sound_rabbit_mq.rpc_server')
611 25
                ->addMethodCall('initServer', array($key))
612 25
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
613 25
            $this->injectConnection($definition, $server['connection']);
614 25
            if ($this->collectorEnabled) {
615 25
                $this->injectLoggedChannel($definition, $key, $server['connection']);
616
            }
617 25
            if (array_key_exists('qos_options', $server)) {
618
                $definition->addMethodCall('setQosOptions', array(
619
                    $server['qos_options']['prefetch_size'],
620
                    $server['qos_options']['prefetch_count'],
621
                    $server['qos_options']['global']
622
                ));
623
            }
624 25
            if (array_key_exists('exchange_options', $server)) {
625 25
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
626
            }
627 25
            if (array_key_exists('queue_options', $server)) {
628 25
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
629
            }
630 25
            if (array_key_exists('serializer', $server)) {
631 25
                $definition->addMethodCall('setSerializer', array($server['serializer']));
632
            }
633 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
634
        }
635 34
    }
636
637 31
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
638
    {
639 31
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
640 31
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
641
        $channel
642 31
            ->setPublic(false)
643 31
            ->addTag('old_sound_rabbit_mq.logged_channel');
644 31
        $this->injectConnection($channel, $connectionName);
645
646 31
        $this->container->setDefinition($id, $channel);
647
648 31
        $this->channelIds[] = $id;
649 31
        $definition->addArgument(new Reference($id));
650 31
    }
651
652 33
    protected function injectConnection(Definition $definition, $connectionName)
653
    {
654 33
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
655 33
    }
656
657 34
    public function getAlias() : string
658
    {
659 34
        return 'old_sound_rabbit_mq';
660
    }
661
662
    /**
663
     * Add proper dequeuer aware call
664
     *
665
     * @param string $callback
666
     * @param string $name
667
     */
668 29
    protected function addDequeuerAwareCall($callback, $name)
669
    {
670 29
        if (!$this->container->has($callback)) {
671 29
            return;
672
        }
673
674
        $callbackDefinition = $this->container->findDefinition($callback);
675
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
676
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
677
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
678
        }
679
    }
680
681 1
    private function injectLogger(Definition $definition)
682
    {
683 1
        $definition->addTag('monolog.logger', array(
684 1
            'channel' => 'phpamqplib'
685
        ));
686 1
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
687 1
    }
688
689
    /**
690
     * Get default AMQP exchange options
691
     *
692
     * @return array
693
     */
694 1
    protected function getDefaultExchangeOptions()
695
    {
696
        return array(
697 1
            'name' => '',
698
            'type' => 'direct',
699
            'passive' => true,
700
            'declare' => false
701
        );
702
    }
703
704
    /**
705
     * Get default AMQP queue options
706
     *
707
     * @return array
708
     */
709 30
    protected function getDefaultQueueOptions()
710
    {
711
        return array(
712 30
            'name' => '',
713
            'declare' => false
714
        );
715
    }
716
}
717