Passed
Pull Request — master (#661)
by
unknown
05:46
created

OldSoundRabbitMqExtension::loadMultipleConsumers()   F

Complexity

Conditions 16
Paths 4098

Size

Total Lines 81
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 34
CRAP Score 25.4786

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 16
eloc 53
c 2
b 0
f 0
nc 4098
nop 0
dl 0
loc 81
ccs 34
cts 51
cp 0.6667
crap 25.4786
rs 1.4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var bool Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Loads the bundle configuration and registers all RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, processes the given configuration and then delegates to the
     * individual load*() methods. Finally the data collector definition either receives
     * one Reference per unique id stored in $this->channelIds, or is removed from the
     * container when the collector is disabled or no channel id was recorded.
     *
     * @param array            $configs   Raw configuration arrays to process
     * @param ContainerBuilder $container Container the definitions are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check: do not rely on the implicit array-to-boolean conversion.
        if ($this->collectorEnabled && !empty($this->channelIds)) {
            $channels = array();
            foreach (array_unique($this->channelIds) as $id) {
                $channels[] = new Reference($id);
            }

            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
            $definition->replaceArgument(0, $channels);
        } else {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
        }
    }
73
74 34
    /**
     * Builds the Configuration object for this extension.
     *
     * Neither argument is read; the Configuration is created from the extension alias only.
     *
     * @param array            $config    Unused
     * @param ContainerBuilder $container Unused
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers, for every configured connection, a private connection factory service
     * and a public, tagged connection service created through that factory.
     *
     * The connection class parameter is selected from the 'use_socket' and 'lazy' options.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

            $factoryDefinition = new Definition(
                '%old_sound_rabbit_mq.connection_factory.class%',
                array($classParam, $connection)
            );
            if (isset($connection['connection_parameters_provider'])) {
                // The provider is passed as an additional factory argument. The key also stays
                // inside the parameter array given above: a previous unset() here only touched
                // the loop-local copy after the array had already been handed to the
                // definition, so it never had any effect and has been dropped.
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            $connectionDefinition = new Definition($classParam);
            if (method_exists($connectionDefinition, 'setFactory')) {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactory(array(new Reference($factoryName), 'createConnection'));
            } else {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6.
                // Static analysis reports setFactoryService()/setFactoryMethod() as missing on the
                // current Definition class; this branch only runs when setFactory() is unavailable.
                $connectionDefinition->setFactoryService($factoryName);
                $connectionDefinition->setFactoryMethod('createConnection');
            }
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
114
115 34
    /**
     * Registers one tagged binding service per configured binding.
     *
     * Nothing is registered in sandbox mode. The service id suffix is the md5 hash of the
     * JSON-encoded, key-sorted binding configuration.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        foreach ($this->config['bindings'] as $binding) {
            // Sort the keys so that the hash below does not depend on configuration key order.
            ksort($binding);

            $bindingDefinition = new Definition($binding['class']);
            $bindingDefinition
                ->addTag('old_sound_rabbit_mq.binding')
                ->addMethodCall('setArguments', [$binding['arguments']])
                ->addMethodCall('setDestination', [$binding['destination']])
                ->addMethodCall('setDestinationIsExchange', [$binding['destination_is_exchange']])
                ->addMethodCall('setExchange', [$binding['exchange']])
                // NOTE(review): 'isNowait' reads like a getter name; confirm it is the intended setter.
                ->addMethodCall('isNowait', [$binding['nowait']])
                ->addMethodCall('setRoutingKey', [$binding['routing_key']]);
            $this->injectConnection($bindingDefinition, $binding['connection']);

            $hash = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($bindingDefinition, $hash, $binding['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.binding.%s', $hash);
            $this->container->setDefinition($serviceId, $bindingDefinition);
        }
    }
139
140 34
    /**
     * Registers one producer service per configured producer.
     *
     * In sandbox mode every producer is replaced by a definition of the fallback class and
     * only the ProducerInterface autowiring alias is registered. Otherwise the full producer
     * definition is built: exchange/queue options, connection, optional logged channel and
     * logger, optional service alias, autowiring aliases, default routing key and validator.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            foreach ($this->config['producers'] as $key => $producer) {
                $fallback = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
                $this->container->setDefinition($serviceId, $fallback);

                // register alias for argument auto wiring
                if (method_exists($this->container, 'registerAliasForArgument')) {
                    $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                    $this->container
                        ->registerAliasForArgument($serviceId, ProducerInterface::class, $argName)
                        ->setPublic(false);
                }
            }

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            $producerDefinition = new Definition($producer['class']);
            $producerDefinition->setPublic(true);
            $producerDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $producerDefinition->addTag('old_sound_rabbit_mq.producer');

            // No exchange configured for this producer: fall back to the AMQP default.
            $exchangeOptions = isset($producer['exchange_options'])
                ? $producer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $producerDefinition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($exchangeOptions)]
            );

            // No queue configured for this producer: fall back to the AMQP default.
            // NOTE(review): unlike loadConsumers(), the queue options are passed on without
            // normalizeArgumentKeys(); confirm this difference is intended.
            $queueOptions = isset($producer['queue_options'])
                ? $producer['queue_options']
                : $this->getDefaultQueueOptions();
            $producerDefinition->addMethodCall('setQueueOptions', [$queueOptions]);

            $this->injectConnection($producerDefinition, $producer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($producerDefinition, $key, $producer['connection']);
            }
            if (!$producer['auto_setup_fabric']) {
                $producerDefinition->addMethodCall('disableAutoSetupFabric');
            }
            if ($producer['enable_logger']) {
                $this->injectLogger($producerDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
            $this->container->setDefinition($serviceId, $producerDefinition);
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $serviceId);
            }

            // register alias for argument auto wiring
            if (method_exists($this->container, 'registerAliasForArgument')) {
                $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                foreach ([ProducerInterface::class, $producer['class']] as $aliasedType) {
                    $this->container
                        ->registerAliasForArgument($serviceId, $aliasedType, $argName)
                        ->setPublic(false);
                }
            }

            // These calls are appended after the definition has been registered.
            $producerDefinition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
            if (isset($producer['validator'])) {
                $validator = $producer['validator'];
                $producerDefinition->addMethodCall(
                    'setValidator',
                    [$validator['class'], $validator['schema'], $validator['additionalProperties']]
                );
            }
        }
    }
212 29
213
    /**
     * Registers one public consumer service per configured consumer.
     *
     * Each definition receives exchange and queue options (AMQP defaults when not configured),
     * the callback, optional QoS/timeout/graceful-execution settings, the connection and,
     * when enabled, a logged channel and a logger. Autowiring aliases are registered when the
     * container supports registerAliasForArgument().
     */
    protected function loadConsumers()
    {
        // Optional settings that map one-to-one onto a setter receiving the configured value.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $consumerDefinition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $consumerDefinition->setPublic(true);
            $consumerDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $consumerDefinition->addTag('old_sound_rabbit_mq.consumer');

            // No exchange configured for this consumer: fall back to the AMQP default.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $consumerDefinition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($exchangeOptions)]
            );

            // No queue configured for this consumer: fall back to the AMQP default.
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $consumerDefinition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($queueOptions)]
            );

            $consumerDefinition->addMethodCall(
                'setCallback',
                [[new Reference($consumer['callback']), 'execute']]
            );

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $consumerDefinition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $consumerDefinition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $consumerDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $consumerDefinition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $consumerDefinition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($consumerDefinition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($consumerDefinition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($consumerDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $consumerDefinition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // register alias for argument auto wiring
            if (method_exists($this->container, 'registerAliasForArgument')) {
                $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
                foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $aliasedType) {
                    $this->container
                        ->registerAliasForArgument($serviceId, $aliasedType, $argName)
                        ->setPublic(false);
                }
            }
        }
    }
