Passed
Pull Request — master (#38)
by Aleksandr
18:21 queued 08:22
created

OldSoundRabbitMqExtension::loadProducers()   B

Complexity

Conditions 10
Paths 67

Size

Total Lines 41
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 10.7111

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 27
c 4
b 0
f 0
nc 67
nop 0
dl 0
loc 41
ccs 21
cts 26
cp 0.8077
crap 10.7111
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var bool Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
    private $groups = array();
35
36 31
    private $config = array();
37
38 31
    /**
     * Loads the bundle's service definitions and applies the processed configuration.
     *
     * Loads `rabbitmq.xml`, processes `$configs` through getConfiguration(),
     * runs every `load*()` step in a fixed order, and finally either feeds the
     * collected channel references to the data collector or removes the
     * data collector definition.
     *
     * @param array            $configs   Configuration arrays passed on to processConfiguration()
     * @param ContainerBuilder $container Container that receives the definitions
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        // Reset per-load state so a reused extension instance does not carry
        // channel ids or group names over from an earlier load() call.
        $this->channelIds = array();
        $this->groups = array();

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $this->container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        // Runs after all consumer loaders because they are the ones filling $this->groups.
        $this->loadGroupConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check instead of relying on array-to-bool conversion.
        if ($this->collectorEnabled && !empty($this->channelIds)) {
            $channels = array();
            foreach (array_unique($this->channelIds) as $id) {
                $channels[] = new Reference($id);
            }

            $definition = $this->container->getDefinition('old_sound_rabbit_mq.data_collector');
            $definition->replaceArgument(0, $channels);

            return;
        }

        $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
    }
74 31
75
    /**
     * Builds the configuration object for this extension.
     *
     * Neither argument is read here; the configuration depends only on the
     * extension alias.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
79 31
80 31
    /**
     * Registers two services per configured connection: a private connection
     * factory (`old_sound_rabbit_mq.connection_factory.<name>`) and a public,
     * tagged connection (`old_sound_rabbit_mq.connection.<name>`) that is
     * created through that factory's `createConnection` method.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $name => $options) {
            // Pick the class parameter: socket vs. stream, lazy vs. eager.
            $classKey = $options['use_socket'] ? 'socket_connection.class' : 'connection.class';
            if ($options['lazy']) {
                $connectionClass = '%old_sound_rabbit_mq.lazy.' . $classKey . '%';
            } else {
                $connectionClass = '%old_sound_rabbit_mq.' . $classKey . '%';
            }

            $factory = new Definition(
                '%old_sound_rabbit_mq.connection_factory.class%',
                array($connectionClass, $options)
            );
            if (isset($options['connection_parameters_provider'])) {
                $factory->addArgument(new Reference($options['connection_parameters_provider']));
                // NOTE(review): the options array was already copied into the
                // factory arguments above, so this unset only affects the local copy.
                unset($options['connection_parameters_provider']);
            }
            $factory->setPublic(false);

            $factoryId = sprintf('old_sound_rabbit_mq.connection_factory.%s', $name);
            $this->container->setDefinition($factoryId, $factory);

            $connection = new Definition($connectionClass);
            if (!method_exists($connection, 'setFactory')) {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $connection->setFactoryService($factoryId);
                $connection->setFactoryMethod('createConnection');
            } else {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connection->setFactory(array(new Reference($factoryId), 'createConnection'));
            }
            $connection->addTag('old_sound_rabbit_mq.connection');
            $connection->setPublic(true);

            $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $name);
            $this->container->setDefinition($connectionId, $connection);
        }
    }
115 31
116
    /**
     * Registers one service per entry under `bindings`.
     *
     * Nothing is registered in sandbox mode. Each service id is
     * `old_sound_rabbit_mq.binding.<hash>`, where the hash is the md5 of the
     * JSON-encoded, key-sorted binding options.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        foreach ($this->config['bindings'] as $binding) {
            // Sort by key so the hash below does not depend on option order.
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            $definition->addMethodCall('setArguments', array($binding['arguments']));
            $definition->addMethodCall('setDestination', array($binding['destination']));
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
            $definition->addMethodCall('setExchange', array($binding['exchange']));
            // Previously called 'isNowait' with an argument; that name is a getter,
            // so the configured value was never applied.
            // NOTE(review): assumes the binding class exposes setNowait() like its
            // other setters — confirm for custom binding classes.
            $definition->addMethodCall('setNowait', array($binding['nowait']));
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
            $this->injectConnection($definition, $binding['connection']);

            $key = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
        }
    }
140 31
141 31
    /**
     * Registers one `old_sound_rabbit_mq.<name>_producer` service per configured producer.
     *
     * In sandbox mode every producer is replaced by a bare definition of the
     * fallback class. Otherwise the full producer definition is built and, when
     * `service_alias` is not null, an alias pointing at the service is added.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            $this->registerSandboxProducers();

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            $this->container->setDefinition($producerServiceName, $this->createProducerDefinition($key, $producer));
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }
        }
    }

    /**
     * Registers the fallback class under every producer service id (sandbox mode).
     */
    private function registerSandboxProducers()
    {
        foreach ($this->config['producers'] as $key => $producer) {
            $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
        }
    }

    /**
     * Builds the public, tagged definition for a single producer.
     *
     * @param string $key      Producer name taken from the configuration key
     * @param array  $producer Options of that producer
     *
     * @return Definition
     */
    private function createProducerDefinition($key, array $producer)
    {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        // this producer doesn't define an exchange -> using AMQP Default
        if (!isset($producer['exchange_options'])) {
            $producer['exchange_options'] = $this->getDefaultExchangeOptions();
        }
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($producer['exchange_options']))
        );

        // this producer doesn't define a queue -> using AMQP Default
        if (!isset($producer['queue_options'])) {
            $producer['queue_options'] = $this->getDefaultQueueOptions();
        }
        // Queue arguments are normalized the same way as exchange arguments
        // (and as consumer queue options), so '_' in argument keys becomes '-'.
        $definition->addMethodCall(
            'setQueueOptions',
            array($this->normalizeArgumentKeys($producer['queue_options']))
        );

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }
        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }
        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        return $definition;
    }
