Test Failed
Pull Request — master (#38)
by Aleksandr
15:07 queued 05:09
created

OldSoundRabbitMqExtension::loadBatchConsumers()   F

Complexity

Conditions 11
Paths 513

Size

Total Lines 62
Code Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 51.1484

Importance

Changes 7
Bugs 1 Features 1
Metric Value
cc 11
eloc 37
c 7
b 1
f 1
nc 513
nop 0
dl 0
loc 62
ccs 12
cts 39
cp 0.3076
crap 51.1484
rs 3.8263

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var bool Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
35
    private $config = array();
36 31
37
    /**
     * Processes the bundle configuration and registers all RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, resolves the merged configuration, runs every load*()
     * step and finally either feeds the collected channel references to the
     * data collector definition or removes that definition.
     *
     * @param array            $configs   Raw configuration arrays to process
     * @param ContainerBuilder $container Container receiving the definitions
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadGroupConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness test instead of relying on the implicit array-to-bool conversion.
        if ($this->collectorEnabled && !empty($this->channelIds)) {
            $channels = array();
            foreach (array_unique($this->channelIds) as $id) {
                $channels[] = new Reference($id);
            }

            $definition = $this->container->getDefinition('old_sound_rabbit_mq.data_collector');
            $definition->replaceArgument(0, $channels);
        } else {
            // Collector disabled, or no channel was registered: drop the collector service.
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
        }
    }
73
74 31
    /**
     * Creates the configuration definition object for this extension.
     *
     * Neither argument is read here; the Configuration is built from the
     * extension alias only.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 31
    /**
     * Registers, for every configured connection, a private connection factory
     * service and a public connection service created through that factory.
     *
     * The connection class parameter is chosen from the 'use_socket' and 'lazy'
     * options of the connection.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
                $classParam, $connection,
            ));
            if (isset($connection['connection_parameters_provider'])) {
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
                // NOTE(review): $connection was already copied into the definition
                // arguments above, so this unset only changes the loop-local copy.
                // Confirm whether the key was meant to be stripped before that point.
                unset($connection['connection_parameters_provider']);
            }
            $definition->setPublic(false);
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $definition);

            // The former setFactoryService()/setFactoryMethod() fallback targeted
            // Symfony DependencyInjection releases older than 2.6; those methods are
            // gone from Definition, so setFactory() is called directly.
            $definition = new Definition($classParam);
            $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
            $definition->addTag('old_sound_rabbit_mq.connection');
            $definition->setPublic(true);

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
        }
    }
114
115 31
    /**
     * Registers one service definition per configured binding.
     *
     * Nothing is registered when the 'sandbox' option is enabled. The service id
     * suffix is the md5 of the JSON-encoded, key-sorted binding options.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => binding option passed as its single argument (order matters).
        $setters = [
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        ];

        foreach ($this->config['bindings'] as $binding) {
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($setters as $method => $option) {
                $definition->addMethodCall($method, [$binding[$option]]);
            }
            $this->injectConnection($definition, $binding['connection']);

            $bindingId = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $bindingId, $binding['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.binding.%s', $bindingId);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
139
140 31
    /**
     * Registers one service definition per configured producer.
     *
     * With the 'sandbox' option enabled every producer is registered with the
     * fallback class and no further setup; otherwise the producer class is
     * configured from its options and wired to its connection.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            foreach ($this->config['producers'] as $key => $producer) {
                $fallback = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $fallback);
            }

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');

            // No exchange defined for this producer -> use the AMQP default.
            $exchangeOptions = isset($producer['exchange_options'])
                ? $producer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // No queue defined for this producer -> use the AMQP default.
            $queueOptions = isset($producer['queue_options'])
                ? $producer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$queueOptions]);

            $connectionName = $producer['connection'];
            $this->injectConnection($definition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connectionName);
            }

            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $key);
            $this->container->setDefinition($serviceId, $definition);

            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $serviceId);
            }
        }
    }
184
185 31
    /**
     * Registers one public service definition per entry of the 'consumers' config.
     *
     * Each definition is tagged (including one tag per consumer group), set up
     * through method calls derived from the consumer options, wired to its
     * connection and stored as "old_sound_rabbit_mq.<key>_consumer".
     */
    protected function loadConsumers()
    {
        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer');
            foreach ($consumer['groups'] as $group) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            // No exchange defined for this consumer -> use the AMQP default.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // No queue defined for this consumer -> use the AMQP default.
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $connectionName = $consumer['connection'];
            $this->injectConnection($definition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connectionName);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);
        }
    }
