Passed
Push — master ( 1b8288...d8b945 )
by Mihai
02:19 queued 11s
created

OldSoundRabbitMqExtension::loadProducers()   C

Complexity

Conditions 14
Paths 197

Size

Total Lines 61
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 17.7854

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 14
eloc 41
c 3
b 0
f 0
nc 197
nop 0
dl 0
loc 61
ccs 30
cts 41
cp 0.7317
crap 17.7854
rs 5.4583

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var Boolean Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Processes the bundle configuration and registers all service definitions.
     *
     * Loads rabbitmq.xml, stores the processed configuration on the instance, runs every
     * load*() step, and finally either feeds the collected channel references to the data
     * collector definition or removes that definition.
     *
     * @param array            $configs   Raw configuration arrays handed over by the kernel
     * @param ContainerBuilder $container Container receiving the definitions
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check instead of relying on array-to-boolean conversion.
        if ($this->collectorEnabled && !empty($this->channelIds)) {
            $channels = array();
            foreach (array_unique($this->channelIds) as $id) {
                $channels[] = new Reference($id);
            }

            $definition = $this->container->getDefinition('old_sound_rabbit_mq.data_collector');
            $definition->replaceArgument(0, $channels);
        } else {
            // Collector disabled, or no channel ids were recorded: drop the collector service.
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
        }
    }
73
74 34
    /**
     * Creates the Configuration object used to process this bundle's settings.
     *
     * Neither argument is read here; the Configuration is built from the extension alias only.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers, for every configured connection, a private connection-factory service and a
     * public connection service created through that factory.
     *
     * The connection class parameter is chosen from the 'use_socket' and 'lazy' options.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.' . $connectionSuffix . '%'
                : '%old_sound_rabbit_mq.' . $connectionSuffix . '%';

            // The factory receives the complete connection array, provider key included.
            $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
                $classParam, $connection,
            ));
            if (isset($connection['connection_parameters_provider'])) {
                // A former unset() of this key was removed: it only modified the loop-local
                // copy after the array had already been captured above, so it had no effect.
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            $connectionDefinition = new Definition($classParam);
            if (method_exists($connectionDefinition, 'setFactory')) {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactory(array(new Reference($factoryName), 'createConnection'));
            } else {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactoryService($factoryName);
                $connectionDefinition->setFactoryMethod('createConnection');
            }
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
114
115 34
    /**
     * Registers one tagged service per configured binding; does nothing in sandbox mode.
     *
     * The service id suffix is the md5 of the key-sorted, JSON-encoded binding options.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => option key, listed in the order the calls are recorded.
        // NOTE(review): 'isNowait' reads like a getter name — confirm the binding class
        // really expects the 'nowait' option through this method.
        $methodCalls = [
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        ];

        foreach ($this->config['bindings'] as $bindingOptions) {
            ksort($bindingOptions);

            $bindingDefinition = new Definition($bindingOptions['class']);
            $bindingDefinition->addTag('old_sound_rabbit_mq.binding');
            foreach ($methodCalls as $method => $option) {
                $bindingDefinition->addMethodCall($method, [$bindingOptions[$option]]);
            }
            $this->injectConnection($bindingDefinition, $bindingOptions['connection']);

            $hash = md5(json_encode($bindingOptions));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($bindingDefinition, $hash, $bindingOptions['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $hash), $bindingDefinition);
        }
    }
139
140 34
    /**
     * Registers one service per configured producer.
     *
     * With sandbox disabled each producer gets a fully configured definition, an optional
     * service alias, and autowiring aliases for ProducerInterface and the producer class.
     * With sandbox enabled each producer is the fallback class, aliased for ProducerInterface only.
     */
    protected function loadProducers()
    {
        $sandboxed = (bool) $this->config['sandbox'];

        foreach ($this->config['producers'] as $key => $producer) {
            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            if ($sandboxed) {
                $this->container->setDefinition(
                    $producerServiceName,
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );
                $this->registerProducerArgumentAliases(
                    $producerServiceName,
                    $key,
                    array(ProducerInterface::class)
                );
                continue;
            }

            $this->container->setDefinition(
                $producerServiceName,
                $this->createProducerDefinition($key, $producer)
            );
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }
            $this->registerProducerArgumentAliases(
                $producerServiceName,
                $key,
                array(ProducerInterface::class, $producer['class'])
            );
        }
    }

    /**
     * Builds the fully configured (non-sandbox) definition for a single producer.
     *
     * @param string $key      Producer name from the configuration
     * @param array  $producer Processed options of that producer
     *
     * @return Definition
     */
    private function createProducerDefinition($key, array $producer)
    {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        //this producer doesn't define an exchange -> using AMQP Default
        if (!isset($producer['exchange_options'])) {
            $producer['exchange_options'] = $this->getDefaultExchangeOptions();
        }
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($producer['exchange_options']))
        );

        //this producer doesn't define a queue -> using AMQP Default
        if (!isset($producer['queue_options'])) {
            $producer['queue_options'] = $this->getDefaultQueueOptions();
        }
        $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }
        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        return $definition;
    }

    /**
     * Registers private autowiring aliases for a producer service, one per given type.
     *
     * Does nothing when the container has no registerAliasForArgument() method. The argument
     * name is the producer key, suffixed with "Producer" unless it already ends with it
     * (case-insensitively).
     *
     * @param string   $serviceName Id of the producer service
     * @param string   $key         Producer name from the configuration
     * @param string[] $types       Types to register the alias for
     */
    private function registerProducerArgumentAliases($serviceName, $key, array $types)
    {
        // register alias for argument auto wiring
        if (!method_exists($this->container, 'registerAliasForArgument')) {
            return;
        }

        $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
        foreach ($types as $type) {
            $this->container
                ->registerAliasForArgument($serviceName, $type, $argName)
                ->setPublic(false);
        }
    }
