Passed
Pull Request — master (#662)
by
unknown
08:28
created

OldSoundRabbitMqExtension::loadConsumers()   F

Complexity

Conditions 14
Paths 3073

Size

Total Lines 73
Code Lines 48

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 16.5112

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 14
eloc 48
c 2
b 0
f 0
nc 3073
nop 0
dl 0
loc 73
ccs 36
cts 47
cp 0.766
crap 16.5112
rs 2.1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var Boolean Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    public function load(array $configs, ContainerBuilder $container)
39
    {
40 34
        $this->container = $container;
41
42 34
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
43 34
        $loader->load('rabbitmq.xml');
44
45 34
        $configuration = $this->getConfiguration($configs, $container);
46 34
        $this->config = $this->processConfiguration($configuration, $configs);
47
48 34
        $this->collectorEnabled = $this->config['enable_collector'];
49
50 34
        $this->loadConnections();
51 34
        $this->loadBindings();
52 34
        $this->loadProducers();
53 34
        $this->loadConsumers();
54 34
        $this->loadMultipleConsumers();
55 34
        $this->loadDynamicConsumers();
56 34
        $this->loadBatchConsumers();
57 34
        $this->loadAnonConsumers();
58 34
        $this->loadRpcClients();
59 34
        $this->loadRpcServers();
60
61 34
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
62 31
            $channels = array();
63 31
            foreach (array_unique($this->channelIds) as $id) {
64 31
                $channels[] = new Reference($id);
65
            }
66
67 31
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
68 31
            $definition->replaceArgument(0, $channels);
69
        } else {
70 3
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
71
        }
72 34
    }
73
74 34
    public function getConfiguration(array $config, ContainerBuilder $container)
75
    {
76 34
        return new Configuration($this->getAlias());
77
    }
78
79 34
    protected function loadConnections()
80
    {
81 34
        foreach ($this->config['connections'] as $key => $connection) {
82 34
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
83
            $classParam =
84 34
                $connection['lazy']
85 25
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
86 34
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
87
88 34
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
89 34
                $classParam, $connection,
90
            ));
91 34
            if (isset($connection['connection_parameters_provider'])) {
92
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
93
                unset($connection['connection_parameters_provider']);
94
            }
95 34
            $definition->setPublic(false);
96 34
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
97 34
            $this->container->setDefinition($factoryName, $definition);
98
99 34
            $definition = new Definition($classParam);
100 34
            if (method_exists($definition, 'setFactory')) {
101
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
102 34
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
103
            } else {
104
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
105
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

105
                $definition->/** @scrutinizer ignore-call */ 
106
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
106
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107
            }
108 34
            $definition->addTag('old_sound_rabbit_mq.connection');
109 34
            $definition->setPublic(true);
110
111 34
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
112
        }
113 34
    }
114
115 34
    protected function loadBindings()
116
    {
117 34
        if ($this->config['sandbox']) {
118
            return;
119
        }
120 34
        foreach ($this->config['bindings'] as $binding) {
121 25
            ksort($binding);
122 25
            $definition = new Definition($binding['class']);
123 25
            $definition->addTag('old_sound_rabbit_mq.binding');
124 25
            $definition->addMethodCall('setArguments', array($binding['arguments']));
125 25
            $definition->addMethodCall('setDestination', array($binding['destination']));
126 25
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
127 25
            $definition->addMethodCall('setExchange', array($binding['exchange']));
128 25
            $definition->addMethodCall('isNowait', array($binding['nowait']));
129 25
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
130 25
            $this->injectConnection($definition, $binding['connection']);
131 25
            $key = md5(json_encode($binding));
132 25
            if ($this->collectorEnabled) {
133
                // in the context of a binding, I don't thing logged channels are needed?
134 25
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
135
            }
136 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
137
        }
138 34
    }
139
140 34
    protected function loadProducers()