249 22
250
    /**
     * Registers one public service definition per entry of the
     * 'multiple_consumers' config, stored as "old_sound_rabbit_mq.<key>_multiple".
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(sprintf(
                    "Error on loading %s multiple consumer. Either 'queues' or 'queues_provider' parameters should be defined.",
                    $key
                ));
            }

            // Index the queues by name and swap each callback id for a callable on the referenced service.
            $queues = [];
            $callbacks = [];
            foreach ($consumer['queues'] as $queueOptions) {
                $callbackId = $queueOptions['callback'];
                $queueOptions['callback'] = [new Reference($callbackId), 'execute'];
                $queues[$queueOptions['name']] = $queueOptions;
                $callbacks[] = $callbackId;
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.multi_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            foreach ($consumer['groups'] as $group) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if ($consumer['queues_provider']) {
                $definition->addMethodCall('setQueuesProvider', [new Reference($consumer['queues_provider'])]);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $connectionName = $consumer['connection'];
            $this->injectConnection($definition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connectionName);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($serviceId, $definition);
            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
            }
            foreach ($callbacks as $callbackId) {
                $this->addDequeuerAwareCall($callbackId, $serviceId);
            }
        }
    }
335
336
    /**
     * Registers one public service definition per entry of the
     * 'dynamic_consumers' config, stored as "old_sound_rabbit_mq.<key>_dynamic".
     *
     * @throws InvalidConfigurationException When a consumer has no 'queue_options_provider'
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {

            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked above.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            foreach ($consumer['groups'] as $group) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            // Both the callback and the queue options provider are passed to addDequeuerAwareCall().
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
408
409
    /**
     * Registers one public service definition per entry of the
     * 'batch_consumers' config, stored as "old_sound_rabbit_mq.<key>_batch".
     *
     * The consumer callback is bound to the 'batchExecute' method of the
     * referenced callback service.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            // No exchange defined for this consumer -> use the default exchange options.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();

            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$consumer['qos_options']['prefetch_count']]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
            $definition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $definition->addMethodCall('setQosOptions', [
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global'],
            ]);

            foreach ($consumer['groups'] as $group) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }

            // Only the timeout is forwarded here; no exit-code call is added for batch consumers.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $connectionName = $consumer['connection'];
            $this->injectConnection($definition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connectionName);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
473 22
474 22
    /**
     * Registers one public service definition per entry of the
     * 'anon_consumers' config, stored as "old_sound_rabbit_mq.<key>_anon".
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.anon_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            // Groups are read from the current anon consumer; the previous code
            // iterated an undefined $consumer variable. The key is guarded because
            // it is not visible here whether anon consumers always define 'groups'.
            $groups = isset($anon['groups']) ? $anon['groups'] : array();
            foreach ($groups as $group) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $name);
        }
    }
498
    
499
    /**
     * Registers one public service definition per consumer group, stored as
     * "old_sound_rabbit_mq.<key>_group".
     *
     * A 'default' group bound to the 'default' connection is always present;
     * configured groups are layered on top of it. Each group receives the
     * services tagged "old_sound_rabbit_mq.consumer.group.<key>".
     */
    protected function loadGroupConsumers()
    {
        // array_replace_recursive() lets a configured 'default' group override the
        // built-in one. array_merge_recursive() would instead combine both scalar
        // 'connection' values into an array, which cannot be used as a connection name.
        $groups = array_replace_recursive(
            ['default' => ['connection' => 'default']],
            $this->config['group_consumers']
        );

        foreach ($groups as $key => $group) {
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.group_consumer')
            ;
            $this->injectConnection($definition, $group['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $group['connection']);
            }

            $definition->addMethodCall('addConsumers', [new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key)]);

            if (array_key_exists('qos_options', $group)) {
                // QoS values come from the group itself; the previous code read
                // them from an undefined $consumer variable.
                $definition->addMethodCall('setQosOptions', array(
                    $group['qos_options']['prefetch_size'],
                    $group['qos_options']['prefetch_count'],
                    $group['qos_options']['global']
                ));
            }

            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
            $this->container->setDefinition($name, $definition);
        }
    }
