Completed
Pull Request — master (#38)
by Aleksandr
07:11
created

OldSoundRabbitMqExtension::loadProducers()   B

Complexity

Conditions 10
Paths 67

Size

Total Lines 41
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 10.7111

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 27
c 4
b 0
f 0
nc 67
nop 0
dl 0
loc 41
ccs 21
cts 26
cp 0.8077
crap 10.7111
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var Boolean Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
    private $groups = array();
35
36 31
    private $config = array();
37
38 31
    public function load(array $configs, ContainerBuilder $container)
39
    {
40 31
        $this->container = $container;
41 31
42
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
43 31
        $loader->load('rabbitmq.xml');
44 31
45
        $configuration = $this->getConfiguration($configs, $container);
46 31
        $this->config = $this->processConfiguration($configuration, $configs);
47
48 31
        $this->collectorEnabled = $this->config['enable_collector'];
49 31
50 31
        $this->loadConnections();
51 31
        $this->loadBindings();
52 31
        $this->loadProducers();
53 31
        $this->loadConsumers();
54 31
        $this->loadMultipleConsumers();
55 31
        $this->loadDynamicConsumers();
56 31
        $this->loadBatchConsumers();
57 31
        $this->loadAnonConsumers();
58
        $this->loadGroupConsumers();
59 31
        $this->loadRpcClients();
60 28
        $this->loadRpcServers();
61 28
62 28
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
63
            $channels = array();
64
            foreach (array_unique($this->channelIds) as $id) {
65 28
                $channels[] = new Reference($id);
66 28
            }
67
68 3
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
69
            $definition->replaceArgument(0, $channels);
70 31
        } else {
71
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
72 31
        }
73
    }
74 31
75
    public function getConfiguration(array $config, ContainerBuilder $container)
76
    {
77 31
        return new Configuration($this->getAlias());
78
    }
79 31
80 31
    protected function loadConnections()
81
    {
82 31
        foreach ($this->config['connections'] as $key => $connection) {
83 22
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
84 31
            $classParam =
85
                $connection['lazy']
86 31
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
87 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
88
89 31
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
90
                $classParam, $connection,
91
            ));
92
            if (isset($connection['connection_parameters_provider'])) {
93 31
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
94 31
                unset($connection['connection_parameters_provider']);
95 31
            }
96
            $definition->setPublic(false);
97 31
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
98 31
            $this->container->setDefinition($factoryName, $definition);
99
100 31
            $definition = new Definition($classParam);
101
            if (method_exists($definition, 'setFactory')) {
102
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
103
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
104
            } else {
105
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
106 31
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107 31
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

107
                $definition->/** @scrutinizer ignore-call */ 
108
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
108
            }
109 31
            $definition->addTag('old_sound_rabbit_mq.connection');
110
            $definition->setPublic(true);
111 31
112
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
113 31
        }
114
    }
115 31
116
    protected function loadBindings()
117
    {
118 31
        if ($this->config['sandbox']) {
119 22
            return;
120 22
        }
121 22
        foreach ($this->config['bindings'] as $binding) {
122 22
            ksort($binding);
123 22
            $definition = new Definition($binding['class']);
124 22
            $definition->addTag('old_sound_rabbit_mq.binding');
125 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
126 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
127 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
128 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
129 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
130 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
131
            $this->injectConnection($definition, $binding['connection']);
132 22
            $key = md5(json_encode($binding));
133
            if ($this->collectorEnabled) {
134 22
                // in the context of a binding, I don't thing logged channels are needed?
135
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
136 31
            }
137
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
138 31
        }
139
    }
140 31
141 31
    protected function loadProducers()
142 27
    {
143 27
        if ($this->config['sandbox'] == false) {
144 27
            foreach ($this->config['producers'] as $key => $producer) {
145 27
                $definition = new Definition($producer['class']);
146
                $definition->setPublic(true);
147 27
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
148 1
                $definition->addTag('old_sound_rabbit_mq.producer');
149
                //this producer doesn't define an exchange -> using AMQP Default
150 27
                if (!isset($producer['exchange_options'])) {
151
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
152 27
                }
153 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
154
                //this producer doesn't define a queue -> using AMQP Default
155 27
                if (!isset($producer['queue_options'])) {
156 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
157 27
                }
158 25
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
159
                $this->injectConnection($definition, $producer['connection']);
160 27
                if ($this->collectorEnabled) {
161
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
162
                }
163
                if (!$producer['auto_setup_fabric']) {
164 27
                    $definition->addMethodCall('disableAutoSetupFabric');
165
                }
166
167
                if ($producer['enable_logger']) {
168 27
                    $this->injectLogger($definition);
169
                }
170 27
171 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
172 22
173
                $this->container->setDefinition($producerServiceName, $definition);
174
                if (null !== $producer['service_alias']) {
175
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
176
                }
177
            }
178
        } else {
179
            foreach ($this->config['producers'] as $key => $producer) {
180
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
181 31
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
182
            }
183 31
        }
184
    }