141
    {
142 34
        if ($this->config['sandbox'] == false) {
143 34
            foreach ($this->config['producers'] as $key => $producer) {
144 30
                $definition = new Definition($producer['class']);
145 30
                $definition->setPublic(true);
146 30
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
147 30
                $definition->addTag('old_sound_rabbit_mq.producer');
148
                //this producer doesn't define an exchange -> using AMQP Default
149 30
                if (!isset($producer['exchange_options'])) {
150 1
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
151
                }
152 30
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
153
                //this producer doesn't define a queue -> using AMQP Default
154 30
                if (!isset($producer['queue_options'])) {
155 30
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
156
                }
157 30
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
158 30
                $this->injectConnection($definition, $producer['connection']);
159 30
                if ($this->collectorEnabled) {
160 28
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
161
                }
162 30
                if (!$producer['auto_setup_fabric']) {
163
                    $definition->addMethodCall('disableAutoSetupFabric');
164
                }
165
                if (isset($producer['confirm_select'])) {
166 30
                    $definition->addArgument(null);
167
                    $confirmSelect = boolval($producer['confirm_select']);
168
                    $definition->addArgument($confirmSelect);
169
                }
170 30
                $confirmationTimeout = 0;
171
                if (isset($producer['confirm_timeout'])) {
172 30
                    $confirmationTimeout = intval($producer['confirm_timeout']);
173 30
                }
174 25
                $definition->addMethodCall('setConfirmationTimeout', array($confirmationTimeout));
175
176
                if ($producer['enable_logger']) {
177
                    $this->injectLogger($definition);
178 30
                }
179 30
180 30
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
181 30
182 30
                $this->container->setDefinition($producerServiceName, $definition);
183
                if (null !== $producer['service_alias']) {
184 30
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
185 30
                }
186 30
187
                // register alias for argument auto wiring
188
                if (method_exists($this->container, 'registerAliasForArgument')) {
189
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
190
                    $this->container
191
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
192
                        ->setPublic(false);
193
194
                    $this->container
195
                        ->registerAliasForArgument($producerServiceName, $producer['class'], $argName)
196
                        ->setPublic(false);
197
                }
198
199
                $definition->addMethodCall('setDefaultRoutingKey', array($producer['default_routing_key']));
200
            }
201
        } else {
202
            foreach ($this->config['producers'] as $key => $producer) {
203
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
204 34
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
205
                $this->container->setDefinition($producerServiceName, $definition);
206 34
207
                // register alias for argumen auto wiring
208 34
                if (method_exists($this->container, 'registerAliasForArgument')) {
209 29
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
210 29
                    $this->container
211 29
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
212 29
                        ->setPublic(false);
213
                }
214 29
            }
215
        }
216
    }
217 29
218
    protected function loadConsumers()
219 29
    {
220
        foreach ($this->config['consumers'] as $key => $consumer) {
221
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
222 29
            $definition->setPublic(true);
223 29
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
224
            $definition->addTag('old_sound_rabbit_mq.consumer');
225 29
            //this consumer doesn't define an exchange -> using AMQP Default
226 25
            if (!isset($consumer['exchange_options'])) {
227 25
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
228 25
            }
229 25
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
230
            //this consumer doesn't define a queue -> using AMQP Default
231
            if (!isset($consumer['queue_options'])) {
232
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
233 29
            }
234
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
235
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
236 29
237
            if (array_key_exists('qos_options', $consumer)) {
238
                $definition->addMethodCall('setQosOptions', array(
239 29
                    $consumer['qos_options']['prefetch_size'],
240 25
                    $consumer['qos_options']['prefetch_count'],
241
                    $consumer['qos_options']['global']
242 29
                ));
243
            }
244
245
            if (isset($consumer['idle_timeout'])) {
246
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
247
            }
248
            if (isset($consumer['idle_timeout_exit_code'])) {
249
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
250
            }
251
            if (isset($consumer['timeout_wait'])) {
252 29
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
253
            }
254
            if (isset($consumer['graceful_max_execution'])) {
255
                $definition->addMethodCall(
256 29
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
257 29
                    array($consumer['graceful_max_execution']['timeout'])
258 28
                );
259
                $definition->addMethodCall(
260
                    'setGracefulMaxExecutionTimeoutExitCode',
261 29
                    array($consumer['graceful_max_execution']['exit_code'])
262 1
                );
263
            }
264
            if (!$consumer['auto_setup_fabric']) {
265 29
                $definition->addMethodCall('disableAutoSetupFabric');
266 29
            }
267 29
268
            $this->injectConnection($definition, $consumer['connection']);
269
            if ($this->collectorEnabled) {
270 29
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
271 29
            }
272 29
273 29
            if ($consumer['enable_logger']) {
274 29
                $this->injectLogger($definition);
275
            }
276 29
277 29
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
278 29
            $this->container->setDefinition($name, $definition);
279
            $this->addDequeuerAwareCall($consumer['callback'], $name);
280
281 34
            // register alias for argument auto wiring
282
            if (method_exists($this->container, 'registerAliasForArgument')) {
283 34
                $argName = !str_ends_with(strtolower($key), 'consumer') ? sprintf('%sConsumer', $key) : $key;
284
                $this->container
285 34
                    ->registerAliasForArgument($name, ConsumerInterface::class, $argName)
286 25
                    ->setPublic(false);
287 25
288
                $this->container
289 25
                    ->registerAliasForArgument($name, '%old_sound_rabbit_mq.consumer.class%', $argName)
290
                    ->setPublic(false);
291
                }
292
        }
293
    }
