Test Failed
Pull Request — master (#38)
by Aleksandr
07:22
created

OldSoundRabbitMqExtension::loadBatchConsumers()   F

Complexity

Conditions 11
Paths 513

Size

Total Lines 62
Code Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 28.953

Importance

Changes 7
Bugs 1 Features 1
Metric Value
cc 11
eloc 37
c 7
b 1
f 1
nc 513
nop 0
dl 0
loc 62
ccs 16
cts 34
cp 0.4706
crap 28.953
rs 3.8263

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var bool Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
35
    private $config = array();
36 31
37
    /**
     * Loads the bundle's XML service definitions, processes the configuration and
     * registers every configured connection, binding, producer, consumer and RPC service.
     *
     * @param array            $configs   Raw configuration arrays handed to the extension
     * @param ContainerBuilder $container Container the definitions are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $configDirectory = __DIR__ . '/../Resources/config';
        $xmlLoader = new XmlFileLoader($this->container, new FileLocator([$configDirectory]));
        $xmlLoader->load('rabbitmq.xml');

        $this->config = $this->processConfiguration(
            $this->getConfiguration($configs, $container),
            $configs
        );
        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadGroupConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // The data collector is only kept when it is enabled and at least one channel id was recorded.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        $channelReferences = [];
        foreach (array_unique($this->channelIds) as $channelId) {
            $channelReferences[] = new Reference($channelId);
        }

        $container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channelReferences);
    }
73
74 31
    /**
     * Creates the Configuration object for this extension, passing the extension
     * alias to its constructor. Neither argument is used by this implementation.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 31
    /**
     * Registers two services for every entry of the "connections" configuration:
     * a private connection factory and a public, tagged connection created by
     * that factory's createConnection() method.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            // Pick the class parameter matching the socket/stream and lazy/eager flags.
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.' . $connectionSuffix . '%'
                : '%old_sound_rabbit_mq.' . $connectionSuffix . '%';

            $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [
                $classParam,
                $connection,
            ]);
            if (isset($connection['connection_parameters_provider'])) {
                // The provider service is appended as an extra factory argument. The former
                // unset() of this key only touched the loop-local copy after it had already
                // been passed to the definition, so it had no effect and is gone.
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);

            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            // This file already depends on TaggedIteratorArgument, so Definition::setFactory()
            // is always available; the setFactoryService()/setFactoryMethod() fallback for
            // Symfony < 2.6 could never run and was removed.
            $connectionDefinition = new Definition($classParam);
            $connectionDefinition->setFactory([new Reference($factoryName), 'createConnection']);
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
114
115 31
    /**
     * Registers one tagged service per entry of the "bindings" configuration.
     * Nothing is registered when the "sandbox" option is enabled.
     *
     * The service id is derived from an md5 hash of the key-sorted binding options.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => binding option passed as its single argument, in call order.
        // NOTE(review): 'isNowait' is invoked with an argument like a setter; confirm the
        // binding class does not expect 'setNowait' here.
        $methodCalls = [
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        ];

        foreach ($this->config['bindings'] as $binding) {
            // Sort by key so the hash below does not depend on option order.
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($methodCalls as $method => $option) {
                $definition->addMethodCall($method, [$binding[$option]]);
            }
            $this->injectConnection($definition, $binding['connection']);

            $hash = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $hash, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $hash), $definition);
        }
    }
139
140 31
    /**
     * Registers one service per entry of the "producers" configuration.
     *
     * In sandbox mode every producer id is bound to the fallback class and nothing
     * else is configured. Otherwise the producer class is set up with its exchange
     * and queue options, connection, optional logger and optional service alias.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            foreach ($this->config['producers'] as $name => $options) {
                $this->container->setDefinition(
                    sprintf('old_sound_rabbit_mq.%s_producer', $name),
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );
            }

            return;
        }

        foreach ($this->config['producers'] as $name => $options) {
            $definition = new Definition($options['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');

            // No exchange defined for this producer -> AMQP default.
            $exchangeOptions = isset($options['exchange_options'])
                ? $options['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // No queue defined for this producer -> AMQP default.
            $queueOptions = isset($options['queue_options'])
                ? $options['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$queueOptions]);

            $connection = $options['connection'];
            $this->injectConnection($definition, $connection);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $name, $connection);
            }

            if (!$options['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($options['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_producer', $name);
            $this->container->setDefinition($serviceId, $definition);

            if (null !== $options['service_alias']) {
                $this->container->setAlias($options['service_alias'], $serviceId);
            }
        }
    }
184
185 31
    /**
     * Registers one public service per entry of the "consumers" configuration.
     *
     * Each definition gets its exchange and queue options (the defaults are used when
     * an entry does not set them), its callback, the optional QoS, timeout and
     * graceful-shutdown calls, its connection and, if enabled, a logger.
     */
    protected function loadConsumers()
    {
        // Option => setter, forwarded as-is whenever the option is set and not null.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer');

            foreach ($consumer['groups'] as $groupName) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $groupName);
            }

            // No exchange defined for this consumer -> AMQP default.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // No queue defined for this consumer -> AMQP default.
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

            $callbackId = $consumer['callback'];
            $definition->addMethodCall('setCallback', [[new Reference($callbackId), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $connection = $consumer['connection'];
            $this->injectConnection($definition, $connection);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connection);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($callbackId, $serviceId);
        }
    }
