Passed
Pull Request — master (#661)
by
unknown
08:36
created

OldSoundRabbitMqExtension::loadMultipleConsumers()   F

Complexity

Conditions 16
Paths 4098

Size

Total Lines 81
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 34
CRAP Score 24.3886

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 16
eloc 53
c 2
b 0
f 0
nc 4098
nop 0
dl 0
loc 81
ccs 34
cts 50
cp 0.68
crap 24.3886
rs 1.4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var bool Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Loads the bundle configuration and registers the RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, processes the given configuration, delegates to the
     * individual load*() methods and finally either feeds the collected channel
     * references to the data collector or removes the collector definition.
     *
     * @param array            $configs   Raw configuration arrays to process
     * @param ContainerBuilder $container Container that receives the definitions
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $this->container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // The collector is dropped when it is disabled or when no channel id was recorded.
        // empty() makes the "array without elements" check explicit instead of relying on
        // the implicit array-to-boolean conversion.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        // Each distinct channel id is handed to the collector as a service reference.
        $channels = array();
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $this->container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channels);
    }
73
74 34
    /**
     * Creates the configuration object for this extension.
     *
     * Neither argument is read by this implementation; the configuration is
     * built from the extension alias only.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers, for every configured connection, a private connection factory
     * service and a public, tagged connection service created by that factory.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $name => $connection) {
            $suffix = 'connection.class';
            if ($connection['use_socket']) {
                $suffix = 'socket_connection.class';
            }

            if ($connection['lazy']) {
                $classParam = '%old_sound_rabbit_mq.lazy.'.$suffix.'%';
            } else {
                $classParam = '%old_sound_rabbit_mq.'.$suffix.'%';
            }

            $factoryId = sprintf('old_sound_rabbit_mq.connection_factory.%s', $name);
            $factory = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [$classParam, $connection]);
            if (isset($connection['connection_parameters_provider'])) {
                $factory->addArgument(new Reference($connection['connection_parameters_provider']));
                // This only changes the local copy: the array passed to the
                // definition above was copied by value and still holds the key.
                unset($connection['connection_parameters_provider']);
            }
            $factory->setPublic(false);
            $this->container->setDefinition($factoryId, $factory);

            $service = new Definition($classParam);
            if (!method_exists($service, 'setFactory')) {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $service->setFactoryService($factoryId);
                $service->setFactoryMethod('createConnection');
            } else {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $service->setFactory([new Reference($factoryId), 'createConnection']);
            }
            $service->addTag('old_sound_rabbit_mq.connection');
            $service->setPublic(true);

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $name), $service);
        }
    }
114
115 34
    /**
     * Registers one tagged binding service per configured binding.
     *
     * Nothing is registered when the 'sandbox' option is truthy. The service id
     * suffix is the md5 hash of the JSON-encoded, key-sorted binding configuration.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method called on the binding service => configuration key holding its single argument.
        $calls = [
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            // NOTE(review): 'isNowait' reads like a getter name; confirm against the
            // binding class whether a setter was intended here.
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        ];

        foreach ($this->config['bindings'] as $binding) {
            // Sorting the keys first makes the hash independent of the key order.
            ksort($binding);
            $hash = md5(json_encode($binding));

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($calls as $method => $option) {
                $definition->addMethodCall($method, [$binding[$option]]);
            }
            $this->injectConnection($definition, $binding['connection']);

            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $hash, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $hash), $definition);
        }
    }
139
140 34
    /**
     * Registers one service per configured producer.
     *
     * When the 'sandbox' option is truthy every producer is registered with the
     * fallback class only; otherwise the full producer definition is built.
     */
    protected function loadProducers()
    {
        $canAliasArguments = method_exists($this->container, 'registerAliasForArgument');

        if ($this->config['sandbox']) {
            foreach ($this->config['producers'] as $key => $producer) {
                $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
                $fallback = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $this->container->setDefinition($serviceId, $fallback);

                // register alias for argument auto wiring
                if ($canAliasArguments) {
                    $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                    $this->container
                        ->registerAliasForArgument($serviceId, ProducerInterface::class, $argName)
                        ->setPublic(false);
                }
            }

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');

            // this producer doesn't define an exchange -> using AMQP Default
            if (!isset($producer['exchange_options'])) {
                $producer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $exchangeOptions = $this->normalizeArgumentKeys($producer['exchange_options']);
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);

            // this producer doesn't define a queue -> using AMQP Default
            if (!isset($producer['queue_options'])) {
                $producer['queue_options'] = $this->getDefaultQueueOptions();
            }
            $definition->addMethodCall('setQueueOptions', [$producer['queue_options']]);

            $this->injectConnection($definition, $producer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $producer['connection']);
            }
            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
            $this->container->setDefinition($serviceId, $definition);

            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $serviceId);
            }

            // register alias for argument auto wiring
            if ($canAliasArguments) {
                $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                foreach ([ProducerInterface::class, $producer['class']] as $type) {
                    $this->container
                        ->registerAliasForArgument($serviceId, $type, $argName)
                        ->setPublic(false);
                }
            }

            // These calls are added to the definition object after it has been set on the container.
            $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
            if (isset($producer['validator'])) {
                $validator = $producer['validator'];
                $definition->addMethodCall('setValidator', [
                    $validator['class'],
                    $validator['schema'],
                    $validator['schema_url'],
                    $validator['definitions'],
                ]);
            }
        }
    }
