Passed
Push — master ( 2f0dcd...638ea4 )
by Mihai
02:42 queued 12s
created

OldSoundRabbitMqExtension::loadMultipleConsumers()   F

Complexity

Conditions 16
Paths 4098

Size

Total Lines 81
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 24.4516

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 16
eloc 53
c 2
b 0
f 0
nc 4098
nop 0
dl 0
loc 81
ccs 36
cts 53
cp 0.6792
crap 24.4516
rs 1.4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var bool Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Loads the bundle configuration and registers the RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, processes the given configuration, runs every load*() step and
     * finally either passes references for the ids stored in $this->channelIds to the data
     * collector definition or removes that definition from the container.
     *
     * @param array            $configs   Raw configuration arrays handed to processConfiguration()
     * @param ContainerBuilder $container Container that receives the definitions
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check: do not rely on the implicit array-to-boolean conversion.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        $channels = array();
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $this->container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channels);
    }
73
74 34
    /**
     * Creates the configuration object for this extension.
     *
     * Neither argument is read; the Configuration is built from the extension alias only.
     *
     * @param array            $config    Unused
     * @param ContainerBuilder $container Unused
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers the services for every entry of the "connections" configuration.
     *
     * For each connection two definitions are stored in the container:
     *  - a private factory "old_sound_rabbit_mq.connection_factory.<key>" that receives the
     *    connection class parameter, the connection settings and, when configured, a reference
     *    to the connection parameters provider;
     *  - a public service "old_sound_rabbit_mq.connection.<key>", tagged
     *    "old_sound_rabbit_mq.connection", created through the factory's createConnection().
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

            // The settings array is copied into the definition at this point, so the factory
            // receives it unchanged (including "connection_parameters_provider" when set).
            // A later unset() on the loop copy would have no effect and is therefore omitted.
            $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
                $classParam,
                $connection,
            ));
            if (isset($connection['connection_parameters_provider'])) {
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            $connectionDefinition = new Definition($classParam);
            if (method_exists($connectionDefinition, 'setFactory')) {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactory(array(new Reference($factoryName), 'createConnection'));
            } else {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactoryService($factoryName);
                $connectionDefinition->setFactoryMethod('createConnection');
            }
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
114
115 34
    /**
     * Registers one "old_sound_rabbit_mq.binding.<hash>" service per configured binding.
     *
     * Nothing is registered when the "sandbox" option is enabled. The service id suffix is the
     * md5 of the JSON-encoded, key-sorted binding configuration.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        foreach ($this->config['bindings'] as $binding) {
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');

            // NOTE(review): "isNowait" is the only call here that is not named like a setter;
            // verify against the binding class that it really accepts the value.
            $methodCalls = [
                'setArguments' => $binding['arguments'],
                'setDestination' => $binding['destination'],
                'setDestinationIsExchange' => $binding['destination_is_exchange'],
                'setExchange' => $binding['exchange'],
                'isNowait' => $binding['nowait'],
                'setRoutingKey' => $binding['routing_key'],
            ];
            foreach ($methodCalls as $method => $argument) {
                $definition->addMethodCall($method, [$argument]);
            }

            $this->injectConnection($definition, $binding['connection']);

            $hash = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $hash, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $hash), $definition);
        }
    }
139
140 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_producer" service per configured producer.
     *
     * When the "sandbox" option is enabled each producer is defined with the class parameter
     * "old_sound_rabbit_mq.fallback.class" and receives no further configuration. Otherwise
     * the configured producer class is registered with its exchange/queue options, connection,
     * optional logged channel and logger, optional service alias and default message settings.
     * In both modes an autowiring alias is registered when the container supports it.
     */
    protected function loadProducers()
    {
        $supportsArgumentAlias = method_exists($this->container, 'registerAliasForArgument');

        if ($this->config['sandbox']) {
            foreach ($this->config['producers'] as $key => $producer) {
                $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
                $this->container->setDefinition(
                    $serviceId,
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );

                // register alias for argument auto wiring
                if ($supportsArgumentAlias) {
                    $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                    $this->container
                        ->registerAliasForArgument($serviceId, ProducerInterface::class, $argName)
                        ->setPublic(false);
                }
            }

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');

            // this producer doesn't define an exchange -> using AMQP Default
            $exchangeOptions = isset($producer['exchange_options'])
                ? $producer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // this producer doesn't define a queue -> using AMQP Default
            // NOTE(review): queue options are passed without normalizeArgumentKeys() here,
            // whereas loadConsumers() normalizes them — confirm this difference is intended.
            $queueOptions = isset($producer['queue_options'])
                ? $producer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$queueOptions]);

            $this->injectConnection($definition, $producer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $producer['connection']);
            }
            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
            $this->container->setDefinition($serviceId, $definition);
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $serviceId);
            }

            // register alias for argument auto wiring
            if ($supportsArgumentAlias) {
                $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
                foreach ([ProducerInterface::class, $producer['class']] as $aliasedType) {
                    $this->container
                        ->registerAliasForArgument($serviceId, $aliasedType, $argName)
                        ->setPublic(false);
                }
            }

            // The definition is an object, so calls added after setDefinition() still apply.
            $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
            $definition->addMethodCall('setContentType', [$producer['default_content_type']]);
            $definition->addMethodCall('setDeliveryMode', [$producer['default_delivery_mode']]);
        }
    }