289 25
290
    /**
     * Registers one public multi-consumer service per configured multiple consumer.
     *
     * Every consumer must define 'queues', 'queues_provider' or both. Queues are indexed by
     * their 'name' option and each queue callback is turned into a [Reference, 'execute']
     * callable. Both options are treated as optional throughout the method, matching the
     * initial validation, so a consumer that only configures one of them is handled without
     * reading the missing key.
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
     */
    protected function loadMultipleConsumers()
    {
        // Optional settings that map one-to-one onto a setter receiving the configured value.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            $hasQueues = !empty($consumer['queues']);
            $hasQueuesProvider = !empty($consumer['queues_provider']);

            if (!$hasQueues && !$hasQueuesProvider) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            $queues = array();
            $callbacks = array();
            if ($hasQueues) {
                foreach ($consumer['queues'] as $queueOptions) {
                    $queueName = $queueOptions['name'];
                    $queues[$queueName] = $queueOptions;
                    $queues[$queueName]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
                    $callbacks[] = $queueOptions['callback'];
                }
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

            if ($hasQueuesProvider) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    array(new Reference($consumer['queues_provider']))
                );
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ));
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);

            if ($hasQueuesProvider) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }
374
375
    /**
     * Registers one public dynamic-consumer service per configured dynamic consumer.
     *
     * Each consumer must define 'queue_options_provider'; the referenced service is passed
     * to setQueueOptionsProvider(). The exception message names that exact option so the
     * reported key matches the one that has to be configured.
     *
     * @throws InvalidConfigurationException When 'queue_options_provider' is empty or missing
     */
    protected function loadDynamicConsumers()
    {
        // Optional settings that map one-to-one onto a setter receiving the configured value.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
446
447
    /**
     * Registers one public batch-consumer service per configured batch consumer.
     *
     * The callback is wired to its 'batchExecute' method. Exchange options fall back to the
     * default exchange options when not configured; queue options and QoS options are read
     * directly from the configuration.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $batchDefinition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];

            $batchDefinition->setPublic(true);
            $batchDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $batchDefinition->addTag('old_sound_rabbit_mq.batch_consumer');
            $batchDefinition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $batchDefinition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $batchDefinition->addMethodCall(
                'setCallback',
                [[new Reference($consumer['callback']), 'batchExecute']]
            );
            $batchDefinition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($exchangeOptions)]
            );
            $batchDefinition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $batchDefinition->addMethodCall(
                'setQosOptions',
                [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
            );

            // The exit code is registered before the idle timeout itself.
            foreach (['idle_timeout_exit_code' => 'setIdleTimeoutExitCode', 'idle_timeout' => 'setIdleTimeout'] as $option => $setter) {
                if (isset($consumer[$option])) {
                    $batchDefinition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            // Only the graceful timeout is forwarded here; no exit code setter is called.
            if (isset($consumer['graceful_max_execution'])) {
                $batchDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $batchDefinition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $batchDefinition->addMethodCall('keepAlive');
            }

            $this->injectConnection($batchDefinition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($batchDefinition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($batchDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $batchDefinition);
        }
    }
507 25
508 25
    /**
     * Registers one public anonymous-consumer service per configured anon consumer.
     *
     * Each definition gets its exchange options, its callback (wired to 'execute'), the
     * connection and, when the collector is enabled, a logged channel.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $anonDefinition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $anonDefinition->setPublic(true);
            $anonDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $anonDefinition->addTag('old_sound_rabbit_mq.anon_consumer');
            $anonDefinition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($anon['exchange_options'])]
            );
            $anonDefinition->addMethodCall(
                'setCallback',
                [[new Reference($anon['callback']), 'execute']]
            );

            $this->injectConnection($anonDefinition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($anonDefinition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $anonDefinition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
528
529
    /**
     * Restores '-' in the keys of the 'arguments' entry of an options array.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments such as
     * x-ha-policy, so every '_' in an argument key is turned back into '-'. Arguments given
     * as a string (old configuration format) are first converted to an array. Options
     * without an 'arguments' entry are returned unchanged.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];
        if (is_string($rawArguments)) {
            // Support for the old string configuration; the helper is marked as deprecated.
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $normalized = [];
        foreach ($rawArguments as $name => $value) {
            // Keys without '_' come out of str_replace() unchanged.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
557
558
    /**
     * Converts an arguments string from old configuration files into an array.
     *
     * The string is a comma-separated list of "name:value" or "name:value:type" entries.
     * Each entry becomes name => [type, value], with the type defaulting to 'S'.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $entry) {
            $parts = explode(':', $entry);
            $type = isset($parts[2]) ? $parts[2] : 'S';
            $parsed[$parts[0]] = [$type, $parts[1]];
        }

        return $parsed;
    }
581 28
582 28
    /**
     * Registers one service definition per entry of the "rpc_clients" configuration.
     *
     * Each definition is stored in the container under "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional settings, forwarded only when the key is present in the client configuration.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $name => $options) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setPublic(true);
            $rpcClient->setLazy($options['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', array($options['expect_serialized_response']));

            // Definition arguments: the connection first, then the logged channel when the collector is enabled.
            $this->injectConnection($rpcClient, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $name, $options['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcClient->addMethodCall($setter, array($options[$option]));
                }
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $name), $rpcClient);
        }
    }
605 25
606 25
    /**
     * Registers one service definition per entry of the "rpc_servers" configuration.
     *
     * Each definition is stored in the container under "old_sound_rabbit_mq.<key>_server".
     */
    protected function loadRpcServers()
    {
        // Optional settings passed through as-is, in the order their setters are registered.
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $name => $options) {
            $callback = array(new Reference($options['callback']), 'execute');

            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', array($name));
            $rpcServer->addMethodCall('setCallback', array($callback));

            // Definition arguments: the connection first, then the logged channel when the collector is enabled.
            $this->injectConnection($rpcServer, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $name, $options['connection']);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $rpcServer->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcServer->addMethodCall($setter, array($options[$option]));
                }
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $name), $rpcServer);
        }
    }