527
528
    /**
     * Restores '-' in the keys of the 'arguments' option.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks
     * parameters such as x-ha-policy, so every '_' in an argument key is turned
     * back into '-'. Options without an 'arguments' entry are returned untouched.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];
        if (is_string($arguments)) {
            // Old configuration format: the deprecated string parser is kept on purpose.
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = [];
        foreach ($arguments as $name => $value) {
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
556
557 31
    /**
     * Parses arguments given as a string, as found in old configuration files.
     *
     * The string is split on ',' into entries and each entry on ':' into
     * "name:value[:type]". The result maps each name to array(type, value),
     * with the type defaulting to 'S' when the third part is absent.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $entry) {
            $parts = explode(':', $entry);
            $type = isset($parts[2]) ? $parts[2] : 'S';
            $parsed[$parts[0]] = [$type, $parts[1]];
        }

        return $parsed;
    }
580 22
581
    /**
     * Registers one public service definition per entry of the 'rpc_clients'
     * config, stored as "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        foreach ($this->config['rpc_clients'] as $key => $client) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $definition->setLazy($client['lazy']);
            $definition->addTag('old_sound_rabbit_mq.rpc_client');
            $definition->addMethodCall('initClient', [$client['expect_serialized_response']]);

            $connectionName = $client['connection'];
            $this->injectConnection($definition, $connectionName);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connectionName);
            }

            // Optional settings: forwarded only when the key exists (even if its value is null).
            $optionalSetters = [
                'unserializer' => 'setUnserializer',
                'direct_reply_to' => 'setDirectReplyTo',
            ];
            foreach ($optionalSetters as $option => $method) {
                if (array_key_exists($option, $client)) {
                    $definition->addMethodCall($method, [$client[$option]]);
                }
            }
            $definition->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
604 30
605
    /**
     * Register one service definition per entry of the `rpc_servers` configuration.
     *
     * Each definition is stored in the container as `old_sound_rabbit_mq.<name>_server`.
     */
    protected function loadRpcServers()
    {
        foreach ($this->config['rpc_servers'] as $name => $options) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', [$name]);

            // The callback is the `execute` method of the configured callback service.
            $callback = [new Reference($options['callback']), 'execute'];
            $rpcServer->addMethodCall('setCallback', [$callback]);

            // Arguments are appended in order: the connection, then the logged channel when collecting.
            $this->injectConnection($rpcServer, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $name, $options['connection']);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $rpcServer->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            // Options handed to their setter unchanged, only when present in the configuration.
            $passThroughSetters = [
                'exchange_options' => 'setExchangeOptions',
                'queue_options' => 'setQueueOptions',
                'serializer' => 'setSerializer',
            ];
            foreach ($passThroughSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcServer->addMethodCall($setter, [$options[$option]]);
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $name);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
638 1
639 1
    /**
     * Define a non-public logged-channel service and append a reference to it
     * to the arguments of the given definition.
     *
     * The channel is registered as `old_sound_rabbit_mq.channel.<name>` and its
     * id is recorded in `$this->channelIds`.
     *
     * @param Definition $definition     definition that receives the channel reference
     * @param string     $name           suffix used to build the channel service id
     * @param string     $connectionName connection injected into the channel definition
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
653
654
    /**
     * Append a reference to the named connection service to the definition's arguments.
     *
     * @param Definition $definition     definition that receives the connection reference
     * @param string     $connectionName suffix of the `old_sound_rabbit_mq.connection.<name>` service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
658
659
    /**
     * Return the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
663
664 27
    /**
     * Add a `setDequeuer` call to the callback service when its class
     * implements DequeuerAwareInterface.
     *
     * Nothing happens when the container does not know the callback service, or
     * when its definition has no class set (in which case there is nothing to
     * reflect on). Parameter placeholders in the class name are resolved before
     * reflection.
     *
     * @param string $callback id of the callback service
     * @param string $name     id of the service referenced as the dequeuer
     *
     * @throws \ReflectionException when the resolved class does not exist
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // getClass() returns null when no class is set on the definition.
        $class = $callbackDefinition->getClass();
        if (!is_string($class) || '' === $class) {
            return;
        }

        // The class may be given as a "%parameter%" placeholder; reflect on the real name.
        $class = $this->container->getParameterBag()->resolveValue($class);
        if (!is_string($class) || '' === $class) {
            return;
        }

        $refClass = new \ReflectionClass($class);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
682
683
    /**
     * Tag the definition for the `phpamqplib` monolog channel and add a
     * `setLogger` call that receives the `logger` service.
     *
     * The logger reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition definition to tag and extend
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', ['channel' => 'phpamqplib']);
        $definition->addMethodCall('setLogger', [$loggerReference]);
    }
690
691
    /**
     * Provide the default AMQP exchange options.
     *
     * @return array the keys `name`, `type`, `passive` and `declare`
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = [];
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
705
706
    /**
     * Provide the default AMQP queue options.
     *
     * @return array the keys `name` and `declare`
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = [];
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
718
}
719