185 31
186 26
    protected function loadConsumers()
187 26
    {
188 26
        foreach ($this->config['consumers'] as $key => $consumer) {
189 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
190
            $definition->setPublic(true);
191 26
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
192
            $definition->addTag('old_sound_rabbit_mq.consumer');
193
            foreach ($consumer['groups'] as $group) {
194 26
                $this->groups[] = $group;
195
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
196 26
            }
197
            //this consumer doesn't define an exchange -> using AMQP Default
198
            if (!isset($consumer['exchange_options'])) {
199 26
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
200 26
            }
201
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
202 26
            //this consumer doesn't define a queue -> using AMQP Default
203 22
            if (!isset($consumer['queue_options'])) {
204 22
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
205 22
            }
206 22
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
207
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
208
209
            if (array_key_exists('qos_options', $consumer)) {
210 26
                $definition->addMethodCall('setQosOptions', array(
211
                    $consumer['qos_options']['prefetch_size'],
212
                    $consumer['qos_options']['prefetch_count'],
213 26
                    $consumer['qos_options']['global']
214
                ));
215
            }
216 26
217
            if (isset($consumer['idle_timeout'])) {
218
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
219
            }
220
            if (isset($consumer['idle_timeout_exit_code'])) {
221
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
222
            }
223
            if (isset($consumer['timeout_wait'])) {
224
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
225
            }
226 26
            if (isset($consumer['graceful_max_execution'])) {
227
                $definition->addMethodCall(
228
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
229
                    array($consumer['graceful_max_execution']['timeout'])
230 26
                );
231 26
                $definition->addMethodCall(
232 25
                    'setGracefulMaxExecutionTimeoutExitCode',
233
                    array($consumer['graceful_max_execution']['exit_code'])
234
                );
235 26
            }
236 1
            if (!$consumer['auto_setup_fabric']) {
237
                $definition->addMethodCall('disableAutoSetupFabric');
238
            }
239 26
240 26
            $this->injectConnection($definition, $consumer['connection']);
241 26
            if ($this->collectorEnabled) {
242
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
243 31
            }
244
245 31
            if ($consumer['enable_logger']) {
246
                $this->injectLogger($definition);
247 31
            }
248 22
249 22
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
250
            $this->container->setDefinition($name, $definition);
251 22
            $this->addDequeuerAwareCall($consumer['callback'], $name);
252
        }
253
    }
254
255
    protected function loadMultipleConsumers()
256
    {
257
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
258 22
            $queues = array();
259 22
            $callbacks = array();
260 22
261 22
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
262
                throw new InvalidConfigurationException(
263
                    "Error on loading $key multiple consumer. " .
264 22
                    "Either 'queues' or 'queues_provider' parameters should be defined."
265
                );
266 22
            }
267 22
268 22
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
269 22
                $queues[$queueOptions['name']] = $queueOptions;
270 22
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
271
                $callbacks[] = $queueOptions['callback'];
272 22
            }
273 22
274 22
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
275 22
            $definition
276
                ->setPublic(true)
277
                ->addTag('old_sound_rabbit_mq.base_amqp')
278
                ->addTag('old_sound_rabbit_mq.multi_consumer')
279 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
280
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
281
282
            foreach ($consumer['groups'] as $group) {
283
                $this->groups[] = $group;
284
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
285
            }
286
287 22
            if ($consumer['queues_provider']) {
288
                $definition->addMethodCall(
289
                    'setQueuesProvider',
290 22
                    array(new Reference($consumer['queues_provider']))
291
                );
292
            }
293 22
294
            if (array_key_exists('qos_options', $consumer)) {
295
                $definition->addMethodCall('setQosOptions', array(
296
                    $consumer['qos_options']['prefetch_size'],
297
                    $consumer['qos_options']['prefetch_count'],
298
                    $consumer['qos_options']['global']
299
                ));
300
            }
301
302
            if (isset($consumer['idle_timeout'])) {
303 22
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
304
            }
305
            if (isset($consumer['idle_timeout_exit_code'])) {
306
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
307 22
            }