252
253
    /**
     * Registers one public service per entry of the "multiple_consumers" configuration.
     *
     * Every configured queue is indexed by its name and its callback is turned into a
     * [service reference, 'execute'] callable before being handed to setQueues().
     *
     * @throws InvalidConfigurationException When an entry sets neither 'queues' nor 'queues_provider'
     */
    protected function loadMultipleConsumers()
    {
        // Option => setter, forwarded as-is whenever the option is set and not null.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            $queues = [];
            $callbackIds = [];
            foreach ($consumer['queues'] as $queueOptions) {
                $callbackId = $queueOptions['callback'];
                $callbackIds[] = $callbackId;

                $queueOptions['callback'] = [new Reference($callbackId), 'execute'];
                $queues[$queueOptions['name']] = $queueOptions;
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.multi_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            foreach ($consumer['groups'] as $groupName) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $groupName);
            }

            $queuesProviderId = $consumer['queues_provider'];
            if ($queuesProviderId) {
                $definition->addMethodCall('setQueuesProvider', [new Reference($queuesProviderId)]);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $connection = $consumer['connection'];
            $this->injectConnection($definition, $connection);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connection);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($serviceId, $definition);

            if ($queuesProviderId) {
                $this->addDequeuerAwareCall($queuesProviderId, $serviceId);
            }
            foreach ($callbackIds as $callbackId) {
                $this->addDequeuerAwareCall($callbackId, $serviceId);
            }
        }
    }
