Passed
Pull Request — master (#661)
by
unknown
05:46
created

OldSoundRabbitMqExtension::loadProducers()   F

Complexity

Conditions 15
Paths 389

Size

Total Lines 68
Code Lines 46

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 32
CRAP Score 18.7659

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 15
eloc 46
nc 389
nop 0
dl 0
loc 68
rs 2.7208
c 3
b 0
f 0
ccs 32
cts 43
cp 0.7442
crap 18.7659

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var bool Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Loads the bundle's service definitions into the container.
     *
     * Imports `rabbitmq.xml`, processes the supplied configuration, runs every
     * load*() step, and finally either hands the unique channel references to
     * the data collector definition or removes that definition.
     *
     * @param array            $configs   Configuration arrays to process
     * @param ContainerBuilder $container Container that receives the definitions
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $locator = new FileLocator(array(__DIR__ . '/../Resources/config'));
        $xmlLoader = new XmlFileLoader($this->container, $locator);
        $xmlLoader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);
        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        $collectorId = 'old_sound_rabbit_mq.data_collector';

        // The collector is only kept when it is enabled and at least one channel id was recorded.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition($collectorId);

            return;
        }

        $channelReferences = array();
        foreach (array_unique($this->channelIds) as $channelId) {
            $channelReferences[] = new Reference($channelId);
        }

        $container->getDefinition($collectorId)->replaceArgument(0, $channelReferences);
    }