213
214 29
    /**
     * Registers one public consumer service per entry of the 'consumers' configuration,
     * together with its autowiring aliases when the container supports them.
     */
    protected function loadConsumers()
    {
        // Optional settings forwarded as a single argument: configuration key => method name.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.consumer');

            // this consumer doesn't define an exchange -> using AMQP Default
            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $exchangeOptions = $this->normalizeArgumentKeys($consumer['exchange_options']);
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);

            // this consumer doesn't define a queue -> using AMQP Default
            if (!isset($consumer['queue_options'])) {
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
            }
            $queueOptions = $this->normalizeArgumentKeys($consumer['queue_options']);
            $definition->addMethodCall('setQueueOptions', [$queueOptions]);

            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // register alias for argument auto wiring
            if (!method_exists($this->container, 'registerAliasForArgument')) {
                continue;
            }

            $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
            foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $type) {
                $this->container
                    ->registerAliasForArgument($serviceId, $type, $argName)
                    ->setPublic(false);
            }
        }
    }
290
291
    /**
     * Registers one public multi-consumer service per 'multiple_consumers' entry.
     *
     * @throws InvalidConfigurationException When an entry defines neither 'queues' nor 'queues_provider'
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            list($queues, $callbacks) = $this->collectMultipleConsumerQueues($consumer['queues']);

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

            $this->configureMultipleConsumerRuntime($definition, $consumer);

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);

            // The queues provider (if any) is handled first, then every queue callback.
            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }

    /**
     * Indexes the configured queues by their 'name' option and replaces each
     * 'callback' service id with a [Reference, 'execute'] callable.
     *
     * @param array $queuesConfig The 'queues' section of one multiple consumer
     *
     * @return array Two elements: the queues indexed by name, and the list of callback service ids
     */
    private function collectMultipleConsumerQueues($queuesConfig)
    {
        $queues = array();
        $callbacks = array();

        foreach ($queuesConfig as $queueOptions) {
            $queueOptions['callback'] = array(new Reference($queueOptions['callback']), 'execute');
            $callbacks[] = (string) $queueOptions['callback'][0];
            $queues[$queueOptions['name']] = $queueOptions;
        }

        return array($queues, $callbacks);
    }

    /**
     * Adds the optional method calls of a multiple consumer: queues provider, QoS,
     * timeouts, graceful max execution and the auto-setup-fabric switch.
     *
     * @param Definition $definition Definition of the multi-consumer service
     * @param array      $consumer   Configuration of one multiple consumer
     */
    private function configureMultipleConsumerRuntime(Definition $definition, $consumer)
    {
        if ($consumer['queues_provider']) {
            $definition->addMethodCall(
                'setQueuesProvider',
                array(new Reference($consumer['queues_provider']))
            );
        }

        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', array(
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global']
            ));
        }

        // Optional settings forwarded as a single argument: configuration key => method name.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );
        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
    }
