Passed
Pull Request — master (#661)
by
unknown
07:25
created

OldSoundRabbitMqExtension::loadProducers()   F

Complexity

Conditions 15
Paths 389

Size

Total Lines 66
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 31
CRAP Score 19.0418

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 15
eloc 44
c 3
b 0
f 0
nc 389
nop 0
dl 0
loc 66
ccs 31
cts 42
cp 0.7381
crap 19.0418
rs 2.7208

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var bool Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Loads the bundle configuration and registers all RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, processes the merged configuration, then registers
     * connections, bindings, producers, every consumer flavour and the RPC
     * clients/servers. Finally the data collector definition is either fed the
     * unique channel references gathered in $this->channelIds or removed.
     *
     * @param array            $configs   Raw configuration arrays to be merged and processed
     * @param ContainerBuilder $container Container the definitions are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check instead of relying on the implicit array-to-bool conversion.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        // The same channel id may have been recorded several times; reference each one once.
        $channels = array();
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $definition = $this->container->getDefinition('old_sound_rabbit_mq.data_collector');
        $definition->replaceArgument(0, $channels);
    }
73
74 34
    /**
     * Creates the configuration object for this extension, named after the extension alias.
     *
     * Neither argument is read by this implementation.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers two services per configured connection: a private connection
     * factory and a public, tagged connection that is created through that factory.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $name => $settings) {
            // Pick the class parameter: socket vs. stream connection, lazy vs. eager.
            $suffix = 'connection.class';
            if ($settings['use_socket']) {
                $suffix = 'socket_connection.class';
            }
            if ($settings['lazy']) {
                $classParam = '%old_sound_rabbit_mq.lazy.' . $suffix . '%';
            } else {
                $classParam = '%old_sound_rabbit_mq.' . $suffix . '%';
            }

            $factory = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [$classParam, $settings]);
            if (isset($settings['connection_parameters_provider'])) {
                $factory->addArgument(new Reference($settings['connection_parameters_provider']));
                // NOTE(review): $settings was already copied by value into the definition
                // arguments above, so this unset does not change what the factory receives.
                unset($settings['connection_parameters_provider']);
            }
            $factory->setPublic(false);

            $factoryId = sprintf('old_sound_rabbit_mq.connection_factory.%s', $name);
            $this->container->setDefinition($factoryId, $factory);

            $connection = new Definition($classParam);
            if (method_exists($connection, 'setFactory')) {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connection->setFactory([new Reference($factoryId), 'createConnection']);
            } else {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $connection->setFactoryService($factoryId);
                $connection->setFactoryMethod('createConnection');
            }
            $connection->addTag('old_sound_rabbit_mq.connection');
            $connection->setPublic(true);

            $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $name);
            $this->container->setDefinition($connectionId, $connection);
        }
    }
114
115 34
    /**
     * Registers one tagged service per configured binding.
     *
     * Nothing is registered when the "sandbox" option is enabled. The service id
     * is derived from an md5 of the key-sorted binding configuration.
     */
    protected function loadBindings()
    {
        if (!$this->config['sandbox']) {
            // Method to call on the binding service => configuration option passed to it.
            $setters = [
                'setArguments' => 'arguments',
                'setDestination' => 'destination',
                'setDestinationIsExchange' => 'destination_is_exchange',
                'setExchange' => 'exchange',
                'isNowait' => 'nowait',
                'setRoutingKey' => 'routing_key',
            ];

            foreach ($this->config['bindings'] as $binding) {
                // Sort first so the hash below does not depend on the option order.
                ksort($binding);

                $definition = new Definition($binding['class']);
                $definition->addTag('old_sound_rabbit_mq.binding');
                foreach ($setters as $method => $option) {
                    $definition->addMethodCall($method, [$binding[$option]]);
                }
                $this->injectConnection($definition, $binding['connection']);

                $hash = md5(json_encode($binding));
                if ($this->collectorEnabled) {
                    // in the context of a binding, I don't think logged channels are needed?
                    $this->injectLoggedChannel($definition, $hash, $binding['connection']);
                }

                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $hash), $definition);
            }
        }
    }
