Passed
Pull Request — master (#631)
by Aleksandr
07:53
created

OldSoundRabbitMqExtension::loadGroupConsumers()   F

Complexity

Conditions 13
Paths 1537

Size

Total Lines 68
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 18.3885

Importance

Changes 0
Metric Value
cc 13
eloc 42
nc 1537
nop 0
dl 0
loc 68
ccs 28
cts 41
cp 0.6828
crap 18.3885
rs 2.45
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var Boolean Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
    private $groups = array();
35
36 31
    private $config = array();
37
38 31
    public function load(array $configs, ContainerBuilder $container)
39
    {
40 31
        $this->container = $container;
41 31
42
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
43 31
        $loader->load('rabbitmq.xml');
44 31
45
        $configuration = $this->getConfiguration($configs, $container);
46 31
        $this->config = $this->processConfiguration($configuration, $configs);
47
48 31
        $this->collectorEnabled = $this->config['enable_collector'];
49 31
50 31
        $this->loadConnections();
51 31
        $this->loadBindings();
52 31
        $this->loadProducers();
53 31
        $this->loadConsumers();
54 31
        $this->loadMultipleConsumers();
55 31
        $this->loadDynamicConsumers();
56 31
        $this->loadBatchConsumers();
57 31
        $this->loadAnonConsumers();
58
        $this->loadGroupConsumers();
59 31
        $this->loadRpcClients();
60 28
        $this->loadRpcServers();
61 28
62 28
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
63
            $channels = array();
64
            foreach (array_unique($this->channelIds) as $id) {
65 28
                $channels[] = new Reference($id);
66 28
            }
67
68 3
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
69
            $definition->replaceArgument(0, $channels);
70 31
        } else {
71
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
72 31
        }
73
    }
74 31
75
    public function getConfiguration(array $config, ContainerBuilder $container)
76
    {
77 31
        return new Configuration($this->getAlias());
78
    }
79 31
80 31
    protected function loadConnections()
81
    {
82 31
        foreach ($this->config['connections'] as $key => $connection) {
83 22
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
84 31
            $classParam =
85
                $connection['lazy']
86 31
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
87 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
88
89 31
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
90
                $classParam, $connection,
91
            ));
92
            if (isset($connection['connection_parameters_provider'])) {
93 31
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
94 31
                unset($connection['connection_parameters_provider']);
95 31
            }
96
            $definition->setPublic(false);
97 31
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
98 31
            $this->container->setDefinition($factoryName, $definition);
99
100 31
            $definition = new Definition($classParam);
101
            if (method_exists($definition, 'setFactory')) {
102
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
103
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
104
            } else {
105
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
106 31
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107 31
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

107
                $definition->/** @scrutinizer ignore-call */ 
108
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
108
            }
109 31
            $definition->addTag('old_sound_rabbit_mq.connection');
110
            $definition->setPublic(true);
111 31
112
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
113 31
        }
114
    }
115 31
116
    protected function loadBindings()
117
    {
118 31
        if ($this->config['sandbox']) {
119 22
            return;
120 22
        }
121 22
        foreach ($this->config['bindings'] as $binding) {
122 22
            ksort($binding);
123 22
            $definition = new Definition($binding['class']);
124 22
            $definition->addTag('old_sound_rabbit_mq.binding');
125 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
126 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
127 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
128 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
129 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
130 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
131
            $this->injectConnection($definition, $binding['connection']);
132 22
            $key = md5(json_encode($binding));
133
            if ($this->collectorEnabled) {
134 22
                // in the context of a binding, I don't thing logged channels are needed?
135
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
136 31
            }
137
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
138 31
        }
139
    }
140 31
141 31
    protected function loadProducers()