294
295
    protected function loadMultipleConsumers()
296 25
    {
297 25
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
298 25
            $queues = array();
299 25
            $callbacks = array();
300
301
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
302 25
                throw new InvalidConfigurationException(
303
                    "Error on loading $key multiple consumer. " .
304 25
                    "Either 'queues' or 'queues_provider' parameters should be defined."
305 25
                );
306 25
            }
307 25
308 25
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
309
                $queues[$queueOptions['name']] = $queueOptions;
310 25
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
311 25
                $callbacks[] = $queueOptions['callback'];
312 25
            }
313 25
314
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
315
            $definition
316
                ->setPublic(true)
317 25
                ->addTag('old_sound_rabbit_mq.base_amqp')
318
                ->addTag('old_sound_rabbit_mq.multi_consumer')
319
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
320
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
321
322
            if ($consumer['queues_provider']) {
323
                $definition->addMethodCall(
324
                    'setQueuesProvider',
325 25
                    array(new Reference($consumer['queues_provider']))
326
                );
327
            }
328 25
329
            if (array_key_exists('qos_options', $consumer)) {
330
                $definition->addMethodCall('setQosOptions', array(
331 25
                    $consumer['qos_options']['prefetch_size'],
332 25
                    $consumer['qos_options']['prefetch_count'],
333
                    $consumer['qos_options']['global']
334 25
                ));
335
            }
336
337
            if (isset($consumer['idle_timeout'])) {
338
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
339
            }
340
            if (isset($consumer['idle_timeout_exit_code'])) {
341
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
342
            }
343
            if (isset($consumer['timeout_wait'])) {
344 25
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
345
            }
346
            if (isset($consumer['graceful_max_execution'])) {
347
                $definition->addMethodCall(
348 25
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
349 25
                    array($consumer['graceful_max_execution']['timeout'])
350 25
                );
351
                $definition->addMethodCall(
352
                    'setGracefulMaxExecutionTimeoutExitCode',
353 25
                    array($consumer['graceful_max_execution']['exit_code'])
354
                );
355
            }
356
            if (!$consumer['auto_setup_fabric']) {
357 25
                $definition->addMethodCall('disableAutoSetupFabric');
358 25
            }
359 25
360 25
            $this->injectConnection($definition, $consumer['connection']);
361
            if ($this->collectorEnabled) {
362 25
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
363 25
            }
364
365
            if ($consumer['enable_logger']) {
366 34
                $this->injectLogger($definition);
367
            }
368 34
369
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
370 34
            $this->container->setDefinition($name, $definition);
371
            if ($consumer['queues_provider']) {
372 25
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
373
            }
374
            foreach ($callbacks as $callback) {
375
                $this->addDequeuerAwareCall($callback, $name);
376
            }
377
        }
378
    }
379 25
380
    protected function loadDynamicConsumers()
381 25
    {
382 25
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
383 25
384 25
            if (empty($consumer['queue_options_provider'])) {
385 25
                throw new InvalidConfigurationException(
386 25
                    "Error on loading $key dynamic consumer. " .
387
                    "'queue_provider' parameter should be defined."
388 25
                );
389
            }
390
391
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
392
            $definition
393
                ->setPublic(true)
394
                ->addTag('old_sound_rabbit_mq.base_amqp')
395
                ->addTag('old_sound_rabbit_mq.consumer')
396 25
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
397 25
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
398 25
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
399
400
            if (array_key_exists('qos_options', $consumer)) {
401 25
                $definition->addMethodCall('setQosOptions', array(
402
                    $consumer['qos_options']['prefetch_size'],
403
                    $consumer['qos_options']['prefetch_count'],
404 25
                    $consumer['qos_options']['global']
405
                ));
406
            }
407 25
408
            $definition->addMethodCall(
409
                'setQueueOptionsProvider',
410 25
                array(new Reference($consumer['queue_options_provider']))
411
            );
412
413
            if (isset($consumer['idle_timeout'])) {
414
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
415
            }
416
            if (isset($consumer['idle_timeout_exit_code'])) {
417
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
418
            }
419
            if (isset($consumer['timeout_wait'])) {
420 25
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
421
            }
422
            if (isset($consumer['graceful_max_execution'])) {
423
                $definition->addMethodCall(
424 25
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
425 25
                    array($consumer['graceful_max_execution']['timeout'])
426 25
                );
427
                $definition->addMethodCall(
428
                    'setGracefulMaxExecutionTimeoutExitCode',
429 25
                    array($consumer['graceful_max_execution']['exit_code'])
430
                );
431
            }
432
            if (!$consumer['auto_setup_fabric']) {
433 25
                $definition->addMethodCall('disableAutoSetupFabric');
434 25
            }
435 25
436 25
            $this->injectConnection($definition, $consumer['connection']);
437
            if ($this->collectorEnabled) {
438 34
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
439
            }
440 34
441
            if ($consumer['enable_logger']) {
442 34
                $this->injectLogger($definition);
443
            }
444
445
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
446
            $this->container->setDefinition($name, $definition);
447
            $this->addDequeuerAwareCall($consumer['callback'], $name);
448
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
449
        }