341 22
342 22
    /**
     * Registers one public service per entry of the "dynamic_consumers" configuration.
     *
     * Each definition gets its exchange options, callback, queue options provider,
     * the optional QoS, timeout and graceful-shutdown calls, its connection and,
     * if enabled, a logger.
     *
     * @throws InvalidConfigurationException When an entry has no 'queue_options_provider'
     */
    protected function loadDynamicConsumers()
    {
        // Option => setter, forwarded as-is whenever the option is set and not null.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked; it used to
                // mention a non-existent 'queue_provider' parameter.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
                ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            foreach ($consumer['groups'] as $group) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', [
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ]);
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                [new Reference($consumer['queue_options_provider'])]
            );

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$consumer['graceful_max_execution']['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
417
418
    /**
     * Registers one public service per entry of the "batch_consumers" configuration.
     *
     * The work is split into two private helpers so that the mandatory part of the
     * definition and the optional method calls can be read separately.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            // No exchange defined for this consumer -> default exchange options.
            if (!isset($consumer['exchange_options'])) {
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
            }

            $definition = $this->createBatchConsumerDefinition($consumer);
            $this->addOptionalBatchConsumerCalls($definition, $consumer);

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
        }
    }

    /**
     * Builds the batch consumer definition with its tags and the method calls that are always made.
     *
     * @param array $consumer Options of one batch consumer; 'exchange_options' must already be set
     *
     * @return Definition
     */
    private function createBatchConsumerDefinition(array $consumer)
    {
        $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
        $definition
            ->setPublic(true)
            ->addTag('old_sound_rabbit_mq.base_amqp')
            ->addTag('old_sound_rabbit_mq.batch_consumer')
            ->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']])
            ->addMethodCall('setPrefetchCount', [$consumer['qos_options']['prefetch_count']])
            ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']])
            ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
            ->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($consumer['queue_options'])])
            ->addMethodCall('setQosOptions', [
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global'],
            ]);

        foreach ($consumer['groups'] as $group) {
            $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
        }

        return $definition;
    }

    /**
     * Adds the method calls that depend on optional or boolean batch consumer options.
     *
     * @param Definition $definition Definition to extend
     * @param array      $consumer   Options of one batch consumer
     */
    private function addOptionalBatchConsumerCalls(Definition $definition, array $consumer)
    {
        if (isset($consumer['idle_timeout_exit_code'])) {
            $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
        }

        if (isset($consumer['idle_timeout'])) {
            $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
        }

        if (isset($consumer['graceful_max_execution'])) {
            $definition->addMethodCall(
                'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                [$consumer['graceful_max_execution']['timeout']]
            );
        }

        if (!$consumer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if ($consumer['keep_alive']) {
            $definition->addMethodCall('keepAlive');
        }
    }
482
483
    /**
     * Registers one public service per entry of the "anon_consumers" configuration,
     * wiring its exchange options, callback, connection and consumer-group tags.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $callbackId = $anon['callback'];
            $connection = $anon['connection'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($anon['exchange_options'])]
            );
            $definition->addMethodCall('setCallback', [[new Reference($callbackId), 'execute']]);

            $this->injectConnection($definition, $connection);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $connection);
            }

            foreach ($anon['groups'] as $groupName) {
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $groupName);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($callbackId, $serviceId);
        }
    }
507
    
508
    /**
     * Registers one public service per consumer group.
     *
     * A "default" group bound to the "default" connection is always registered;
     * groups from the "group_consumers" configuration are added to it. Each group
     * service receives the consumers tagged "old_sound_rabbit_mq.consumer.group.<name>".
     */
    protected function loadGroupConsumers()
    {
        // array_replace_recursive() lets a configured "default" group override the built-in one
        // key by key. array_merge_recursive() would instead combine both 'connection' strings
        // into an array, which is not a valid connection name.
        $groups = array_replace_recursive(
            ['default' => ['connection' => 'default']],
            $this->config['group_consumers']
        );

        foreach ($groups as $key => $group) {
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.group_consumer')
            ;
            $this->injectConnection($definition, $group['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $group['connection']);
            }

            $definition->addMethodCall(
                'addConsumers',
                [new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key)]
            );

            if (array_key_exists('qos_options', $group)) {
                // Read the QoS settings from the group itself; the previous code used an
                // undefined $consumer variable here.
                $definition->addMethodCall('setQosOptions', [
                    $group['qos_options']['prefetch_size'],
                    $group['qos_options']['prefetch_count'],
                    $group['qos_options']['global'],
                ]);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
            $this->container->setDefinition($name, $definition);
        }
    }
536
537 25
    /**
538 25
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
539 25
     * parameter. So we revert the change for right configurations.
540 25
     *
541 25
     * @param array $config
542
     *
543 25
     * @return array
544 25
     */
545
    private function normalizeArgumentKeys(array $config)
    {
        // Nothing to normalize when no arguments are configured (or they are null).
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];
        if (is_string($rawArguments)) {
            // Old configuration style: arguments given as a single string.
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $normalized = [];
        foreach ($rawArguments as $name => $value) {
            // Turn every '_' in the argument name back into '-'.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
565 22
566 22
    /**
567 22
     * Support for arguments provided as string. Support for old configuration files.
568
     *
569 22
     * @deprecated
570
     * @param string $arguments
571
     * @return array
572
     */
573
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        // Input is a comma-separated list of "name:value" or "name:value:type" entries.
        foreach (explode(',', $arguments) as $entry) {
            $parts = explode(':', $entry);
            // The type defaults to 'S' when the entry has no third segment.
            $type = isset($parts[2]) ? $parts[2] : 'S';

            $parsed[$parts[0]] = [$type, $parts[1]];
        }

        return $parsed;
    }
589 28
590
    /**
     * Registers one service definition per entry of the "rpc_clients" configuration.
     *
     * Each definition is stored under the id "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional configuration keys and the setter each one is forwarded to.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $name => $options) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setLazy($options['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', array($options['expect_serialized_response']));

            // Arguments: the connection first, then the logged channel when the collector is enabled.
            $this->injectConnection($rpcClient, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $name, $options['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcClient->addMethodCall($setter, array($options[$option]));
                }
            }

            $rpcClient->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $name);
            $this->container->setDefinition($serviceId, $rpcClient);
        }
    }
613
614
    /**
     * Registers one service definition per entry of the "rpc_servers" configuration.
     *
     * Each definition is stored under the id "old_sound_rabbit_mq.<key>_server".
     */
    protected function loadRpcServers()
    {
        // Optional configuration keys and the setter each one is forwarded to.
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $name => $options) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', array($name));

            // The callback service is invoked through its "execute" method.
            $callback = array(new Reference($options['callback']), 'execute');
            $rpcServer->addMethodCall('setCallback', array($callback));

            // Arguments: the connection first, then the logged channel when the collector is enabled.
            $this->injectConnection($rpcServer, $options['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $name, $options['connection']);
            }

            if (array_key_exists('qos_options', $options)) {
                $qos = $options['qos_options'];
                $rpcServer->addMethodCall('setQosOptions', array(
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ));
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $options)) {
                    $rpcServer->addMethodCall($setter, array($options[$option]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $name);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
647
648
    /**
     * Registers a non-public logged channel service and appends a reference to
     * it to the arguments of the given definition.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Suffix of the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);

        // Keep track of every logged channel id registered by this extension.
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
662
663
    /**
     * Appends a reference to the named connection service to the definition's arguments.
     *
     * @param Definition $definition     Definition that receives the connection reference
     * @param string     $connectionName Suffix of the "old_sound_rabbit_mq.connection.*" service id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
667
668
    /**
     * Returns the alias of this extension.
     *
     * @return string Always "old_sound_rabbit_mq"
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }
672
673
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is registered and its class implements
     * DequeuerAwareInterface, a "setDequeuer" method call referencing $name is
     * added to the callback definition. Nothing is added when the service is
     * unknown or when its class cannot be resolved to a loadable class or
     * interface (no class set on the definition, or a class string that does
     * not name an existing type).
     *
     * @param string $callback Id of the callback service
     * @param string $name     Id of the service passed to setDequeuer()
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);
        $callbackClass = $callbackDefinition->getClass();

        // ReflectionClass throws a ReflectionException for a missing class name,
        // so skip definitions whose class is unset or not loadable.
        if (!is_string($callbackClass)
            || !(class_exists($callbackClass) || interface_exists($callbackClass))
        ) {
            return;
        }

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
691
692
    /**
     * Tags the definition with the "phpamqplib" monolog channel and adds a
     * setLogger() call to it.
     *
     * The "logger" reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', array('channel' => 'phpamqplib'));
        $definition->addMethodCall('setLogger', array($loggerReference));
    }
699
700
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an empty name, the "direct" type, passive enabled and
     * declare disabled.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
714
715
    /**
     * Get default AMQP queue options
     *
     * The defaults are an empty name and declare disabled.
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
727
}
728