639 31
640 31
    /**
     * Registers a logged channel definition and appends a reference to it to the given definition.
     *
     * The channel is stored under "old_sound_rabbit_mq.channel.<name>" and its id is recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition receiving the channel reference as an additional argument
     * @param string     $name           Suffix used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
654
655 34
    /**
     * Appends a reference to the named connection service to the definition's arguments.
     *
     * @param Definition $definition
     * @param string     $connectionName Suffix of the "old_sound_rabbit_mq.connection.<name>" service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
659
660
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }
664 29
665
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service exists in the container and its class implements
     * DequeuerAwareInterface, a "setDequeuer" method call referencing $name is added
     * to the callback definition. Otherwise nothing is changed.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id passed to setDequeuer as a reference
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if ($this->container->has($callback)) {
            // findDefinition() is used so the lookup goes through the container rather than a raw id match.
            $definition = $this->container->findDefinition($callback);
            $reflection = new \ReflectionClass($definition->getClass());
            $isDequeuerAware = $reflection->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface');

            if ($isDequeuerAware) {
                $definition->addMethodCall('setDequeuer', array(new Reference($name)));
            }
        }
    }
683 1
684
    /**
     * Tags the definition for the "phpamqplib" monolog channel and adds a "setLogger" method call.
     *
     * The logger reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', array('channel' => 'phpamqplib'));
        $definition->addMethodCall('setLogger', array($loggerReference));
    }
691
692
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an unnamed 'direct' exchange that is passive and not declared.
     *
     * @return array Keys: name, type, passive, declare
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
706
707
    /**
     * Get default AMQP queue options
     *
     * The defaults are an unnamed queue that is not declared.
     *
     * @return array Keys: name, declare
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
719
}
720