139
140 34
    /**
     * Registers one service per configured producer.
     *
     * When the "sandbox" option is enabled each producer is registered with the
     * fallback class and only gets the ProducerInterface autowiring alias.
     * Otherwise the producer is fully configured (exchange/queue options,
     * connection, optional logged channel, logger, routing key, validator),
     * optionally aliased, and gets autowiring aliases for both ProducerInterface
     * and its concrete class.
     */
    protected function loadProducers()
    {
        foreach ($this->config['producers'] as $key => $producer) {
            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            if ($this->config['sandbox']) {
                $this->container->setDefinition(
                    $producerServiceName,
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );
                $this->registerProducerAliasesForArgument($producerServiceName, $key, [ProducerInterface::class]);

                continue;
            }

            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');

            // this producer doesn't define an exchange -> using AMQP Default
            if (!isset($producer['exchange_options'])) {
                $producer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($producer['exchange_options'])]
            );

            // this producer doesn't define a queue -> using AMQP Default
            if (!isset($producer['queue_options'])) {
                $producer['queue_options'] = $this->getDefaultQueueOptions();
            }
            // NOTE(review): unlike loadConsumers(), the queue options are passed here without
            // normalizeArgumentKeys() — confirm whether that is intentional.
            $definition->addMethodCall('setQueueOptions', [$producer['queue_options']]);

            $this->injectConnection($definition, $producer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $producer['connection']);
            }
            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
            if (isset($producer['validator'])) {
                $definition->addMethodCall('setValidator', [
                    $producer['validator']['class'],
                    $producer['validator']['schema'],
                    $producer['validator']['definitions'],
                ]);
            }

            $this->container->setDefinition($producerServiceName, $definition);
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }

            $this->registerProducerAliasesForArgument(
                $producerServiceName,
                $key,
                [ProducerInterface::class, $producer['class']]
            );
        }
    }

    /**
     * Registers private named-argument autowiring aliases for a producer service.
     *
     * The argument name is the producer key, suffixed with "Producer" unless the
     * key already ends with it (case-insensitively). Does nothing when the
     * container has no registerAliasForArgument() method.
     *
     * @param string   $serviceId Id of the producer service
     * @param string   $key       Producer key from the configuration
     * @param string[] $types     Types to register the alias for
     */
    private function registerProducerAliasesForArgument($serviceId, $key, array $types)
    {
        if (!method_exists($this->container, 'registerAliasForArgument')) {
            return;
        }

        $argName = str_ends_with(strtolower($key), 'producer') ? $key : sprintf('%sProducer', $key);
        foreach ($types as $type) {
            $this->container
                ->registerAliasForArgument($serviceId, $type, $argName)
                ->setPublic(false);
        }
    }
210 29
211 29
    /**
     * Registers one public, tagged service per configured consumer, together with
     * its optional QoS, timeout and graceful-shutdown settings, and registers
     * named-argument autowiring aliases when the container supports them.
     */
    protected function loadConsumers()
    {
        // Optional scalar settings: configuration option => setter on the consumer.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.consumer');

            // this consumer doesn't define an exchange -> using AMQP Default
            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $exchangeOptions = $this->normalizeArgumentKeys($consumer['exchange_options']);
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);

            // this consumer doesn't define a queue -> using AMQP Default
            if (!isset($consumer['queue_options'])) {
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
            }
            $queueOptions = $this->normalizeArgumentKeys($consumer['queue_options']);
            $definition->addMethodCall('setQueueOptions', [$queueOptions]);

            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);

            // register alias for argument auto wiring
            if (method_exists($this->container, 'registerAliasForArgument')) {
                $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
                foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $type) {
                    $this->container
                        ->registerAliasForArgument($name, $type, $argName)
                        ->setPublic(false);
                }
            }
        }
    }