142 27
    {
143 27
        if ($this->config['sandbox'] == false) {
144 27
            foreach ($this->config['producers'] as $key => $producer) {
145 27
                $definition = new Definition($producer['class']);
146
                $definition->setPublic(true);
147 27
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
148 1
                $definition->addTag('old_sound_rabbit_mq.producer');
149
                //this producer doesn't define an exchange -> using AMQP Default
150 27
                if (!isset($producer['exchange_options'])) {
151
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
152 27
                }
153 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
154
                //this producer doesn't define a queue -> using AMQP Default
155 27
                if (!isset($producer['queue_options'])) {
156 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
157 27
                }
158 25
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
159
                $this->injectConnection($definition, $producer['connection']);
160 27
                if ($this->collectorEnabled) {
161
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
162
                }
163
                if (!$producer['auto_setup_fabric']) {
164 27
                    $definition->addMethodCall('disableAutoSetupFabric');
165
                }
166
167
                if ($producer['enable_logger']) {
168 27
                    $this->injectLogger($definition);
169
                }
170 27
171 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
172 22
173
                $this->container->setDefinition($producerServiceName, $definition);
174
                if (null !== $producer['service_alias']) {
175
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
176
                }
177
            }
178
        } else {
179
            foreach ($this->config['producers'] as $key => $producer) {
180
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
181 31
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
182
            }
183 31
        }
184
    }
185 31
186 26
    protected function loadConsumers()
187 26
    {
188 26
        foreach ($this->config['consumers'] as $key => $consumer) {
189 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
190
            $definition->setPublic(true);
191 26
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
192
            $definition->addTag('old_sound_rabbit_mq.consumer');
193
            foreach ($consumer['groups'] as $group) {
194 26
                $this->groups[] = $group;
195
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
196 26
            }
197
            //this consumer doesn't define an exchange -> using AMQP Default
198
            if (!isset($consumer['exchange_options'])) {
199 26
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
200 26
            }
201
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
202 26
            //this consumer doesn't define a queue -> using AMQP Default
203 22
            if (!isset($consumer['queue_options'])) {
204 22
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
205 22
            }
206 22
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
207
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
208
209
            if (array_key_exists('qos_options', $consumer)) {
210 26
                $definition->addMethodCall('setQosOptions', array(
211
                    $consumer['qos_options']['prefetch_size'],
212
                    $consumer['qos_options']['prefetch_count'],
213 26
                    $consumer['qos_options']['global']
214
                ));
215
            }
216 26
217 22
            if (isset($consumer['idle_timeout'])) {
218
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
219 26
            }
220
            if (isset($consumer['idle_timeout_exit_code'])) {
221
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
222
            }
223
            if (isset($consumer['timeout_wait'])) {
224
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
225
            }
226
            if (isset($consumer['graceful_max_execution'])) {
227
                $definition->addMethodCall(
228
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
229 26
                    array($consumer['graceful_max_execution']['timeout'])
230
                );
231
                $definition->addMethodCall(
232
                    'setGracefulMaxExecutionTimeoutExitCode',
233 26
                    array($consumer['graceful_max_execution']['exit_code'])
234 26
                );
235 25
            }
236
            if (!$consumer['auto_setup_fabric']) {
237
                $definition->addMethodCall('disableAutoSetupFabric');
238 26
            }
239 1
240
            $this->injectConnection($definition, $consumer['connection']);
241
            if ($this->collectorEnabled) {
242 26
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
243 26
            }
244 26
245
            if ($consumer['enable_logger']) {
246 31
                $this->injectLogger($definition);
247
            }
248 31
249
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
250 31
            $this->container->setDefinition($name, $definition);
251 22
            $this->addDequeuerAwareCall($consumer['callback'], $name);
252 22
        }
253
    }
254 22
255
    protected function loadMultipleConsumers()
256
    {
257
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
258
            $queues = array();
259
            $callbacks = array();
260
261 22
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
262 22
                throw new InvalidConfigurationException(
263 22
                    "Error on loading $key multiple consumer. " .
264 22
                    "Either 'queues' or 'queues_provider' parameters should be defined."
265
                );
266
            }
267 22
268
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
269 22
                $queues[$queueOptions['name']] = $queueOptions;