308 22
            if (isset($consumer['timeout_wait'])) {
309 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
310
            }
311
            if (isset($consumer['graceful_max_execution'])) {
312 22
                $definition->addMethodCall(
313
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
314
                    array($consumer['graceful_max_execution']['timeout'])
315
                );
316 22
                $definition->addMethodCall(
317 22
                    'setGracefulMaxExecutionTimeoutExitCode',
318 22
                    array($consumer['graceful_max_execution']['exit_code'])
319 22
                );
320
            }
321 22
            if (!$consumer['auto_setup_fabric']) {
322 22
                $definition->addMethodCall('disableAutoSetupFabric');
323
            }
324
325 31
            $this->injectConnection($definition, $consumer['connection']);
326
            if ($this->collectorEnabled) {
327 31
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
328
            }
329 31
330
            if ($consumer['enable_logger']) {
331 22
                $this->injectLogger($definition);
332
            }
333
334
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
335
            $this->container->setDefinition($name, $definition);
336
            if ($consumer['queues_provider']) {
337
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
338 22
            }
339
            foreach ($callbacks as $callback) {
340 22
                $this->addDequeuerAwareCall($callback, $name);
341 22
            }
342 22
        }
343 22
    }
344 22
345 22
    protected function loadDynamicConsumers()
346
    {
347 22
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
348
349
            if (empty($consumer['queue_options_provider'])) {
350
                throw new InvalidConfigurationException(
351
                    "Error on loading $key dynamic consumer. " .
352
                    "'queue_provider' parameter should be defined."
353
                );
354
            }
355 22
356 22
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
357 22
            $definition
358
                ->setPublic(true)
359
                ->addTag('old_sound_rabbit_mq.base_amqp')
360 22
                ->addTag('old_sound_rabbit_mq.consumer')
361
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
362
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
363 22
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
364
365
            foreach ($consumer['groups'] as $group) {
366 22
                $this->groups[] = $group;
367
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
368
            }
369
370
            if (array_key_exists('qos_options', $consumer)) {
371
                $definition->addMethodCall('setQosOptions', array(
372
                    $consumer['qos_options']['prefetch_size'],
373
                    $consumer['qos_options']['prefetch_count'],
374
                    $consumer['qos_options']['global']
375
                ));
376 22
            }
377
378
            $definition->addMethodCall(
379
                'setQueueOptionsProvider',
380 22
                array(new Reference($consumer['queue_options_provider']))
381 22
            );
382 22
383
            if (isset($consumer['idle_timeout'])) {
384
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
385 22
            }
386
            if (isset($consumer['idle_timeout_exit_code'])) {
387
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
388
            }
389 22
            if (isset($consumer['timeout_wait'])) {
390 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
391 22
            }
392 22
            if (isset($consumer['graceful_max_execution'])) {
393
                $definition->addMethodCall(
394 31
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
395
                    array($consumer['graceful_max_execution']['timeout'])
396 31
                );
397
                $definition->addMethodCall(
398 31
                    'setGracefulMaxExecutionTimeoutExitCode',
399
                    array($consumer['graceful_max_execution']['exit_code'])
400
                );
401
            }
402
            if (!$consumer['auto_setup_fabric']) {
403
                $definition->addMethodCall('disableAutoSetupFabric');
404
            }
405
406
            $this->injectConnection($definition, $consumer['connection']);
407
            if ($this->collectorEnabled) {
408
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
409
            }
410
411
            if ($consumer['enable_logger']) {
412
                $this->injectLogger($definition);
413
            }
414
415
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
416
            $this->container->setDefinition($name, $definition);
417
            $this->addDequeuerAwareCall($consumer['callback'], $name);
418
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
419
        }
420
    }
421
422
    protected function loadBatchConsumers()
