Passed
Pull Request — master (#38)
by Aleksandr
09:33
created

OldSoundRabbitMqExtension::loadProducers()   B

Complexity

Conditions 10
Paths 67

Size

Total Lines 41
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 10.7111

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 27
c 4
b 0
f 0
nc 67
nop 0
dl 0
loc 41
ccs 21
cts 26
cp 0.8077
crap 10.7111
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var Boolean Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
    private $groups = array();
35
36 31
    private $config = array();
37
38 31
    /**
     * Loads the bundle configuration and registers the RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, processes the given configuration and then registers connections,
     * bindings, producers, the consumer variants and the RPC clients/servers, in that order.
     * Afterwards the data collector definition either receives references to the collected
     * channel ids, or is removed when the collector is disabled or no channel was collected.
     *
     * @param array            $configs   Raw configuration arrays to process
     * @param ContainerBuilder $container Container the definitions are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $configDirectories = array(__DIR__ . '/../Resources/config');
        $xmlLoader = new XmlFileLoader($container, new FileLocator($configDirectories));
        $xmlLoader->load('rabbitmq.xml');

        $this->config = $this->processConfiguration($this->getConfiguration($configs, $container), $configs);
        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadGroupConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Without an enabled collector or any logged channel the collector service is of no use.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        $channelReferences = array();
        foreach (array_unique($this->channelIds) as $channelId) {
            $channelReferences[] = new Reference($channelId);
        }

        $container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channelReferences);
    }
74 31
75
    /**
     * Creates the configuration object used to process this bundle's configuration.
     *
     * Neither argument is used; the Configuration is built from the extension alias only.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
79 31
80 31
    /**
     * Registers, for every configured connection, a private connection factory service and a
     * public connection service that is created through the factory's createConnection method.
     *
     * The connection class parameter depends on the 'use_socket' and 'lazy' options.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

            // The factory receives the complete connection configuration as its second argument,
            // including 'connection_parameters_provider' when that option is set.
            $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
                $classParam, $connection,
            ));
            if (isset($connection['connection_parameters_provider'])) {
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);

            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            // The previous method_exists() fallback to setFactoryService()/setFactoryMethod() was
            // dropped: those methods no longer exist on Definition, while setFactory() does.
            $connectionDefinition = new Definition($classParam);
            $connectionDefinition->setFactory(array(new Reference($factoryName), 'createConnection'));
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
115 31
116
    /**
     * Registers one service definition per configured binding.
     *
     * Nothing is registered when the 'sandbox' option is enabled. The service id is built from
     * an md5 hash of the JSON-encoded binding configuration after its top-level keys are sorted.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => binding option passed as its single argument, in call order.
        // NOTE(review): 'isNowait' reads like a getter name; confirm the binding class does not
        // expect a setter (e.g. setNowait) here.
        $methodCalls = array(
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        );

        foreach ($this->config['bindings'] as $binding) {
            ksort($binding);
            $key = md5(json_encode($binding));

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($methodCalls as $method => $option) {
                $definition->addMethodCall($method, array($binding[$option]));
            }

            $this->injectConnection($definition, $binding['connection']);
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
        }
    }
140 31
141 31
    /**
     * Registers one service per configured producer under 'old_sound_rabbit_mq.<key>_producer'.
     *
     * In sandbox mode every producer is replaced by a definition of the fallback class and no
     * alias is created. Otherwise the full producer definition is registered and, when
     * 'service_alias' is not null, aliased to that name.
     */
    protected function loadProducers()
    {
        $sandboxed = (bool) $this->config['sandbox'];

        foreach ($this->config['producers'] as $key => $producer) {
            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            if ($sandboxed) {
                $this->container->setDefinition(
                    $producerServiceName,
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );

                continue;
            }

            $this->container->setDefinition($producerServiceName, $this->createProducerDefinition($key, $producer));
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }
        }
    }

    /**
     * Builds the service definition of a single, non-sandboxed producer.
     *
     * @param string $key      Producer name from the configuration
     * @param array  $producer Processed configuration of that producer
     *
     * @return Definition
     */
    private function createProducerDefinition($key, array $producer)
    {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        // this producer doesn't define an exchange -> using AMQP Default
        $exchangeOptions = isset($producer['exchange_options'])
            ? $producer['exchange_options']
            : $this->getDefaultExchangeOptions();
        $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));

        // this producer doesn't define a queue -> using AMQP Default
        // NOTE(review): unlike the consumers, queue options are passed on without
        // normalizeArgumentKeys(); confirm whether that is intended.
        $queueOptions = isset($producer['queue_options'])
            ? $producer['queue_options']
            : $this->getDefaultQueueOptions();
        $definition->addMethodCall('setQueueOptions', array($queueOptions));

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }

        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        return $definition;
    }