270 22
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
271 22
                $callbacks[] = $queueOptions['callback'];
272 22
            }
273 22
274
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
275 22
            $definition
276 22
                ->setPublic(true)
277 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
278 22
                ->addTag('old_sound_rabbit_mq.multi_consumer')
279
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
280
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
281
282 22
            foreach ($consumer['groups'] as $group) {
283
                $this->groups[] = $group;
284
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
285
            }
286
287
            if ($consumer['queues_provider']) {
288
                $definition->addMethodCall(
289
                    'setQueuesProvider',
290 22
                    array(new Reference($consumer['queues_provider']))
291
                );
292
            }
293 22
294
            if (array_key_exists('qos_options', $consumer)) {
295
                $definition->addMethodCall('setQosOptions', array(
296 22
                    $consumer['qos_options']['prefetch_size'],
297 22
                    $consumer['qos_options']['prefetch_count'],
298
                    $consumer['qos_options']['global']
299 22
                ));
300
            }
301
302
            if (isset($consumer['idle_timeout'])) {
303
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
304
            }
305
            if (isset($consumer['idle_timeout_exit_code'])) {
306
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
307
            }
308
            if (isset($consumer['timeout_wait'])) {
309 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
310
            }
311
            if (isset($consumer['graceful_max_execution'])) {
312
                $definition->addMethodCall(
313 22
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
314 22
                    array($consumer['graceful_max_execution']['timeout'])
315 22
                );
316
                $definition->addMethodCall(
317
                    'setGracefulMaxExecutionTimeoutExitCode',
318 22
                    array($consumer['graceful_max_execution']['exit_code'])
319
                );
320
            }
321
            if (!$consumer['auto_setup_fabric']) {
322 22
                $definition->addMethodCall('disableAutoSetupFabric');
323 22
            }
324 22
325 22
            $this->injectConnection($definition, $consumer['connection']);
326
            if ($this->collectorEnabled) {
327 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
328 22
            }
329
330
            if ($consumer['enable_logger']) {
331 31
                $this->injectLogger($definition);
332
            }
333 31
334
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
335 31
            $this->container->setDefinition($name, $definition);
336
            if ($consumer['queues_provider']) {
337 22
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
338
            }
339
            foreach ($callbacks as $callback) {
340
                $this->addDequeuerAwareCall($callback, $name);
341
            }
342
        }
343
    }
344 22
345
    protected function loadDynamicConsumers()
346 22
    {
347 22
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
348 22
349 22
            if (empty($consumer['queue_options_provider'])) {
350 22
                throw new InvalidConfigurationException(
351 22
                    "Error on loading $key dynamic consumer. " .
352
                    "'queue_provider' parameter should be defined."
353 22
                );
354
            }
355
356
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
357
            $definition
358
                ->setPublic(true)
359
                ->addTag('old_sound_rabbit_mq.base_amqp')
360
                ->addTag('old_sound_rabbit_mq.consumer')
361 22
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
362 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
363 22
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
364
365
            foreach ($consumer['groups'] as $group) {
366 22
                $this->groups[] = $group;
367
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
368
            }
369 22
370
            if (array_key_exists('qos_options', $consumer)) {
371
                $definition->addMethodCall('setQosOptions', array(
372 22
                    $consumer['qos_options']['prefetch_size'],
373
                    $consumer['qos_options']['prefetch_count'],
374
                    $consumer['qos_options']['global']
375 22
                ));
376
            }
377
378
            $definition->addMethodCall(
379
                'setQueueOptionsProvider',
380
                array(new Reference($consumer['queue_options_provider']))
381
            );
382
383
            if (isset($consumer['idle_timeout'])) {
384
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
385 22
            }
386
            if (isset($consumer['idle_timeout_exit_code'])) {
387
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
388
            }
389 22
            if (isset($consumer['timeout_wait'])) {
390 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
391 22
            }
392
            if (isset($consumer['graceful_max_execution'])) {
393
                $definition->addMethodCall(
394 22
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
395
                    array($consumer['graceful_max_execution']['timeout'])
396
                );
397
                $definition->addMethodCall(
398 22
                    'setGracefulMaxExecutionTimeoutExitCode',
399 22
                    array($consumer['graceful_max_execution']['exit_code'])
400 22
                );
401 22
            }
402
            if (!$consumer['auto_setup_fabric']) {
403 31
                $definition->addMethodCall('disableAutoSetupFabric');
404
            }
405 31
406
            $this->injectConnection($definition, $consumer['connection']);
407 31
            if ($this->collectorEnabled) {
408
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
409
            }
410
411
            if ($consumer['enable_logger']) {
412
                $this->injectLogger($definition);
413
            }
414
415
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
416
            $this->container->setDefinition($name, $definition);
417
            $this->addDequeuerAwareCall($consumer['callback'], $name);
418
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
419
        }