423
    {
424
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
425
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
426
427
            if (!isset($consumer['exchange_options'])) {
428
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
429
            }
430
431
            $definition
432
                ->setPublic(true)
433
                ->addTag('old_sound_rabbit_mq.base_amqp')
434
                ->addTag('old_sound_rabbit_mq.batch_consumer')
435
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
436
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
437
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
438
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
439
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
440
                ->addMethodCall('setQosOptions', array(
441
                    $consumer['qos_options']['prefetch_size'],
442
                    $consumer['qos_options']['prefetch_count'],
443
                    $consumer['qos_options']['global']
444
                ))
445
            ;
446
447
            foreach ($consumer['groups'] as $group) {
448
                $this->groups[] = $group;
449
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
450
            }
451
452
            if (isset($consumer['idle_timeout_exit_code'])) {
453
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
454
            }
455 31
456
            if (isset($consumer['idle_timeout'])) {
457 31
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
458
            }
459 31
460 22
            if (isset($consumer['graceful_max_execution'])) {
461
                $definition->addMethodCall(
462 22
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
463 22
                    array($consumer['graceful_max_execution']['timeout'])
464 22
                );
465 22
            }
466 22
467 22
            if (!$consumer['auto_setup_fabric']) {
468 22
                $definition->addMethodCall('disableAutoSetupFabric');
469 22
            }
470
471
            if ($consumer['keep_alive']) {
472 22
                $definition->addMethodCall('keepAlive');
473 22
            }
474 22
475
            $this->injectConnection($definition, $consumer['connection']);
476 31
            if ($this->collectorEnabled) {
477
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
478
            }
479
480
            if ($consumer['enable_logger']) {
481
                $this->injectLogger($definition);
482
            }
483
484
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
485
        }
486 27
    }
487
488 27
    protected function loadAnonConsumers()
489 1
    {
490
        foreach ($this->config['anon_consumers'] as $key => $anon) {
491 1
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
492
            $definition
493
                ->setPublic(true)
494
                ->addTag('old_sound_rabbit_mq.base_amqp')
495 1
                ->addTag('old_sound_rabbit_mq.anon_consumer')
496 1
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
497 1
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
498
            $this->injectConnection($definition, $anon['connection']);
499
            if ($this->collectorEnabled) {
500 1
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
501
            }
502 1
503
            foreach ($anon['groups'] as $group) {
504 27
                $this->groups[] = $group;
505
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
506
            }
507
508
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
509
            $this->container->setDefinition($name, $definition);
510
            $this->addDequeuerAwareCall($anon['callback'], $name);
511
        }
512
    }
513
    
514
    protected function loadGroupConsumers()
515
    {
516
        $this->groups = array_unique($this->groups);
517
        $defaultGroupConfig = array('connection' => 'default');
518
        $predefinedGroupsConfigs = array_combine(
519
            $this->groups,
520
            array_fill(0, count($this->groups), $defaultGroupConfig)
521
        );
522
523
        foreach (array_merge_recursive($predefinedGroupsConfigs, $this->config['group_consumers']) as $key => $group) {
0 ignored issues
show
Bug introduced by
It seems like $predefinedGroupsConfigs can also be of type false; however, parameter $array1 of array_merge_recursive() does only seem to accept array, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

523
        foreach (array_merge_recursive(/** @scrutinizer ignore-type */ $predefinedGroupsConfigs, $this->config['group_consumers']) as $key => $group) {
Loading history...
524
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
525
            $definition
526
                ->setPublic(true)
527
                ->addTag('old_sound_rabbit_mq.base_amqp')
528
                ->addTag('old_sound_rabbit_mq.group_consumer')
529
            ;
530
            $this->injectConnection($definition, $group['connection']);
531 31
            if ($this->collectorEnabled) {
532
                $this->injectLoggedChannel($definition, $key + '_group_consumer', $group['connection']);
533 31
            }
534 25
535 25
            $definition->addMethodCall('addConsumers', [new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key)]);
536
537 25
            if (array_key_exists('qos_options', $group)) {
538 25
                $definition->addMethodCall('setQosOptions', array(
539 25
                    $consumer['qos_options']['prefetch_size'],
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $consumer seems to be never defined.
Loading history...
540 25
                    $consumer['qos_options']['prefetch_count'],
541 25
                    $consumer['qos_options']['global']
542
                ));
543 25
            }
544 25
545
            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
546 25
            $this->container->setDefinition($name, $definition);
547 25
        }
548
    }
549 25
550
    /**
551 25
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
552
     * parameter. So we revert the change for right configurations.
553 31
     *
554
     * @param array $config
555 31
     *
556
     * @return array
557 31
     */
558 22
    private function normalizeArgumentKeys(array $config)
559
    {
560 22
        if (isset($config['arguments'])) {
561 22
            $arguments = $config['arguments'];
562 22
            // support for old configuration
563 22
            if (is_string($arguments)) {
564 22
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

564
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
565 22
            }
566 22
567 22
            $newArguments = array();
568
            foreach ($arguments as $key => $value) {
569 22
                if (strstr($key, '_')) {
570
                    $key = str_replace('_', '-', $key);
571
                }
572
                $newArguments[$key] = $value;
573
            }
574
            $config['arguments'] = $newArguments;
575
        }
576 22
        return $config;
577 22
    }