375
376
    /**
     * Registers one public dynamic-consumer service per 'dynamic_consumers' entry.
     *
     * @throws InvalidConfigurationException When an entry has no 'queue_options_provider'
     */
    protected function loadDynamicConsumers()
    {
        // Optional settings forwarded as a single argument: configuration key => method name.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked above.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
447
448
    /**
     * Registers one public batch-consumer service per 'batch_consumers' entry.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            // No exchange configured for this consumer: use the default exchange options.
            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }

            $qos = $consumer['qos_options'];

            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $definition->addMethodCall('setQosOptions', [
                $qos['prefetch_size'],
                $qos['prefetch_count'],
                $qos['global'],
            ]);

            // Optional settings, in the order in which the calls are registered.
            $optionalSetters = [
                'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
                'idle_timeout' => 'setIdleTimeout',
            ];
            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            // Only the timeout of 'graceful_max_execution' is forwarded here.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
508 25
509 25
    /**
     * Registers one public anonymous-consumer service per 'anon_consumers' entry.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = [new Reference($anon['callback']), 'execute'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);
            $definition->addMethodCall('setCallback', [$callback]);

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
529
530 30
    /**
531
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
532 30
     * parameter. So we revert the change for right configurations.
533 1
     *
534
     * @param array $config
535 1
     *
536
     * @return array
537
     */
538
    private function normalizeArgumentKeys(array $config)
    {
        // Nothing to rewrite when there is no (non-null) 'arguments' entry.
        if (!isset($config['arguments'])) {
            return $config;
        }

        // support for old configuration: arguments given as a single string
        $arguments = is_string($config['arguments'])
            ? $this->argumentsStringAsArray($config['arguments'])
            : $config['arguments'];

        $normalized = [];
        foreach ($arguments as $name => $value) {
            // Every underscore in the key is turned back into a dash; keys without one are unchanged.
            $normalized[str_replace('_', '-', (string) $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
558
559
    /**
560
     * Support for arguments provided as string. Support for old configuration files.
561
     *
562
     * @deprecated
563
     * @param string $arguments
564
     * @return array
565
     */
566
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        // The string is a comma-separated list of "name:value" or "name:value:type" items.
        foreach (explode(',', $arguments) as $item) {
            $parts = explode(':', $item);
            // The type is 'S' unless a third segment provides one.
            $parsed[$parts[0]] = [$parts[2] ?? 'S', $parts[1]];
        }

        return $parsed;
    }
582 28
583 28
    /**
     * Register one service definition per entry of the 'rpc_clients' configuration.
     *
     * Each definition is stored in the container under "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional client settings and the setter each one is forwarded to, in call order.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $clientName => $clientConfig) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setLazy($clientConfig['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', array($clientConfig['expect_serialized_response']));

            // Definition arguments: the connection first, then the logged channel when collecting.
            $this->injectConnection($rpcClient, $clientConfig['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $clientName, $clientConfig['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $clientConfig)) {
                    $rpcClient->addMethodCall($setter, array($clientConfig[$option]));
                }
            }

            $rpcClient->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $clientName);
            $this->container->setDefinition($serviceId, $rpcClient);
        }
    }
606 25
607 25
    /**
     * Register one service definition per entry of the 'rpc_servers' configuration.
     *
     * Each definition is stored in the container under "old_sound_rabbit_mq.<key>_server".
     */
    protected function loadRpcServers()
    {
        // Optional server settings handed over unchanged to the matching setter, in call order.
        $passThroughSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $serverName => $serverConfig) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', array($serverName));

            // The callback is the execute() method of the configured callback service.
            $callback = array(new Reference($serverConfig['callback']), 'execute');
            $rpcServer->addMethodCall('setCallback', array($callback));

            // Definition arguments: the connection first, then the logged channel when collecting.
            $this->injectConnection($rpcServer, $serverConfig['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $serverName, $serverConfig['connection']);
            }

            if (array_key_exists('qos_options', $serverConfig)) {
                $qosOptions = $serverConfig['qos_options'];
                $rpcServer->addMethodCall('setQosOptions', array(
                    $qosOptions['prefetch_size'],
                    $qosOptions['prefetch_count'],
                    $qosOptions['global'],
                ));
            }

            foreach ($passThroughSetters as $option => $setter) {
                if (array_key_exists($option, $serverConfig)) {
                    $rpcServer->addMethodCall($setter, array($serverConfig[$option]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $serverName);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
640 31
641
    /**
     * Register a logged channel definition and pass a reference to it to the given definition.
     *
     * The channel is stored under "old_sound_rabbit_mq.channel.<name>", gets the named
     * connection as its argument, and its id is appended to $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference as an argument
     * @param string     $name           Suffix used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
655 34
656
    /**
     * Append a reference to the named connection service to the definition's arguments.
     *
     * @param Definition $definition
     * @param string     $connectionName Suffix of the "old_sound_rabbit_mq.connection.<name>" service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $connection = new Reference($connectionId);

        $definition->addArgument($connection);
    }
660
661
    /**
     * Return the alias string of this extension.
     *
     * @return string Always 'old_sound_rabbit_mq'
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }
665
666 29
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is known to the container and its class implements
     * DequeuerAwareInterface, a setDequeuer() call referencing $name is added to the
     * callback's definition. Nothing is added when the callback service is unknown or when
     * its definition carries no class.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id referenced by the setDequeuer() call
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // The class may be written as a "%parameter%" placeholder; resolve it before reflecting on it.
        $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());

        // A definition without a class cannot be inspected, so it is left untouched
        // instead of letting ReflectionClass throw on a null class name.
        if (!is_string($callbackClass) || $callbackClass === '') {
            return;
        }

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
684
685
    /**
     * Tag the definition for the 'phpamqplib' monolog channel and add a setLogger() call.
     *
     * The 'logger' reference is created with ContainerInterface::IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $monologTag = array(
            'channel' => 'phpamqplib',
        );
        $definition->addTag('monolog.logger', $monologTag);

        $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $definition->addMethodCall('setLogger', array($logger));
    }
692
693 1
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an unnamed 'direct' exchange that is passive and not declared.
     *
     * @return array Options keyed by 'name', 'type', 'passive' and 'declare'
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
707
708 30
    /**
     * Get default AMQP queue options
     *
     * The defaults are an unnamed queue that is not declared.
     *
     * @return array Options keyed by 'name' and 'declare'
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
720
}
721