420
    }
421
422
    protected function loadBatchConsumers()
423
    {
424
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
425
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
426
427
            if (!isset($consumer['exchange_options'])) {
428
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
429
            }
430
431
            $definition
432
                ->setPublic(true)
433
                ->addTag('old_sound_rabbit_mq.base_amqp')
434
                ->addTag('old_sound_rabbit_mq.batch_consumer')
435
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
436
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
437
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
438
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
439
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
440
                ->addMethodCall('setQosOptions', array(
441
                    $consumer['qos_options']['prefetch_size'],
442
                    $consumer['qos_options']['prefetch_count'],
443
                    $consumer['qos_options']['global']
444
                ))
445
            ;
446
447
            foreach ($consumer['groups'] as $group) {
448
                $this->groups[] = $group;
449
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
450
            }
451
452
            if (isset($consumer['idle_timeout_exit_code'])) {
453
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
454
            }
455
456
            if (isset($consumer['idle_timeout'])) {
457
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
458
            }
459
460
            if (isset($consumer['graceful_max_execution'])) {
461
                $definition->addMethodCall(
462
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
463
                    array($consumer['graceful_max_execution']['timeout'])
464 31
                );
465
            }
466 31
467
            if (!$consumer['auto_setup_fabric']) {
468 31
                $definition->addMethodCall('disableAutoSetupFabric');
469 22
            }
470
471 22
            if ($consumer['keep_alive']) {
472 22
                $definition->addMethodCall('keepAlive');
473 22
            }
474 22
475 22
            $this->injectConnection($definition, $consumer['connection']);
476 22
            if ($this->collectorEnabled) {
477 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
478 22
            }
479
480
            if ($consumer['enable_logger']) {
481 22
                $this->injectLogger($definition);
482 22
            }
483 22
484
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
485 31
        }
486
    }
487
488
    protected function loadAnonConsumers()
489
    {
490
        foreach ($this->config['anon_consumers'] as $key => $anon) {
491
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
492
            $definition
493
                ->setPublic(true)
494
                ->addTag('old_sound_rabbit_mq.base_amqp')
495 27
                ->addTag('old_sound_rabbit_mq.anon_consumer')
496
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
497 27
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
498 1
            $this->injectConnection($definition, $anon['connection']);
499
            if ($this->collectorEnabled) {
500 1
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
501
            }
502
503
            foreach ($anon['groups'] as $group) {
504 1
                $this->groups[] = $group;
505 1
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
506 1
            }
507
508
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
509 1
            $this->container->setDefinition($name, $definition);
510
            $this->addDequeuerAwareCall($anon['callback'], $name);
511 1
        }
512
    }
513 27
    
514
    protected function loadGroupConsumers()