209
210 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_consumer" service per configured consumer.
     *
     * Each definition is public, tagged as base AMQP service and consumer, and receives its
     * exchange/queue options, callback, optional QoS, timeout and graceful-shutdown settings,
     * connection, optional logged channel and logger. Autowiring aliases are registered when
     * the container supports them.
     */
    protected function loadConsumers()
    {
        // Optional scalar settings, forwarded only when present, in this call order.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.consumer');

            // this consumer doesn't define an exchange -> using AMQP Default
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // this consumer doesn't define a queue -> using AMQP Default
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // register alias for argument auto wiring
            if (method_exists($this->container, 'registerAliasForArgument')) {
                $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
                foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $aliasedType) {
                    $this->container
                        ->registerAliasForArgument($serviceId, $aliasedType, $argName)
                        ->setPublic(false);
                }
            }
        }
    }
286
287 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_multiple" service per configured multiple consumer.
     *
     * Every configured queue is re-keyed by its "name" option and its callback service id is
     * replaced by a [Reference, 'execute'] callable. The definition then receives the exchange
     * options, the queues, the optional queues provider, the optional runtime settings, the
     * connection and the optional logged channel and logger.
     *
     * @throws InvalidConfigurationException When neither "queues" nor "queues_provider" is set
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            $queues = array();
            $callbacks = array();
            foreach ($consumer['queues'] as $queueOptions) {
                $callbackId = $queueOptions['callback'];
                $queueOptions['callback'] = array(new Reference($callbackId), 'execute');

                $queues[$queueOptions['name']] = $queueOptions;
                $callbacks[] = $callbackId;
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

            if ($consumer['queues_provider']) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    array(new Reference($consumer['queues_provider']))
                );
            }

            $this->addMultipleConsumerRuntimeCalls($definition, $consumer);

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);

            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }

    /**
     * Adds the optional QoS, timeout, graceful-shutdown and fabric method calls of a multiple
     * consumer to its definition, in the order the settings are listed here.
     *
     * @param Definition $definition Definition of the multiple consumer being configured
     * @param array      $consumer   Processed configuration of that consumer
     */
    private function addMultipleConsumerRuntimeCalls(Definition $definition, array $consumer)
    {
        if (array_key_exists('qos_options', $consumer)) {
            $definition->addMethodCall('setQosOptions', array(
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global'],
            ));
        }

        // Scalar settings that are forwarded only when present in the configuration.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );
        foreach ($optionalSetters as $option => $setter) {
            if (isset($consumer[$option])) {
                $definition->addMethodCall($setter, array($consumer[$option]));
            }
        }

        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                array($consumer['graceful_max_execution']['timeout'])
            );
            $definition->addMethodCall(
                'setGracefulMaxExecutionTimeoutExitCode',
                array($consumer['graceful_max_execution']['exit_code'])
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
    }
371
372 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_dynamic" service per configured dynamic consumer.
     *
     * Each definition is public, tagged as base AMQP service, consumer and dynamic consumer,
     * and receives its exchange options, callback, optional QoS options, queue options
     * provider, optional timeout and graceful-shutdown settings, connection and optional
     * logged channel and logger.
     *
     * @throws InvalidConfigurationException When "queue_options_provider" is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked above.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
443
444 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_batch" service per configured batch consumer.
     *
     * Each definition is public, tagged as base AMQP service and batch consumer, and receives
     * its timeout, prefetch count, batch callback, exchange/queue options, QoS options,
     * optional idle/graceful settings, connection and optional logged channel and logger.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];

            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');

            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
            // NOTE(review): no default is applied when "queue_options" is missing, unlike
            // loadConsumers() — presumably the configuration always provides it; confirm.
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($consumer['queue_options'])]);
            $definition->addMethodCall('setQosOptions', [
                $qos['prefetch_size'],
                $qos['prefetch_count'],
                $qos['global'],
            ]);

            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }

            // NOTE(review): only the graceful timeout is forwarded here; the other consumer
            // loaders also forward graceful_max_execution.exit_code — confirm this is intended.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