185 31
186 26
    /**
     * Registers one public `old_sound_rabbit_mq.<name>_consumer` service per
     * entry under `consumers`.
     *
     * Every group a consumer lists is recorded in `$this->groups` and added as
     * a `old_sound_rabbit_mq.consumer.group.<group>` tag.
     */
    protected function loadConsumers()
    {
        // Optional settings that map one-to-one onto a setter, listed in call order.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['consumers'] as $consumerName => $options) {
            $consumerDefinition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $consumerDefinition->setPublic(true);
            $consumerDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $consumerDefinition->addTag('old_sound_rabbit_mq.consumer');
            foreach ($options['groups'] as $groupName) {
                $this->groups[] = $groupName;
                $consumerDefinition->addTag('old_sound_rabbit_mq.consumer.group.' . $groupName);
            }

            // No exchange configured for this consumer: use the AMQP default one.
            $exchangeOptions = isset($options['exchange_options'])
                ? $options['exchange_options']
                : $this->getDefaultExchangeOptions();
            $consumerDefinition->addMethodCall(
                'setExchangeOptions',
                array($this->normalizeArgumentKeys($exchangeOptions))
            );

            // No queue configured for this consumer: use the AMQP default one.
            $queueOptions = isset($options['queue_options'])
                ? $options['queue_options']
                : $this->getDefaultQueueOptions();
            $consumerDefinition->addMethodCall(
                'setQueueOptions',
                array($this->normalizeArgumentKeys($queueOptions))
            );

            $callback = array(new Reference($options['callback']), 'execute');
            $consumerDefinition->addMethodCall('setCallback', array($callback));

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $consumerDefinition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($options[$option])) {
                    $consumerDefinition->addMethodCall($setter, array($options[$option]));
                }
            }

            if (isset($options['graceful_max_execution'])) {
                $graceful = $options['graceful_max_execution'];
                $consumerDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($graceful['timeout'])
                );
                $consumerDefinition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($graceful['exit_code'])
                );
            }

            if (!$options['auto_setup_fabric']) {
                $consumerDefinition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($consumerDefinition, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($consumerDefinition, $consumerName, $options['connection']);
            }
            if ($options['enable_logger']) {
                $this->injectLogger($consumerDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $consumerName);
            $this->container->setDefinition($serviceId, $consumerDefinition);
            $this->addDequeuerAwareCall($options['callback'], $serviceId);
        }
    }