450
    }
451
452
    protected function loadBatchConsumers()
453
    {
454
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
455
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
456
457
            if (!isset($consumer['exchange_options'])) {
458
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
459
            }
460
461
            $definition
462
                ->setPublic(true)
463
                ->addTag('old_sound_rabbit_mq.base_amqp')
464
                ->addTag('old_sound_rabbit_mq.batch_consumer')
465
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
466
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
467
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
468
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
469
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
470
                ->addMethodCall('setQosOptions', array(
471
                    $consumer['qos_options']['prefetch_size'],
472
                    $consumer['qos_options']['prefetch_count'],
473
                    $consumer['qos_options']['global']
474
                ))
475
            ;
476
477
            if (isset($consumer['idle_timeout_exit_code'])) {
478
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
479
            }
480
481
            if (isset($consumer['idle_timeout'])) {
482
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
483
            }
484
485
            if (isset($consumer['graceful_max_execution'])) {
486
                $definition->addMethodCall(
487
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
488
                    array($consumer['graceful_max_execution']['timeout'])
489
                );
490
            }
491
492
            if (!$consumer['auto_setup_fabric']) {
493
                $definition->addMethodCall('disableAutoSetupFabric');
494
            }
495
496
            if ($consumer['keep_alive']) {
497
                $definition->addMethodCall('keepAlive');
498
            }
499 34
500
            $this->injectConnection($definition, $consumer['connection']);
501 34
            if ($this->collectorEnabled) {
502
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
503 34
            }
504 25
505
            if ($consumer['enable_logger']) {
506 25
                $this->injectLogger($definition);
507 25
            }
508 25
509 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
510 25
        }
511 25
    }
512 25
513 25
    protected function loadAnonConsumers()
514
    {
515
        foreach ($this->config['anon_consumers'] as $key => $anon) {
516 25
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
517 25
            $definition
518 25
                ->setPublic(true)
519
                ->addTag('old_sound_rabbit_mq.base_amqp')
520 34
                ->addTag('old_sound_rabbit_mq.anon_consumer')
521
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
522
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
523
            $this->injectConnection($definition, $anon['connection']);
524
            if ($this->collectorEnabled) {
525
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
526
            }
527
528
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
529
            $this->container->setDefinition($name, $definition);
530 30
            $this->addDequeuerAwareCall($anon['callback'], $name);
531
        }
532 30
    }
533 1
534
    /**
535 1
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
536
     * parameter. So we revert the change for right configurations.
537
     *
538
     * @param array $config
539 1
     *
540 1
     * @return array
541 1
     */
542
    private function normalizeArgumentKeys(array $config)
543
    {
544 1
        if (isset($config['arguments'])) {
545
            $arguments = $config['arguments'];
546 1
            // support for old configuration
547
            if (is_string($arguments)) {
548 30
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

548
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
549
            }
550
551
            $newArguments = array();
552
            foreach ($arguments as $key => $value) {
553
                if (strstr($key, '_')) {
554
                    $key = str_replace('_', '-', $key);
555
                }
556
                $newArguments[$key] = $value;
557
            }
558
            $config['arguments'] = $newArguments;
559
        }
560
        return $config;
561
    }
562
563
    /**
564
     * Support for arguments provided as string. Support for old configuration files.
565
     *
566
     * @deprecated
567
     * @param string $arguments
568
     * @return array
569
     */
570
    private function argumentsStringAsArray($arguments)