73
74 34
    /**
     * Creates the Configuration object for this extension, named after its alias.
     *
     * Neither argument is read by this implementation.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers, for every configured connection, a private connection factory
     * service and a public, tagged connection service created through it.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $name => $parameters) {
            // Pick the class parameter: socket vs. stream, lazy vs. eager.
            $suffix = 'connection.class';
            if ($parameters['use_socket']) {
                $suffix = 'socket_connection.class';
            }
            $prefix = $parameters['lazy'] ? 'old_sound_rabbit_mq.lazy.' : 'old_sound_rabbit_mq.';
            $connectionClass = '%' . $prefix . $suffix . '%';

            $factory = new Definition(
                '%old_sound_rabbit_mq.connection_factory.class%',
                array($connectionClass, $parameters)
            );
            if (isset($parameters['connection_parameters_provider'])) {
                $factory->addArgument(new Reference($parameters['connection_parameters_provider']));
                // NOTE(review): the parameters array was already copied into the factory
                // arguments above, so this unset only changes the local loop variable.
                // Confirm whether the key was meant to be stripped before building the factory.
                unset($parameters['connection_parameters_provider']);
            }
            $factory->setPublic(false);

            $factoryId = sprintf('old_sound_rabbit_mq.connection_factory.%s', $name);
            $this->container->setDefinition($factoryId, $factory);

            $connection = new Definition($connectionClass);
            if (!method_exists($connection, 'setFactory')) {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $connection->setFactoryService($factoryId);
                $connection->setFactoryMethod('createConnection');
            } else {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connection->setFactory(array(new Reference($factoryId), 'createConnection'));
            }
            $connection->addTag('old_sound_rabbit_mq.connection');
            $connection->setPublic(true);

            $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $name);
            $this->container->setDefinition($connectionId, $connection);
        }
    }
114
115 34
    /**
     * Registers one tagged binding service per configured binding.
     *
     * Nothing is registered in sandbox mode. The service id is derived from an
     * md5 hash of the key-sorted, JSON-encoded binding configuration.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => binding option, in the order the calls are added.
        $setters = array(
            'setArguments'             => 'arguments',
            'setDestination'           => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange'              => 'exchange',
            'isNowait'                 => 'nowait',
            'setRoutingKey'            => 'routing_key',
        );

        foreach ($this->config['bindings'] as $binding) {
            // Sort the keys so the hash below does not depend on option order.
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($setters as $method => $option) {
                $definition->addMethodCall($method, array($binding[$option]));
            }
            $this->injectConnection($definition, $binding['connection']);

            $hash = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // The original author was unsure whether logged channels are needed for bindings.
                $this->injectLoggedChannel($definition, $hash, $binding['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.binding.%s', $hash);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
139
140 34
    /**
     * Registers one service per configured producer.
     *
     * In sandbox mode each producer is registered with the fallback class
     * parameter instead of its configured class; otherwise a fully configured
     * producer definition is built.
     */
    protected function loadProducers()
    {
        $sandboxed = (bool) $this->config['sandbox'];

        foreach ($this->config['producers'] as $key => $producer) {
            if ($sandboxed) {
                $this->registerSandboxProducerDefinition($key);
            } else {
                $this->registerProducerDefinition($key, $producer);
            }
        }
    }

    /**
     * Builds and registers the definition of a single, non-sandboxed producer.
     *
     * @param string $key      Producer name from the configuration
     * @param array  $producer Processed configuration of that producer
     */
    private function registerProducerDefinition($key, array $producer)
    {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        // this producer doesn't define an exchange -> using AMQP Default
        if (!isset($producer['exchange_options'])) {
            $producer['exchange_options'] = $this->getDefaultExchangeOptions();
        }
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($producer['exchange_options']))
        );

        // this producer doesn't define a queue -> using AMQP Default
        if (!isset($producer['queue_options'])) {
            $producer['queue_options'] = $this->getDefaultQueueOptions();
        }
        $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }
        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
        $this->container->setDefinition($producerServiceName, $definition);
        if (null !== $producer['service_alias']) {
            $this->container->setAlias($producer['service_alias'], $producerServiceName);
        }

        $this->registerProducerAutowiringAliases(
            $producerServiceName,
            $key,
            array(ProducerInterface::class, $producer['class'])
        );

        // These calls are added after setDefinition(); the container holds the same
        // Definition object, so they still end up on the registered service.
        $definition->addMethodCall('setDefaultRoutingKey', array($producer['default_routing_key']));
        if (isset($producer['validator'])) {
            $definition->addMethodCall('setValidator', array(
                $producer['validator']['class'],
                $producer['validator']['schema'],
                $producer['validator']['additionalProperties'],
            ));
        }
    }

    /**
     * Registers the sandbox stand-in for a producer, using the fallback class parameter.
     *
     * @param string $key Producer name from the configuration
     */
    private function registerSandboxProducerDefinition($key)
    {
        $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
        $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
        $this->container->setDefinition($producerServiceName, $definition);

        $this->registerProducerAutowiringAliases($producerServiceName, $key, array(ProducerInterface::class));
    }

    /**
     * Registers private argument aliases for a producer service, used for argument auto wiring.
     *
     * Does nothing when the container has no registerAliasForArgument() method.
     *
     * @param string   $serviceId Id of the producer service the aliases point to
     * @param string   $key       Producer name; "Producer" is appended unless it already ends with it
     * @param string[] $types     Types to register an alias for
     */
    private function registerProducerAutowiringAliases($serviceId, $key, array $types)
    {
        if (!method_exists($this->container, 'registerAliasForArgument')) {
            return;
        }

        $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
        foreach ($types as $type) {
            $this->container
                ->registerAliasForArgument($serviceId, $type, $argName)
                ->setPublic(false);
        }
    }