254
255
    /**
     * Registers one public `old_sound_rabbit_mq.<name>_multiple` service per
     * entry under `multiple_consumers`.
     *
     * Each consumer must define `queues`, `queues_provider`, or both. Static
     * queues are keyed by their `name` option and get their callback service
     * wired to `execute`.
     *
     * @throws InvalidConfigurationException when neither `queues` nor `queues_provider` is set
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            $queues = array();
            $callbacks = array();

            // Evaluate presence once; both options may legitimately be absent or empty.
            $hasQueues = !empty($consumer['queues']);
            $hasQueuesProvider = !empty($consumer['queues_provider']);

            if (!$hasQueues && !$hasQueuesProvider) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            if ($hasQueues) {
                foreach ($consumer['queues'] as $queueOptions) {
                    $queues[$queueOptions['name']] = $queueOptions;
                    $queues[$queueOptions['name']]['callback'] = array(
                        new Reference($queueOptions['callback']),
                        'execute',
                    );
                    $callbacks[] = $queueOptions['callback'];
                }
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

            foreach ($consumer['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if ($hasQueuesProvider) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    array(new Reference($consumer['queues_provider']))
                );
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);
            if ($hasQueuesProvider) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }
344 22
345 22
    /**
     * Registers one public `old_sound_rabbit_mq.<name>_dynamic` service per
     * entry under `dynamic_consumers`.
     *
     * A dynamic consumer must name a `queue_options_provider` service; it is
     * passed to `setQueueOptionsProvider` on the consumer.
     *
     * @throws InvalidConfigurationException when `queue_options_provider` is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked
                // (it used to say 'queue_provider', which is not a config key read here).
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            foreach ($consumer['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
421
422
    /**
     * Registers one public `old_sound_rabbit_mq.<name>_batch` service per entry
     * under `batch_consumers`.
     *
     * The callback service is wired to its `batchExecute` method, and the
     * prefetch count is passed both to `setPrefetchCount` and as part of
     * `setQosOptions`.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $batchName => $options) {
            $batchDefinition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            // No exchange configured for this consumer: use the default exchange options.
            $exchangeOptions = isset($options['exchange_options'])
                ? $options['exchange_options']
                : $this->getDefaultExchangeOptions();
            $qos = $options['qos_options'];

            $batchDefinition->setPublic(true);
            $batchDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $batchDefinition->addTag('old_sound_rabbit_mq.batch_consumer');
            $batchDefinition->addMethodCall('setTimeoutWait', array($options['timeout_wait']));
            $batchDefinition->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
            $batchDefinition->addMethodCall(
                'setCallback',
                array(array(new Reference($options['callback']), 'batchExecute'))
            );
            $batchDefinition->addMethodCall(
                'setExchangeOptions',
                array($this->normalizeArgumentKeys($exchangeOptions))
            );
            $batchDefinition->addMethodCall(
                'setQueueOptions',
                array($this->normalizeArgumentKeys($options['queue_options']))
            );
            $batchDefinition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );

            foreach ($options['groups'] as $groupName) {
                $this->groups[] = $groupName;
                $batchDefinition->addTag('old_sound_rabbit_mq.consumer.group.' . $groupName);
            }

            // The exit code setter is registered before the timeout setter for batch consumers.
            if (isset($options['idle_timeout_exit_code'])) {
                $batchDefinition->addMethodCall('setIdleTimeoutExitCode', array($options['idle_timeout_exit_code']));
            }
            if (isset($options['idle_timeout'])) {
                $batchDefinition->addMethodCall('setIdleTimeout', array($options['idle_timeout']));
            }

            // Only the timeout part of graceful_max_execution is applied here.
            if (isset($options['graceful_max_execution'])) {
                $batchDefinition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($options['graceful_max_execution']['timeout'])
                );
            }

            if (!$options['auto_setup_fabric']) {
                $batchDefinition->addMethodCall('disableAutoSetupFabric');
            }
            if ($options['keep_alive']) {
                $batchDefinition->addMethodCall('keepAlive');
            }

            $this->injectConnection($batchDefinition, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($batchDefinition, $batchName, $options['connection']);
            }
            if ($options['enable_logger']) {
                $this->injectLogger($batchDefinition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $batchName);
            $this->container->setDefinition($serviceId, $batchDefinition);
        }
    }
487
488 27
    /**
     * Registers one public `old_sound_rabbit_mq.<name>_anon` service per entry
     * under `anon_consumers`, with its callback service wired to `execute`.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $anonName => $options) {
            $anonDefinition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $anonDefinition->setPublic(true);
            $anonDefinition->addTag('old_sound_rabbit_mq.base_amqp');
            $anonDefinition->addTag('old_sound_rabbit_mq.anon_consumer');
            $anonDefinition->addMethodCall(
                'setExchangeOptions',
                array($this->normalizeArgumentKeys($options['exchange_options']))
            );
            $anonDefinition->addMethodCall(
                'setCallback',
                array(array(new Reference($options['callback']), 'execute'))
            );

            $this->injectConnection($anonDefinition, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($anonDefinition, $anonName, $options['connection']);
            }

            // Remember each group and tag the consumer with it.
            foreach ($options['groups'] as $groupName) {
                $this->groups[] = $groupName;
                $anonDefinition->addTag('old_sound_rabbit_mq.consumer.group.' . $groupName);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $anonName);
            $this->container->setDefinition($serviceId, $anonDefinition);
            $this->addDequeuerAwareCall($options['callback'], $serviceId);
        }
    }
513
    
514
    /**
     * Registers one public `old_sound_rabbit_mq.<group>_group` service per consumer group.
     *
     * Groups come from two places: names collected in `$this->groups` (each
     * starting with the `default` connection) and the `group_consumers`
     * configuration, whose values override those defaults. Each group consumer
     * receives the services tagged `old_sound_rabbit_mq.consumer.group.<group>`.
     */
    protected function loadGroupConsumers()
    {
        $this->groups = array_unique($this->groups);

        // array_fill_keys() always returns an array, including for an empty group list.
        $predefinedGroupsConfigs = array_fill_keys($this->groups, array('connection' => 'default'));

        // Replace rather than merge recursively: a recursive merge would turn a
        // scalar such as 'connection' into an array of both values when a group
        // is predefined and also configured.
        $groupsConfigs = array_replace_recursive($predefinedGroupsConfigs, $this->config['group_consumers']);

        foreach ($groupsConfigs as $key => $group) {
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.group_consumer')
            ;
            $this->injectConnection($definition, $group['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key . '_group_consumer', $group['connection']);
            }

            $definition->addMethodCall('addConsumers', array(
                new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key),
            ));

            if (array_key_exists('qos_options', $group)) {
                // Read the QoS settings from the group itself (this used an undefined $consumer variable).
                $definition->addMethodCall('setQosOptions', array(
                    $group['qos_options']['prefetch_size'],
                    $group['qos_options']['prefetch_count'],
                    $group['qos_options']['global'],
                ));
            }

            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
            $this->container->setDefinition($name, $definition);
        }
    }