185 31
186 26
    /**
     * Registers one public service per configured consumer under
     * 'old_sound_rabbit_mq.<key>_consumer'.
     *
     * Each definition is tagged with its groups, configured through method calls derived from
     * the consumer options and finally passed, together with its callback, to
     * addDequeuerAwareCall().
     */
    protected function loadConsumers()
    {
        // Optional options and the setter each one is forwarded to, in call order.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer');

            foreach ($consumer['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            // this consumer doesn't define an exchange -> using AMQP Default
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));

            // this consumer doesn't define a queue -> using AMQP Default
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($queueOptions)));

            $callbackReference = new Reference($consumer['callback']);
            $definition->addMethodCall('setCallback', array(array($callbackReference, 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($graceful['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($graceful['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceName = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceName, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceName);
        }
    }
254
255
    /**
     * Registers one public service per configured multiple consumer under
     * 'old_sound_rabbit_mq.<key>_multiple'.
     *
     * Every queue of the consumer is indexed by its 'name' option and gets its callback replaced
     * by a [Reference, 'execute'] callable. The queue 'arguments' keys are normalized per queue;
     * previously normalizeArgumentKeys() was applied to the map keyed by queue name, so the
     * per-queue arguments were never normalized.
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            $hasQueues = !empty($consumer['queues']);
            $hasQueuesProvider = !empty($consumer['queues_provider']);

            if (!$hasQueues && !$hasQueuesProvider) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            $queues = array();
            $callbacks = array();
            // Only a provider may be configured, in which case there is nothing to iterate.
            $configuredQueues = $hasQueues ? $consumer['queues'] : array();
            foreach ($configuredQueues as $queueOptions) {
                $queue = $this->normalizeArgumentKeys($queueOptions);
                $queue['callback'] = array(new Reference($queueOptions['callback']), 'execute');
                $queues[$queueOptions['name']] = $queue;
                $callbacks[] = $queueOptions['callback'];
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($queues));

            foreach ($consumer['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if ($hasQueuesProvider) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    array(new Reference($consumer['queues_provider']))
                );
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);

            if ($hasQueuesProvider) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }
344 22
345 22
    /**
     * Registers one public service per configured dynamic consumer under
     * 'old_sound_rabbit_mq.<key>_dynamic'.
     *
     * A dynamic consumer must define 'queue_options_provider'; the referenced service is set
     * through setQueueOptionsProvider(). Both the callback and the provider are passed to
     * addDequeuerAwareCall() once the definition is registered.
     *
     * @throws InvalidConfigurationException When 'queue_options_provider' is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked; it used to say
                // 'queue_provider', which is not the option being validated.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            foreach ($consumer['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
421
422
    /**
     * Registers one public service per configured batch consumer under
     * 'old_sound_rabbit_mq.<key>_batch'.
     *
     * The callback is wired to its 'batchExecute' method and the prefetch count from the QoS
     * options is additionally passed to setPrefetchCount().
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];
            $callback = array(new Reference($consumer['callback']), 'batchExecute');

            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            $definition->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
            $definition->addMethodCall('setCallback', array($callback));
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));
            $definition->addMethodCall(
                'setQueueOptions',
                array($this->normalizeArgumentKeys($consumer['queue_options']))
            );
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );

            foreach ($consumer['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            // Optional options and the setter each one is forwarded to, in call order.
            $optionalSetters = array(
                'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
                'idle_timeout' => 'setIdleTimeout',
            );
            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            // Only the graceful timeout is forwarded here; no exit code call is made.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceName = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceName, $definition);
        }
    }
487
488 27
    /**
     * Registers one public service per configured anonymous consumer under
     * 'old_sound_rabbit_mq.<key>_anon' and passes its callback to addDequeuerAwareCall().
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = array(new Reference($anon['callback']), 'execute');

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', array($exchangeOptions));
            $definition->addMethodCall('setCallback', array($callback));

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            foreach ($anon['groups'] as $group) {
                $this->groups[] = $group;
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
            }

            $serviceName = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceName, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceName);
        }
    }
513
    
514
    /**
     * Registers one public group consumer service per known group under
     * 'old_sound_rabbit_mq.<group>_group'.
     *
     * The known groups are the groups collected so far in $this->groups merged with the keys of
     * the 'group_consumers' configuration. Groups without an explicit configuration use a
     * default one. Each group consumer receives the services tagged
     * 'old_sound_rabbit_mq.consumer.group.<group>' through addConsumers().
     */
    protected function loadGroupConsumers()
    {
        $configuredGroups = $this->config['group_consumers'];
        $this->groups = array_unique(array_merge($this->groups, array_keys($configuredGroups)));

        $fallbackConfig = array('connection' => 'default', 'auto_setup_fabric' => true, 'enable_logger' => false);

        foreach ($this->groups as $key) {
            $group = isset($configuredGroups[$key]) ? $configuredGroups[$key] : $fallbackConfig;
            $connection = isset($group['connection']) ? $group['connection'] : 'default';

            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.group_consumer');

            $this->injectConnection($definition, $connection);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key . '_group_consumer', $connection);
            }

            if ($group['enable_logger']) {
                $this->injectLogger($definition);
            }

            $groupTag = 'old_sound_rabbit_mq.consumer.group.' . $key;
            $definition->addMethodCall('addConsumers', array(new TaggedIteratorArgument($groupTag)));

            if (isset($group['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($group['timeout_wait']));
            }

            if (array_key_exists('qos_options', $group)) {
                $qos = $group['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            // Optional options and the setter each one is forwarded to, in call order.
            $optionalSetters = array(
                'idle_timeout' => 'setIdleTimeout',
                'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            );
            foreach ($optionalSetters as $option => $setter) {
                if (isset($group[$option])) {
                    $definition->addMethodCall($setter, array($group[$option]));
                }
            }

            if (isset($group['graceful_max_execution'])) {
                $graceful = $group['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($graceful['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($graceful['exit_code'])
                );
            }

            // TODO disable grouped consumers (nothing is done when auto_setup_fabric is enabled)
            if (isset($group['auto_setup_fabric']) && !$group['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $serviceName = sprintf('old_sound_rabbit_mq.%s_group', $key);
            $this->container->setDefinition($serviceName, $definition);
        }
    }
584
585 22
    /**
     * Restores dashes in the keys of the "arguments" option.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks AMQP arguments such as
     * x-ha-policy. Every '_' in an argument name is therefore turned back into '-'.
     *
     * @param array $config Options that may contain an "arguments" entry (array, or legacy string form)
     *
     * @return array The same options with the "arguments" keys rewritten; untouched when no arguments are set
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];

        // Old configuration files pass the arguments as a single string; the deprecated
        // parser is called on purpose to keep them working.
        if (is_string($rawArguments)) {
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $normalized = array();
        foreach ($rawArguments as $argumentName => $argumentValue) {
            // str_replace() leaves names without an underscore unchanged.
            $normalized[str_replace('_', '-', $argumentName)] = $argumentValue;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
613
614
    /**
     * Parses arguments given as a string. Kept only to support old configuration files.
     *
     * The expected format is a comma-separated list of "name:value" or "name:value:type"
     * pairs; the type defaults to 'S' when it is omitted. Blank segments (empty input,
     * trailing or doubled commas) are ignored.
     *
     * @deprecated
     * @param string $arguments
     * @return array Map of argument name => array(type, value)
     *
     * @throws InvalidConfigurationException When a pair has no value part
     */
    private function argumentsStringAsArray($arguments)
    {
        $argumentsArray = array();

        foreach (explode(',', $arguments) as $argument) {
            // A blank segment carries no argument; previously it produced an
            // undefined-offset warning and a bogus entry with a null value.
            if (trim($argument) === '') {
                continue;
            }

            $argumentPair = explode(':', $argument);
            if (!isset($argumentPair[1])) {
                throw new InvalidConfigurationException(sprintf(
                    'Invalid argument "%s": expected "name:value" or "name:value:type".',
                    $argument
                ));
            }

            $type = isset($argumentPair[2]) ? $argumentPair[2] : 'S';
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
        }

        return $argumentsArray;
    }
637
638 1
    /**
     * Registers one RPC client service per entry of the "rpc_clients" configuration.
     *
     * Each client is stored in the container as the public service
     * "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // Optional configuration keys and the setter called when the key is present.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $key => $client) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setPublic(true);
            $rpcClient->setLazy($client['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', array($client['expect_serialized_response']));

            // Argument order matters: the connection first, then the logged channel (collector only).
            $this->injectConnection($rpcClient, $client['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $key, $client['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $client)) {
                    $rpcClient->addMethodCall($setter, array($client[$option]));
                }
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $rpcClient);
        }
    }
661 27
662
    /**
     * Registers one RPC server service per entry of the "rpc_servers" configuration.
     *
     * Each server is stored in the container as the public service
     * "old_sound_rabbit_mq.<key>_server" and calls the "execute" method of its callback service.
     */
    protected function loadRpcServers()
    {
        // Optional configuration keys whose value is passed as-is to the matching setter.
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $key => $server) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', array($key));

            $callback = array(new Reference($server['callback']), 'execute');
            $rpcServer->addMethodCall('setCallback', array($callback));

            // Argument order matters: the connection first, then the logged channel (collector only).
            $this->injectConnection($rpcServer, $server['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $key, $server['connection']);
            }

            if (array_key_exists('qos_options', $server)) {
                $qos = $server['qos_options'];
                $rpcServer->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $server)) {
                    $rpcServer->addMethodCall($setter, array($server[$option]));
                }
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $rpcServer);
        }
    }
695
696
    /**
     * Registers a private logged-channel service and appends a reference to it as an argument.
     *
     * The channel is stored as "old_sound_rabbit_mq.channel.<name>" and its id is remembered
     * in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference as an argument
     * @param string     $name           Suffix used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
710
711
    /**
     * Appends a reference to the named connection service as an argument of the definition.
     *
     * @param Definition $definition     Definition that receives the connection reference
     * @param string     $connectionName Connection name; the service id is "old_sound_rabbit_mq.connection.<name>"
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $connection = new Reference($connectionId);

        $definition->addArgument($connection);
    }
715
716
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
720
721
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is registered and its class implements DequeuerAwareInterface,
     * a "setDequeuer" method call referencing $name is added to the callback definition.
     * Callbacks that are not registered, have no class, or whose class cannot be loaded are
     * left untouched.
     *
     * @param string $callback Service id of the callback
     * @param string $name     Service id passed to setDequeuer()
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // The class may be declared as a "%parameter%" placeholder; resolve it before
        // inspecting it, otherwise the placeholder itself would be treated as a class name.
        $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());

        // is_a() with $allow_string = true returns false for a class that cannot be loaded,
        // where new \ReflectionClass() would throw and abort the container build.
        if (!is_string($callbackClass)
            || !is_a($callbackClass, 'OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface', true)
        ) {
            return;
        }

        $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
    }
739
740
    /**
     * Tags the definition for the "phpamqplib" monolog channel and wires the logger into it.
     *
     * The logger reference uses IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition Definition that receives the tag and the setLogger() call
     */
    private function injectLogger(Definition $definition)
    {
        $tagAttributes = array('channel' => 'phpamqplib');
        $definition->addTag('monolog.logger', $tagAttributes);

        $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $definition->addMethodCall('setLogger', array($logger));
    }
747
748
    /**
     * Get default AMQP exchange options
     *
     * The defaults describe an unnamed, passive "direct" exchange that is not declared.
     *
     * @return array Keys: name, type, passive, declare
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
762
763
    /**
     * Get default AMQP queue options
     *
     * The defaults describe an unnamed queue that is not declared.
     *
     * @return array Keys: name, declare
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
775
}
776