287 25
288
    /**
     * Registers one public, tagged service per configured multiple-queue consumer.
     *
     * Each consumer must configure "queues" and/or "queues_provider".
     *
     * @throws InvalidConfigurationException When both "queues" and "queues_provider" are empty
     */
    protected function loadMultipleConsumers()
    {
        // Optional scalar settings: configuration option => setter on the consumer.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            // Index the queues by name and turn each callback service id into a callable reference.
            $queues = [];
            $callbacks = [];
            foreach ($consumer['queues'] as $queueOptions) {
                $callbackId = $queueOptions['callback'];
                $queueOptions['callback'] = [new Reference($callbackId), 'execute'];
                $queues[$queueOptions['name']] = $queueOptions;
                $callbacks[] = $callbackId;
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.multi_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            if ($consumer['queues_provider']) {
                $definition->addMethodCall('setQueuesProvider', [new Reference($consumer['queues_provider'])]);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);

            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }
372 25
373
    /**
     * Registers one public, tagged service per configured dynamic consumer.
     *
     * Every dynamic consumer must configure "queue_options_provider"; the
     * referenced service is passed to the consumer via setQueueOptionsProvider().
     *
     * @throws InvalidConfigurationException When "queue_options_provider" is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked
                // (it previously referred to a non-existent 'queue_provider').
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
444
445
    /**
     * Registers one public, tagged service per configured batch consumer.
     *
     * The callback service is wired to its batchExecute() method. When no
     * exchange options are configured, the default exchange options are used.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }
            $qos = $consumer['qos_options'];

            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $definition->addMethodCall('setQosOptions', [
                $qos['prefetch_size'],
                $qos['prefetch_count'],
                $qos['global'],
            ]);

            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }

            // Only the timeout is forwarded here; no exit code setter is called for batch consumers.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
505
506 25
    /**
     * Registers one public, tagged service per configured anonymous consumer,
     * wiring its exchange options, callback and connection.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = [new Reference($anon['callback']), 'execute'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);
            $definition->addMethodCall('setCallback', [$callback]);

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $name);
        }
    }
526
527
    /**
     * Restores '-' in the keys of the "arguments" option.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments
     * such as x-ha-policy, so every '_' in an argument key is turned back into '-'.
     * Arguments given as a string (old configuration format) are parsed into an
     * array first. A config without an "arguments" entry is returned untouched.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];
        // support for old configuration
        if (is_string($arguments)) {
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = [];
        foreach ($arguments as $name => $value) {
            // str_replace() leaves keys without '_' unchanged, so no prior check is needed.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
555
556
    /**
     * Parses arguments given as a string (old configuration files) into an array.
     *
     * The string is a comma-separated list of "name:value" or "name:value:type"
     * entries; the type defaults to 'S'. Each entry becomes name => [type, value].
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $entry) {
            $parts = explode(':', $entry);
            $type = isset($parts[2]) ? $parts[2] : 'S';
            $parsed[$parts[0]] = [$type, $parts[1]];
        }

        return $parsed;
    }
579 28
580
    /**
     * Register one RPC client service definition per entry of
     * $this->config['rpc_clients'], under the id "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional client settings, each paired with the setter that receives it.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $name => $options) {
            $clientDefinition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $clientDefinition->setLazy($options['lazy']);
            $clientDefinition->addTag('old_sound_rabbit_mq.rpc_client');
            $clientDefinition->addMethodCall('initClient', array($options['expect_serialized_response']));

            // Constructor arguments: the connection first, then the logged
            // channel when the data collector is enabled.
            $connectionName = $options['connection'];
            $this->injectConnection($clientDefinition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($clientDefinition, $name, $connectionName);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $clientDefinition->addMethodCall($setter, array($options[$option]));
                }
            }

            $clientDefinition->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $name);
            $this->container->setDefinition($serviceId, $clientDefinition);
        }
    }
603
604 25
    /**
     * Register one RPC server service definition per entry of
     * $this->config['rpc_servers'], under the id "old_sound_rabbit_mq.<key>_server".
     */
    protected function loadRpcServers()
    {
        // Optional server settings that are handed to their setter as a single value.
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $name => $options) {
            // The server invokes the "execute" method of the configured callback service.
            $callback = array(new Reference($options['callback']), 'execute');

            $serverDefinition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $serverDefinition->setPublic(true);
            $serverDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $serverDefinition->addTag('old_sound_rabbit_mq.rpc_server');
            $serverDefinition->addMethodCall('initServer', array($name));
            $serverDefinition->addMethodCall('setCallback', array($callback));

            // Constructor arguments: the connection first, then the logged
            // channel when the data collector is enabled.
            $connectionName = $options['connection'];
            $this->injectConnection($serverDefinition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($serverDefinition, $name, $connectionName);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $serverDefinition->addMethodCall('setQosOptions', array(
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global']
                ));
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $serverDefinition->addMethodCall($setter, array($options[$option]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $name);
            $this->container->setDefinition($serviceId, $serverDefinition);
        }
    }
637
638 31
    /**
     * Register a private logged-channel service for $name and append a
     * reference to it to the arguments of $definition.
     *
     * The channel id is also recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Name used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $channelDefinition = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $channelDefinition->setPublic(false);
        $channelDefinition->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($channelDefinition, $connectionName);

        $this->container->setDefinition($channelId, $channelDefinition);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
652
653 34
    /**
     * Append a reference to the named connection service to the arguments
     * of $definition.
     *
     * @param Definition $definition
     * @param string     $connectionName Suffix of the "old_sound_rabbit_mq.connection.<name>" service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $connectionReference = new Reference($connectionId);

        $definition->addArgument($connectionReference);
    }
657
658
    /**
     * Return the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
662
663
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is registered in the container and its class
     * implements DequeuerAwareInterface, a "setDequeuer" method call
     * referencing $name is added to the callback definition. Nothing happens
     * when the callback is unknown to the container or declares no class.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id passed to setDequeuer
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // The class may be written as a "%parameter%" placeholder; reflecting
        // on the raw placeholder would fail, so resolve it first.
        $class = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());
        if (!is_string($class) || '' === $class) {
            // No class name is available on the definition, so the interface
            // check cannot be performed.
            return;
        }

        $refClass = new \ReflectionClass($class);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
681
682 1
    /**
     * Tag $definition for the "phpamqplib" monolog channel and add a
     * "setLogger" method call that receives the "logger" service.
     *
     * The reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $tagAttributes = array('channel' => 'phpamqplib');

        $definition->addTag('monolog.logger', $tagAttributes);
        $definition->addMethodCall('setLogger', array($loggerReference));
    }
689
690 1
    /**
     * Build the exchange options used by default: an unnamed, passive
     * "direct" exchange that is not declared.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
704
705 30
    /**
     * Build the queue options used by default: an unnamed queue that is
     * not declared.
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
717
}
718