551 25
552
    /**
     * Restores '-' in the keys of `$config['arguments']`.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks
     * arguments such as x-ha-policy; every '_' in an argument key is therefore
     * turned back into '-'. A config without an `arguments` entry is returned
     * untouched. Arguments given as a string are first expanded with
     * argumentsStringAsArray().
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];
        if (is_string($rawArguments)) {
            // support for old configuration
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $normalized = array();
        foreach ($rawArguments as $name => $value) {
            // str_replace() leaves keys without '_' unchanged, so no prior check is needed.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
580 22
581
    /**
     * Support for arguments provided as string. Support for old configuration files.
     *
     * The string is a comma separated list of "name:value" or "name:value:type" entries. Each entry
     * becomes array(type, value) indexed by name; the type defaults to 'S' when it is omitted.
     * Blank entries (empty input, doubled or trailing commas) are skipped, and an entry without a
     * value gets null as its value instead of reading an undefined offset.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $argumentsArray = array();

        foreach (explode(',', $arguments) as $argument) {
            // explode() yields one empty string for '' and for every stray comma: nothing to register.
            if (trim($argument) === '') {
                continue;
            }

            $argumentPair = explode(':', $argument);
            $type = isset($argumentPair[2]) ? $argumentPair[2] : 'S';
            $value = isset($argumentPair[1]) ? $argumentPair[1] : null;

            $argumentsArray[$argumentPair[0]] = array($type, $value);
        }

        return $argumentsArray;
    }
604 30
605
    /**
     * Registers one RPC client service for every entry of the 'rpc_clients' configuration.
     *
     * Each service is stored in the container as "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        foreach ($this->config['rpc_clients'] as $key => $client) {
            $connection = $client['connection'];

            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $definition->setPublic(true);
            $definition->setLazy($client['lazy']);
            $definition->addTag('old_sound_rabbit_mq.rpc_client');
            $definition->addMethodCall('initClient', array($client['expect_serialized_response']));

            // Arguments: the connection first, then the logged channel when the collector is enabled.
            $this->injectConnection($definition, $connection);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connection);
            }

            // Optional setters, only added for the options present in the configuration.
            $optionalCalls = array(
                'unserializer' => 'setUnserializer',
                'direct_reply_to' => 'setDirectReplyTo',
            );
            foreach ($optionalCalls as $option => $method) {
                if (array_key_exists($option, $client)) {
                    $definition->addMethodCall($method, array($client[$option]));
                }
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
        }
    }
628
629
    /**
     * Registers one RPC server service for every entry of the 'rpc_servers' configuration.
     *
     * Each service is stored in the container as "old_sound_rabbit_mq.<key>_server".
     */
    protected function loadRpcServers()
    {
        foreach ($this->config['rpc_servers'] as $key => $server) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.rpc_server');
            $definition->addMethodCall('initServer', array($key));

            // The callback is the 'execute' method of the configured callback service.
            $callable = array(new Reference($server['callback']), 'execute');
            $definition->addMethodCall('setCallback', array($callable));

            // Arguments: the connection first, then the logged channel when the collector is enabled.
            $this->injectConnection($definition, $server['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $server['connection']);
            }

            if (array_key_exists('qos_options', $server)) {
                $qos = $server['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            // Remaining optional setters, added in this order when the option is configured.
            $optionalCalls = array(
                'exchange_options' => 'setExchangeOptions',
                'queue_options' => 'setQueueOptions',
                'serializer' => 'setSerializer',
            );
            foreach ($optionalCalls as $option => $method) {
                if (array_key_exists($option, $server)) {
                    $definition->addMethodCall($method, array($server[$option]));
                }
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
        }
    }
662
663
    /**
     * Registers a private logged channel service on the given connection and appends a reference
     * to it to the arguments of $definition.
     *
     * The channel id, "old_sound_rabbit_mq.channel.<name>", is also recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Suffix of the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
677
678
    /**
     * Appends a reference to the "old_sound_rabbit_mq.connection.<connectionName>" service
     * to the arguments of $definition.
     *
     * @param Definition $definition
     * @param string     $connectionName
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
682
683
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }
687
688
    /**
     * Adds a setDequeuer() call to the callback service when its class is dequeuer aware.
     *
     * Nothing is done when the container does not have the callback service, or when its class
     * does not implement OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface.
     *
     * @param string $callback Id of the callback service
     * @param string $name     Id of the service referenced in the setDequeuer() call
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if ($this->container->has($callback)) {
            $service = $this->container->findDefinition($callback);
            $reflection = new \ReflectionClass($service->getClass());
            $isAware = $reflection->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface');

            if ($isAware) {
                $service->addMethodCall('setDequeuer', array(new Reference($name)));
            }
        }
    }
706
707
    /**
     * Tags the definition with the 'phpamqplib' monolog channel and adds a setLogger() call
     * that references the 'logger' service.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $tagAttributes = array('channel' => 'phpamqplib');
        $definition->addTag('monolog.logger', $tagAttributes);

        $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $definition->addMethodCall('setLogger', array($logger));
    }
714
715
    /**
     * Get default AMQP exchange options
     *
     * @return array The 'name', 'type', 'passive' and 'declare' defaults
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
729
730
    /**
     * Get default AMQP queue options
     *
     * @return array The 'name' and 'declare' defaults
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
742
}
743