515
    {
516
        $groupConsumers = $this->config['group_consumers'];
517
        $this->groups = array_unique(array_merge($this->groups, array_keys($groupConsumers)));
518
519
        $defaultConfig = array('connection' => 'default', 'auto_setup_fabric' => true, 'enable_logger' => false);
520
        foreach ($this->groups as $key) {
521
            $group = isset($groupConsumers[$key]) ? $groupConsumers[$key] : $defaultConfig;
522
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
523
            $definition
524
                ->setPublic(true)
525
                ->addTag('old_sound_rabbit_mq.base_amqp')
526
                ->addTag('old_sound_rabbit_mq.group_consumer')
527
            ;
528
            $connection = isset($group['connection']) ? $group['connection'] : 'default';
529
            $this->injectConnection($definition, $connection);
530
            if ($this->collectorEnabled) {
531
                $this->injectLoggedChannel($definition, $key . '_group_consumer', $connection);
532
            }
533
534
            if ($group['enable_logger']) {
535
                $this->injectLogger($definition);
536
            }
537
538
            $definition->addMethodCall('addConsumers', array(
539
                new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key))
540 31
            );
541
542 31
            if (isset($group['timeout_wait'])) {
543 25
                $definition->addMethodCall('setTimeoutWait', array($group['timeout_wait']));
544 25
            }
545
546 25
            if (array_key_exists('qos_options', $group)) {
547 25
                $definition->addMethodCall('setQosOptions', array(
548 25
                    $group['qos_options']['prefetch_size'],
549 25
                    $group['qos_options']['prefetch_count'],
550 25
                    $group['qos_options']['global']
551
                ));
552 25
            }
553 25
554
            if (isset($group['idle_timeout'])) {
555 25
                $definition->addMethodCall('setIdleTimeout', array($group['idle_timeout']));
556 25
            }
557
            if (isset($group['idle_timeout_exit_code'])) {
558 25
                $definition->addMethodCall('setIdleTimeoutExitCode', array($group['idle_timeout_exit_code']));
559
            }
560 25
561
            if (isset($group['graceful_max_execution'])) {
562 31
                $definition->addMethodCall(
563
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
564 31
                    array($group['graceful_max_execution']['timeout'])
565
                );
566 31
                $definition->addMethodCall(
567 22
                    'setGracefulMaxExecutionTimeoutExitCode',
568
                    array($group['graceful_max_execution']['exit_code'])
569 22
                );
570 22
            }
571 22
572 22
            if (isset($group['auto_setup_fabric'])) {
573 22
                if (!$group['auto_setup_fabric']) {
574 22
                    $definition->addMethodCall('disableAutoSetupFabric');
575 22
                } else {
576 22
                    // TODO disable grouped consumers
577
                }
578 22
            }
579
580
            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
581
            $this->container->setDefinition($name, $definition);
582
        }
583
    }
584
585 22
    /**
586 22
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
587
     * parameter. So we revert the change for right configurations.
588 22
     *
589 22
     * @param array $config
590
     *
591 22
     * @return array
592 22
     */
593
    private function normalizeArgumentKeys(array $config)
594 22
    {
595
        if (isset($config['arguments'])) {
596 31
            $arguments = $config['arguments'];
597
            // support for old configuration
598 28
            if (is_string($arguments)) {
599
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

599
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
600 28
            }
601 28
602
            $newArguments = array();
603 28
            foreach ($arguments as $key => $value) {
604 28
                if (strstr($key, '_')) {
605 28
                    $key = str_replace('_', '-', $key);
606
                }
607 28
                $newArguments[$key] = $value;
608
            }
609 28
            $config['arguments'] = $newArguments;
610 28
        }
611 28
        return $config;
612
    }
613 30
614
    /**
615 30
     * Support for arguments provided as string. Support for old configuration files.
616 30
     *
617
     * @deprecated
618 31
     * @param string $arguments
619
     * @return array
620 31
     */
621
    private function argumentsStringAsArray($arguments)
622
    {
623
        $argumentsArray = array();
624
625
        $argumentPairs = explode(',', $arguments);
626
        foreach ($argumentPairs as $argument) {
627
            $argumentPair = explode(':', $argument);
628
            $type = 'S';
629 26
            if (isset($argumentPair[2])) {
630
                $type = $argumentPair[2];
631 26
            }
632 26
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
633
        }
634
635
        return $argumentsArray;
636
    }