504
505 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_anon" service per configured anonymous consumer.
     *
     * Each definition is public, tagged as base AMQP service and anon consumer, and receives
     * its exchange options, callback, connection and, when the collector is enabled, a
     * logged channel.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = [new Reference($anon['callback']), 'execute'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);
            $definition->addMethodCall('setCallback', [$callback]);

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
525
526
    /**
     * Restores '-' in the keys of the "arguments" option.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments such as
     * x-ha-policy, so every '_' in an argument key is turned back into '-'. A string value
     * (old configuration format) is first converted to an array. Configurations without an
     * "arguments" entry are returned untouched.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];
        if (is_string($arguments)) {
            // support for old configuration
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = [];
        foreach ($arguments as $name => $value) {
            // str_replace() leaves keys without '_' unchanged, so no prior check is needed.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
554
555
    /**
     * Converts arguments given as a string (old configuration files) into an array.
     *
     * The string is split on ',' into pairs and each pair on ':' into "name:value[:type]".
     * The result maps each name to array(type, value); the type defaults to 'S'.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $pair) {
            $parts = explode(':', $pair);
            $type = isset($parts[2]) ? $parts[2] : 'S';

            $parsed[$parts[0]] = [$type, $parts[1]];
        }

        return $parsed;
    }
578
579 34
    /**
     * Registers one public service definition per entry of the "rpc_clients" configuration.
     *
     * Each definition is stored under the id "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional configuration keys and the setter each one is forwarded to, in call order.
        $optionalSetters = [
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        ];

        foreach ($this->config['rpc_clients'] as $name => $options) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setLazy($options['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', [$options['expect_serialized_response']]);

            // Arguments are appended in order: the connection first, then the logged channel.
            $this->injectConnection($rpcClient, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $name, $options['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcClient->addMethodCall($setter, [$options[$option]]);
                }
            }

            $rpcClient->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $name);
            $this->container->setDefinition($serviceId, $rpcClient);
        }
    }
602
603 34
    /**
     * Registers one public service definition per entry of the "rpc_servers" configuration.
     *
     * Each definition is stored under the id "old_sound_rabbit_mq.<key>_server".
     */
    protected function loadRpcServers()
    {
        // Optional configuration keys and the setter each one is forwarded to, in call order.
        $optionalSetters = [
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        ];

        foreach ($this->config['rpc_servers'] as $name => $options) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', [$name]);

            // The callback is the "execute" method of the configured callback service.
            $callable = [new Reference($options['callback']), 'execute'];
            $rpcServer->addMethodCall('setCallback', [$callable]);

            // Arguments are appended in order: the connection first, then the logged channel.
            $this->injectConnection($rpcServer, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $name, $options['connection']);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $rpcServer->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcServer->addMethodCall($setter, [$options[$option]]);
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $name);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
636
637 31
    /**
     * Registers a logged channel definition and appends a reference to it to $definition.
     *
     * The channel definition is private, tagged "old_sound_rabbit_mq.logged_channel",
     * receives the named connection as argument and is stored under
     * "old_sound_rabbit_mq.channel.<name>". Its id is also recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Suffix of the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
651
652 33
    /**
     * Appends a reference to the named connection service as an argument of $definition.
     *
     * @param Definition $definition     Definition that receives the argument
     * @param string     $connectionName Suffix of "old_sound_rabbit_mq.connection.<name>"
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $connection = new Reference($connectionId);

        $definition->addArgument($connection);
    }
656
657 34
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias() : string
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
661
662
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is known to the container and its class implements
     * DequeuerAwareInterface, a "setDequeuer" method call referencing the service
     * $name is appended to the callback definition. In every other case the
     * container is left untouched.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id referenced by the setDequeuer call
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // The class may be written as a "%parameter%" placeholder; resolve it before reflecting.
        $class = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());
        if (!is_string($class) || $class === '') {
            // The definition declares no class, so there is nothing to reflect on.
            return;
        }

        $refClass = new \ReflectionClass($class);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
680
681 1
    /**
     * Tags $definition for the "phpamqplib" monolog channel and adds a setLogger call.
     *
     * The logger reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', ['channel' => 'phpamqplib']);
        $definition->addMethodCall('setLogger', [$loggerReference]);
    }
688
689
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an unnamed, passive "direct" exchange with "declare" set to false.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = [
            'name' => '',
            'type' => 'direct',
            'passive' => true,
            'declare' => false,
        ];

        return $defaults;
    }
703
704
    /**
     * Get default AMQP queue options
     *
     * The defaults are an unnamed queue with "declare" set to false.
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = [
            'name' => '',
            'declare' => false,
        ];

        return $defaults;
    }
716
}
717