578
579 22
    /**
580 22
     * Support for arguments provided as string. Support for old configuration files.
581
     *
582 22
     * @deprecated
583 22
     * @param string $arguments
584
     * @return array
585 22
     */
586
    private function argumentsStringAsArray($arguments)
587 31
    {
588
        $argumentsArray = array();
589 28
590
        $argumentPairs = explode(',', $arguments);
591 28
        foreach ($argumentPairs as $argument) {
592 28
            $argumentPair = explode(':', $argument);
593
            $type = 'S';
594 28
            if (isset($argumentPair[2])) {
595 28
                $type = $argumentPair[2];
596 28
            }
597
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
598 28
        }
599
600 28
        return $argumentsArray;
601 28
    }
602 28
603
    protected function loadRpcClients()
604 30
    {
605
        foreach ($this->config['rpc_clients'] as $key => $client) {
606 30
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
607 30
            $definition->setLazy($client['lazy']);
608
            $definition
609 31
                ->addTag('old_sound_rabbit_mq.rpc_client')
610
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
611 31
            $this->injectConnection($definition, $client['connection']);
612
            if ($this->collectorEnabled) {
613
                $this->injectLoggedChannel($definition, $key, $client['connection']);
614
            }
615
            if (array_key_exists('unserializer', $client)) {
616
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
617
            }
618
            if (array_key_exists('direct_reply_to', $client)) {
619
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
620 26
            }
621
            $definition->setPublic(true);
622 26
623 26
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
624
        }
625
    }
626
627
    protected function loadRpcServers()
628
    {
629
        foreach ($this->config['rpc_servers'] as $key => $server) {
630
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
631
            $definition
632
                ->setPublic(true)
633 1
                ->addTag('old_sound_rabbit_mq.base_amqp')
634
                ->addTag('old_sound_rabbit_mq.rpc_server')
635 1
                ->addMethodCall('initServer', array($key))
636 1
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
637
            $this->injectConnection($definition, $server['connection']);
638 1
            if ($this->collectorEnabled) {
639 1
                $this->injectLoggedChannel($definition, $key, $server['connection']);
640
            }
641
            if (array_key_exists('qos_options', $server)) {
642
                $definition->addMethodCall('setQosOptions', array(
643
                    $server['qos_options']['prefetch_size'],
644
                    $server['qos_options']['prefetch_count'],
645
                    $server['qos_options']['global']
646 1
                ));
647
            }
648
            if (array_key_exists('exchange_options', $server)) {
649 1
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
650
            }
651
            if (array_key_exists('queue_options', $server)) {
652
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
653
            }
654
            if (array_key_exists('serializer', $server)) {
655
                $definition->addMethodCall('setSerializer', array($server['serializer']));
656
            }
657
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
658
        }
659
    }
660
661 27
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
662
    {
663
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
664 27
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
665
        $channel
666
            ->setPublic(false)
667
            ->addTag('old_sound_rabbit_mq.logged_channel');
668
        $this->injectConnection($channel, $connectionName);
669
670
        $this->container->setDefinition($id, $channel);
671
672
        $this->channelIds[] = $id;
673
        $definition->addArgument(new Reference($id));
674
    }
675
676
    protected function injectConnection(Definition $definition, $connectionName)
677
    {
678
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
679
    }
680
681
    public function getAlias()
682
    {
683
        return 'old_sound_rabbit_mq';
684
    }
685
686
    /**
687
     * Add proper dequeuer aware call
688
     *
689
     * @param string $callback
690
     * @param string $name
691
     */
692
    protected function addDequeuerAwareCall($callback, $name)
693
    {
694
        if (!$this->container->has($callback)) {
695
            return;
696
        }
697
698
        $callbackDefinition = $this->container->findDefinition($callback);
699
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
700
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
701
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
702
        }
703
    }
704
705
    private function injectLogger(Definition $definition)
706
    {
707
        $definition->addTag('monolog.logger', array(
708
            'channel' => 'phpamqplib'
709
        ));
710
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
711
    }
712
713
    /**
714
     * Get default AMQP exchange options
715
     *
716
     * @return array
717
     */
718
    protected function getDefaultExchangeOptions()
719
    {
720
        return array(
721
            'name' => '',
722
            'type' => 'direct',
723
            'passive' => true,
724
            'declare' => false
725
        );
726
    }
727
728
    /**
729
     * Get default AMQP queue options
730
     *
731
     * @return array
732
     */
733
    protected function getDefaultQueueOptions()
734
    {
735
        return array(
736
            'name' => '',
737
            'declare' => false
738
        );
739
    }
740
}
741