212 29
213
    /**
     * Registers one public consumer service per entry in the `consumers` configuration,
     * plus private argument aliases when the container supports them.
     */
    protected function loadConsumers()
    {
        // Optional option => setter, in the order the calls are added.
        $optionalSetters = array(
            'idle_timeout'           => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait'           => 'setTimeoutWait',
        );

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.consumer');

            // No exchange defined for this consumer -> fall back to the default exchange options.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));

            // No queue defined for this consumer -> fall back to the default queue options.
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($queueOptions)));

            $callback = array(new Reference($consumer['callback']), 'execute');
            $definition->addMethodCall('setCallback', array($callback));

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($graceful['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($graceful['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // register alias for argument auto wiring
            if (!method_exists($this->container, 'registerAliasForArgument')) {
                continue;
            }

            $argName = !str_ends_with(strtolower($key), 'consumer') ? sprintf('%sConsumer', $key) : $key;
            $aliasTypes = array(ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%');
            foreach ($aliasTypes as $type) {
                $this->container
                    ->registerAliasForArgument($serviceId, $type, $argName)
                    ->setPublic(false);
            }
        }
    }
289 25
290
    /**
     * Registers one public multi-queue consumer service per entry in the
     * `multiple_consumers` configuration.
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
     */
    protected function loadMultipleConsumers()
    {
        // Optional option => setter, in the order the calls are added.
        $optionalSetters = array(
            'idle_timeout'           => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait'           => 'setTimeoutWait',
        );

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            // Index the queues by name and swap each callback service id for a callable reference.
            $queues = array();
            $callbacks = array();
            foreach ($consumer['queues'] as $queueOptions) {
                $callbackId = $queueOptions['callback'];
                $queueOptions['callback'] = array(new Reference($callbackId), 'execute');
                $queues[$queueOptions['name']] = $queueOptions;
                $callbacks[] = $callbackId;
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.multi_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                array($this->normalizeArgumentKeys($consumer['exchange_options']))
            );
            $definition->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

            if ($consumer['queues_provider']) {
                $provider = new Reference($consumer['queues_provider']);
                $definition->addMethodCall('setQueuesProvider', array($provider));
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($graceful['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($graceful['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($serviceId, $definition);

            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
            }
            foreach ($callbacks as $callbackId) {
                $this->addDequeuerAwareCall($callbackId, $serviceId);
            }
        }
    }
374
375
    /**
     * Registers one public dynamic consumer service per entry in the
     * `dynamic_consumers` configuration.
     *
     * @throws InvalidConfigurationException When 'queue_options_provider' is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked
                // (it previously referred to a non-existent 'queue_provider').
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
446
447
    /**
     * Registers one public batch consumer service per entry in the
     * `batch_consumers` configuration.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            // No exchange defined -> fall back to the default exchange options.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();

            $qos = $consumer['qos_options'];
            $callback = array(new Reference($consumer['callback']), 'batchExecute');

            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            $definition->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
            $definition->addMethodCall('setCallback', array($callback));
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));
            $definition->addMethodCall(
                'setQueueOptions',
                array($this->normalizeArgumentKeys($consumer['queue_options']))
            );
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );

            // Optional option => setter, in the order the calls are added.
            $optionalSetters = array(
                'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
                'idle_timeout'           => 'setIdleTimeout',
            );
            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
507 25
508 25
    /**
     * Registers one public anonymous consumer service per entry in the
     * `anon_consumers` configuration.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = array(new Reference($anon['callback']), 'execute');

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', array($exchangeOptions));
            $definition->addMethodCall('setCallback', array($callback));

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
528
529
    /**
     * Restores '-' in the keys of the 'arguments' entry of an options array.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks
     * arguments such as x-ha-policy, so every '_' in an argument key is turned
     * back into '-'. Arguments given as a string (old configuration format) are
     * first converted to an array. Options without an 'arguments' entry are
     * returned unchanged.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];

        // support for old configuration
        if (is_string($arguments)) {
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = array();
        foreach ($arguments as $name => $value) {
            // str_replace leaves keys without '_' untouched, so no pre-check is needed.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
557
558
    /**
     * Support for arguments provided as string. Support for old configuration files.
     *
     * The string is a comma-separated list of "name:value" or "name:value:type"
     * pairs; the type defaults to 'S'. Each pair becomes
     * `name => array(type, value)`. A pair without a ':' yields a null value.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $argumentsArray = array();

        foreach (explode(',', $arguments) as $argument) {
            // Pad to three parts so a pair without a value or type does not
            // trigger an undefined-offset warning.
            list($name, $value, $type) = array_pad(explode(':', $argument), 3, null);

            $argumentsArray[$name] = array(null === $type ? 'S' : $type, $value);
        }

        return $argumentsArray;
    }
581 28
582 28
    /**
     * Registers one service definition per entry of the 'rpc_clients' configuration.
     *
     * Each definition is stored in the container as "old_sound_rabbit_mq.<name>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional configuration keys and the setter each one is forwarded to, in call order.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $name => $options) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setLazy($options['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', array($options['expect_serialized_response']));

            // The connection is the first argument; the logged channel follows when the collector is on.
            $this->injectConnection($rpcClient, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $name, $options['connection']);
            }

            foreach ($optionalSetters as $optionKey => $setter) {
                if (array_key_exists($optionKey, $options)) {
                    $rpcClient->addMethodCall($setter, array($options[$optionKey]));
                }
            }

            $rpcClient->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $name);
            $this->container->setDefinition($serviceId, $rpcClient);
        }
    }
605 25
606 25
    /**
     * Registers one service definition per entry of the 'rpc_servers' configuration.
     *
     * Each definition is stored in the container as "old_sound_rabbit_mq.<name>_server".
     */
    protected function loadRpcServers()
    {
        // Optional configuration keys and the setter each one is forwarded to, in call order.
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $name => $options) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', array($name));

            // The configured callback service handles requests through its execute() method.
            $callable = array(new Reference($options['callback']), 'execute');
            $rpcServer->addMethodCall('setCallback', array($callable));

            $this->injectConnection($rpcServer, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $name, $options['connection']);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $rpcServer->addMethodCall('setQosOptions', array(
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global']
                ));
            }

            foreach ($optionalSetters as $optionKey => $setter) {
                if (array_key_exists($optionKey, $options)) {
                    $rpcServer->addMethodCall($setter, array($options[$optionKey]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $name);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
639 31
640 31
    /**
     * Registers a private logged channel definition and appends a reference to it as an argument.
     *
     * The channel is stored as "old_sound_rabbit_mq.channel.<name>" and its id is recorded
     * in $this->channelIds.
     *
     * @param Definition $definition     Definition receiving the channel reference
     * @param string     $name           Name used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
654
655 34
    /**
     * Appends a reference to the named connection service as an argument of the definition.
     *
     * @param Definition $definition     Definition receiving the connection reference
     * @param string     $connectionName Suffix of the "old_sound_rabbit_mq.connection.<name>" service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
659
660
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
664 29
665
    /**
     * Add proper dequeuer aware call
     *
     * Registers a setDequeuer() method call on the callback service when its class implements
     * DequeuerAwareInterface. Nothing is done when the callback service is unknown to the
     * container or when its definition declares no class.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id handed to setDequeuer() as a reference
     *
     * @throws \ReflectionException When the resolved callback class cannot be loaded
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        $callbackClass = $callbackDefinition->getClass();
        if (null === $callbackClass) {
            // A definition without an explicit class cannot be inspected by reflection.
            return;
        }

        // The class may be declared as a "%parameter%" placeholder; reflection needs the real class name.
        $callbackClass = $this->container->getParameterBag()->resolveValue($callbackClass);

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
683 1
684
    /**
     * Tags the definition for the 'phpamqplib' monolog channel and registers a setLogger() call.
     *
     * The 'logger' reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $tagAttributes = array(
            'channel' => 'phpamqplib'
        );
        $definition->addTag('monolog.logger', $tagAttributes);

        $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $definition->addMethodCall('setLogger', array($logger));
    }
691
692
    /**
     * Builds the AMQP exchange options used as defaults.
     *
     * @return array Options with the keys 'name', 'type', 'passive' and 'declare'
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
706
707
    /**
     * Builds the AMQP queue options used as defaults.
     *
     * @return array Options with the keys 'name' and 'declare'
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
719
}
720