571
    {
572
        $argumentsArray = array();
573
574
        $argumentPairs = explode(',', $arguments);
575 34
        foreach ($argumentPairs as $argument) {
576
            $argumentPair = explode(':', $argument);
577 34
            $type = 'S';
578 28
            if (isset($argumentPair[2])) {
579 28
                $type = $argumentPair[2];
580
            }
581 28
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
582 28
        }
583 28
584 28
        return $argumentsArray;
585 28
    }
586
587 28
    protected function loadRpcClients()
588 28
    {
589
        foreach ($this->config['rpc_clients'] as $key => $client) {
590 28
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
591 28
            $definition->setLazy($client['lazy']);
592
            $definition
593 28
                ->addTag('old_sound_rabbit_mq.rpc_client')
594
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
595 28
            $this->injectConnection($definition, $client['connection']);
596
            if ($this->collectorEnabled) {
597 34
                $this->injectLoggedChannel($definition, $key, $client['connection']);
598
            }
599 34
            if (array_key_exists('unserializer', $client)) {
600
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
601 34
            }
602 25
            if (array_key_exists('direct_reply_to', $client)) {
603
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
604 25
            }
605 25
            $definition->setPublic(true);
606 25
607 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
608 25
        }
609 25
    }
610 25
611 25
    protected function loadRpcServers()
612
    {
613 25
        foreach ($this->config['rpc_servers'] as $key => $server) {
614
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
615
            $definition
616
                ->setPublic(true)
617
                ->addTag('old_sound_rabbit_mq.base_amqp')
618
                ->addTag('old_sound_rabbit_mq.rpc_server')
619
                ->addMethodCall('initServer', array($key))
620 25
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
621 25
            $this->injectConnection($definition, $server['connection']);
622
            if ($this->collectorEnabled) {
623 25
                $this->injectLoggedChannel($definition, $key, $server['connection']);
624 25
            }
625
            if (array_key_exists('qos_options', $server)) {
626 25
                $definition->addMethodCall('setQosOptions', array(
627 25
                    $server['qos_options']['prefetch_size'],
628
                    $server['qos_options']['prefetch_count'],
629 25
                    $server['qos_options']['global']
630
                ));
631 34
            }
632
            if (array_key_exists('exchange_options', $server)) {
633 31
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
634
            }
635 31
            if (array_key_exists('queue_options', $server)) {
636 31
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
637
            }
638 31
            if (array_key_exists('serializer', $server)) {
639 31
                $definition->addMethodCall('setSerializer', array($server['serializer']));
640 31
            }
641
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
642 31
        }
643
    }
644 31
645 31
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
646 31
    {
647
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
648 33
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
649
        $channel
650 33
            ->setPublic(false)
651 33
            ->addTag('old_sound_rabbit_mq.logged_channel');
652
        $this->injectConnection($channel, $connectionName);
653 34
654
        $this->container->setDefinition($id, $channel);
655 34
656
        $this->channelIds[] = $id;
657
        $definition->addArgument(new Reference($id));
658
    }
659
660
    protected function injectConnection(Definition $definition, $connectionName)
661
    {
662
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
663
    }
664 29
665
    public function getAlias()
666 29
    {
667 29
        return 'old_sound_rabbit_mq';
668
    }
669
670
    /**
671
     * Add proper dequeuer aware call
672
     *
673
     * @param string $callback
674
     * @param string $name
675
     */
676
    protected function addDequeuerAwareCall($callback, $name)
677 1
    {
678
        if (!$this->container->has($callback)) {
679 1
            return;
680 1
        }
681
682 1
        $callbackDefinition = $this->container->findDefinition($callback);
683 1
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
684
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
685
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
686
        }
687
    }
688
689
    private function injectLogger(Definition $definition)
690 1
    {
691
        $definition->addTag('monolog.logger', array(
692
            'channel' => 'phpamqplib'
693 1
        ));
694
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
695
    }
696
697
    /**
698
     * Get default AMQP exchange options
699
     *
700
     * @return array
701
     */
702
    protected function getDefaultExchangeOptions()
703
    {
704
        return array(
705 30
            'name' => '',
706
            'type' => 'direct',
707
            'passive' => true,
708 30
            'declare' => false
709
        );
710
    }
711
712
    /**
713
     * Get default AMQP queue options
714
     *
715
     * @return array
716
     */
717
    protected function getDefaultQueueOptions()
718
    {
719
        return array(
720
            'name' => '',
721
            'declare' => false
722
        );
723
    }
724
}
725