205
206 34
    /**
     * Registers one public, tagged consumer service per entry in the 'consumers' configuration,
     * then registers private autowiring aliases when the container supports them.
     */
    protected function loadConsumers()
    {
        // Optional scalar settings, forwarded only when present and non-null.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $consumerDefinition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $consumerDefinition->setPublic(true);
            $consumerDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $consumerDefinition->addTag('old_sound_rabbit_mq.consumer');

            // Missing exchange / queue sections fall back to the AMQP defaults.
            $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
            $consumerDefinition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            $queueOptions = $consumer['queue_options'] ?? $this->getDefaultQueueOptions();
            $consumerDefinition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

            $consumerDefinition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $consumerDefinition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $consumerDefinition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $consumerDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $consumerDefinition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $consumerDefinition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($consumerDefinition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($consumerDefinition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($consumerDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $consumerDefinition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // register alias for argument auto wiring
            if (!method_exists($this->container, 'registerAliasForArgument')) {
                continue;
            }

            $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
            // NOTE(review): the second type is a raw '%...%' parameter placeholder, not a
            // resolved class name — confirm this alias is usable for autowiring.
            foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $aliasType) {
                $this->container
                    ->registerAliasForArgument($serviceId, $aliasType, $argName)
                    ->setPublic(false);
            }
        }
    }
282
283 34
    /**
     * Registers one public, tagged service per entry in the 'multiple_consumers' configuration.
     *
     * @throws InvalidConfigurationException When an entry has neither 'queues' nor 'queues_provider'
     */
    protected function loadMultipleConsumers()
    {
        // Optional scalar settings, forwarded only when present and non-null.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            // Index queues by their 'name' option and swap each callback service id for a
            // [Reference, 'execute'] callable; the raw ids are kept for the dequeuer calls below.
            $queues = [];
            $callbackIds = [];
            foreach ($consumer['queues'] as $queueOptions) {
                $callbackIds[] = $queueOptions['callback'];
                $queueOptions['callback'] = [new Reference($queueOptions['callback']), 'execute'];
                $queues[$queueOptions['name']] = $queueOptions;
            }

            $multiDefinition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $multiDefinition->setPublic(true);
            $multiDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $multiDefinition->addTag('old_sound_rabbit_mq.multi_consumer');
            $multiDefinition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $multiDefinition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            if ($consumer['queues_provider']) {
                $multiDefinition->addMethodCall('setQueuesProvider', [new Reference($consumer['queues_provider'])]);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $multiDefinition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $multiDefinition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $multiDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $multiDefinition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $multiDefinition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($multiDefinition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($multiDefinition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($multiDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($serviceId, $multiDefinition);

            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
            }
            foreach ($callbackIds as $callbackId) {
                $this->addDequeuerAwareCall($callbackId, $serviceId);
            }
        }
    }
367
368 34
    /**
     * Registers one public, tagged service per entry in the 'dynamic_consumers' configuration.
     *
     * @throws InvalidConfigurationException When an entry has an empty 'queue_options_provider'
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked above.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            // Optional settings are forwarded only when present and non-null.
            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
439
440 34
    /**
     * Registers one public, tagged service per entry in the 'batch_consumers' configuration.
     *
     * The callback is bound to the 'batchExecute' method of the referenced service.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $batchDefinition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            // A missing exchange section falls back to the default exchange options.
            $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];

            $batchDefinition->setPublic(true);
            $batchDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $batchDefinition->addTag('old_sound_rabbit_mq.batch_consumer');
            $batchDefinition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $batchDefinition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $batchDefinition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $batchDefinition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
            $batchDefinition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $batchDefinition->addMethodCall(
                'setQosOptions',
                [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
            );

            if (isset($consumer['idle_timeout_exit_code'])) {
                $batchDefinition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['idle_timeout'])) {
                $batchDefinition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['graceful_max_execution'])) {
                // Only the timeout is forwarded here; no exit code call is recorded.
                $batchDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $batchDefinition->addMethodCall('disableAutoSetupFabric');
            }
            if ($consumer['keep_alive']) {
                $batchDefinition->addMethodCall('keepAlive');
            }

            $this->injectConnection($batchDefinition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($batchDefinition, $key, $consumer['connection']);
            }
            if ($consumer['enable_logger']) {
                $this->injectLogger($batchDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $batchDefinition);
        }
    }
500
501 34
    /**
     * Registers one public, tagged service per entry in the 'anon_consumers' configuration.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $anonDefinition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $anonDefinition->setPublic(true);
            $anonDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $anonDefinition->addTag('old_sound_rabbit_mq.anon_consumer');
            $anonDefinition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($anon['exchange_options'])]
            );
            $anonDefinition->addMethodCall('setCallback', [[new Reference($anon['callback']), 'execute']]);

            $this->injectConnection($anonDefinition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($anonDefinition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $anonDefinition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
521
522
    /**
     * Restores dashes in the keys of the 'arguments' entry of an options array.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks parameters such as
     * x-ha-policy, so every '_' in an argument key is turned back into '-'. Arguments given
     * in the old string form are first converted to an array. Options without an 'arguments'
     * entry are returned unchanged.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];
        // support for old configuration
        if (is_string($rawArguments)) {
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $dashedArguments = [];
        foreach ($rawArguments as $name => $value) {
            $dashedArguments[str_replace('_', '-', (string) $name)] = $value;
        }
        $config['arguments'] = $dashedArguments;

        return $config;
    }
550
551
    /**
     * Parses arguments written as a string, as accepted by old configuration files.
     *
     * The string is split on ',' into entries and each entry on ':' into name, value and an
     * optional type. The result maps each name to array(type, value); the type is 'S' when
     * the entry has no third part.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $entry) {
            $parts = explode(':', $entry);
            $parsed[$parts[0]] = [$parts[2] ?? 'S', $parts[1]];
        }

        return $parsed;
    }
574
575 34
    /**
     * Registers one public, tagged service per entry in the 'rpc_clients' configuration.
     *
     * The 'unserializer' and 'direct_reply_to' options are forwarded whenever the key exists,
     * even if its value is null.
     */
    protected function loadRpcClients()
    {
        // Option key => method name, for options forwarded whenever the key is present.
        $keyedSetters = [
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        ];

        foreach ($this->config['rpc_clients'] as $key => $client) {
            $clientDefinition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $clientDefinition->setLazy($client['lazy']);
            $clientDefinition->addTag('old_sound_rabbit_mq.rpc_client');
            $clientDefinition->addMethodCall('initClient', [$client['expect_serialized_response']]);

            $this->injectConnection($clientDefinition, $client['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($clientDefinition, $key, $client['connection']);
            }

            foreach ($keyedSetters as $option => $setter) {
                if (array_key_exists($option, $client)) {
                    $clientDefinition->addMethodCall($setter, [$client[$option]]);
                }
            }
            $clientDefinition->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
            $this->container->setDefinition($serviceId, $clientDefinition);
        }
    }
598
599 34
    /**
     * Register one RPC server service definition per entry of the
     * "rpc_servers" configuration, under the id "old_sound_rabbit_mq.<name>_server".
     */
    protected function loadRpcServers()
    {
        // Optional configuration keys passed through unchanged to a single-argument setter.
        $passThroughSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $serverName => $options) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', array($serverName));

            // The configured callback service is invoked through its execute() method.
            $callable = array(new Reference($options['callback']), 'execute');
            $rpcServer->addMethodCall('setCallback', array($callable));

            $connectionName = $options['connection'];
            $this->injectConnection($rpcServer, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $serverName, $connectionName);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $rpcServer->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($passThroughSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcServer->addMethodCall($setter, array($options[$option]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $serverName);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
632
633 31
    /**
     * Register a private logged-channel definition for the given name and append
     * a reference to it as the next constructor argument of $definition.
     *
     * The channel id is also recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Suffix used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
647
648 33
    /**
     * Append a reference to the named connection service as the next
     * constructor argument of the given definition.
     *
     * @param Definition $definition     Definition that receives the connection reference
     * @param string     $connectionName Name used to build the connection service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $connection = new Reference($connectionId);

        $definition->addArgument($connection);
    }
652
653 34
    /**
     * Return the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
657
658
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is known to the container and its class implements
     * DequeuerAwareInterface, a "setDequeuer" method call referencing $name is added
     * to the callback's definition. Nothing happens when the container does not know
     * the callback or its definition declares no class.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id passed to setDequeuer()
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // The class may be given as a "%parameter%" placeholder; resolve it before reflecting on it.
        $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());
        if (!is_string($callbackClass) || '' === $callbackClass) {
            // No class declared on the definition, so there is nothing to inspect.
            return;
        }

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
676
677 1
    /**
     * Tag the definition for the "phpamqplib" monolog channel and add a
     * setLogger() call that receives the "logger" service reference, which is
     * ignored when that reference is invalid.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $tagAttributes = array('channel' => 'phpamqplib');
        $definition->addTag('monolog.logger', $tagAttributes);

        $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $definition->addMethodCall('setLogger', array($logger));
    }
684
685
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an unnamed, passive "direct" exchange that is not declared.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
699
700
    /**
     * Get default AMQP queue options
     *
     * The defaults are an unnamed queue that is not declared.
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
712
}
713