637
638
    protected function loadRpcClients()
639
    {
640
        foreach ($this->config['rpc_clients'] as $key => $client) {
641
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
642 1
            $definition->setLazy($client['lazy']);
643
            $definition
644 1
                ->addTag('old_sound_rabbit_mq.rpc_client')
645 1
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
646
            $this->injectConnection($definition, $client['connection']);
647 1
            if ($this->collectorEnabled) {
648 1
                $this->injectLoggedChannel($definition, $key, $client['connection']);
649
            }
650
            if (array_key_exists('unserializer', $client)) {
651
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
652
            }
653
            if (array_key_exists('direct_reply_to', $client)) {
654
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
655 1
            }
656
            $definition->setPublic(true);
657
658 1
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
659
        }
660
    }
661
662
    protected function loadRpcServers()
663
    {
664
        foreach ($this->config['rpc_servers'] as $key => $server) {
665
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
666
            $definition
667
                ->setPublic(true)
668
                ->addTag('old_sound_rabbit_mq.base_amqp')
669
                ->addTag('old_sound_rabbit_mq.rpc_server')
670 27
                ->addMethodCall('initServer', array($key))
671
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
672
            $this->injectConnection($definition, $server['connection']);
673 27
            if ($this->collectorEnabled) {
674
                $this->injectLoggedChannel($definition, $key, $server['connection']);
675
            }
676
            if (array_key_exists('qos_options', $server)) {
677
                $definition->addMethodCall('setQosOptions', array(
678
                    $server['qos_options']['prefetch_size'],
679
                    $server['qos_options']['prefetch_count'],
680
                    $server['qos_options']['global']
681
                ));
682
            }
683
            if (array_key_exists('exchange_options', $server)) {
684
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
685
            }
686
            if (array_key_exists('queue_options', $server)) {
687
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
688
            }
689
            if (array_key_exists('serializer', $server)) {
690
                $definition->addMethodCall('setSerializer', array($server['serializer']));
691
            }
692
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
693
        }
694
    }
695
696
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
697
    {
698
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
699
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
700
        $channel
701
            ->setPublic(false)
702
            ->addTag('old_sound_rabbit_mq.logged_channel');
703
        $this->injectConnection($channel, $connectionName);
704
705
        $this->container->setDefinition($id, $channel);
706
707
        $this->channelIds[] = $id;
708
        $definition->addArgument(new Reference($id));
709
    }
710
711
    protected function injectConnection(Definition $definition, $connectionName)
712
    {
713
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
714
    }
715
716
    public function getAlias()
717
    {
718
        return 'old_sound_rabbit_mq';
719
    }
720
721
    /**
722
     * Add proper dequeuer aware call
723
     *
724
     * @param string $callback
725
     * @param string $name
726
     */
727
    protected function addDequeuerAwareCall($callback, $name)
728
    {
729
        if (!$this->container->has($callback)) {
730
            return;
731
        }
732
733
        $callbackDefinition = $this->container->findDefinition($callback);
734
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
735
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
736
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
737
        }
738
    }
739
740
    private function injectLogger(Definition $definition)
741
    {
742
        $definition->addTag('monolog.logger', array(
743
            'channel' => 'phpamqplib'
744
        ));
745
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
746
    }
747
748
    /**
749
     * Get default AMQP exchange options
750
     *
751
     * @return array
752
     */
753
    protected function getDefaultExchangeOptions()
754
    {
755
        return array(
756
            'name' => '',
757
            'type' => 'direct',
758
            'passive' => true,
759
            'declare' => false
760
        );
761
    }
762
763
    /**
764
     * Get default AMQP queue options
765
     *
766
     * @return array
767
     */
768
    protected function getDefaultQueueOptions()
769
    {
770
        return array(
771
            'name' => '',
772
            'declare' => false
773
        );
774
    }
775
}
776