Completed
Pull Request — master (#38)
by Aleksandr
08:12
created

OldSoundRabbitMqExtension::loadProducers()   B

Complexity

Conditions 10
Paths 67

Size

Total Lines 41
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 10.7111

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 27
c 4
b 0
f 0
nc 67
nop 0
dl 0
loc 41
ccs 21
cts 26
cp 0.8077
crap 10.7111
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\Argument\TaggedIteratorArgument;
7
use Symfony\Component\DependencyInjection\ContainerInterface;
8
use Symfony\Component\DependencyInjection\Extension\Extension;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Definition;
11
use Symfony\Component\DependencyInjection\Reference;
12
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
13
use Symfony\Component\Config\FileLocator;
14
15
/**
16
 * OldSoundRabbitMqExtension.
17
 *
18
 * @author Alvaro Videla
19
 * @author Marc Weistroff <[email protected]>
20
 */
21
class OldSoundRabbitMqExtension extends Extension
22
{
23
    /**
24
     * @var ContainerBuilder
25
     */
26
    private $container;
27
28
    /**
29
     * @var Boolean Whether the data collector is enabled
30
     */
31
    private $collectorEnabled;
32
33
    private $channelIds = array();
34
    private $groups = array();
35
36 31
    private $config = array();
37
38 31
    public function load(array $configs, ContainerBuilder $container)
39
    {
40 31
        $this->container = $container;
41 31
42
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
43 31
        $loader->load('rabbitmq.xml');
44 31
45
        $configuration = $this->getConfiguration($configs, $container);
46 31
        $this->config = $this->processConfiguration($configuration, $configs);
47
48 31
        $this->collectorEnabled = $this->config['enable_collector'];
49 31
50 31
        $this->loadConnections();
51 31
        $this->loadBindings();
52 31
        $this->loadProducers();
53 31
        $this->loadConsumers();
54 31
        $this->loadMultipleConsumers();
55 31
        $this->loadDynamicConsumers();
56 31
        $this->loadBatchConsumers();
57 31
        $this->loadAnonConsumers();
58
        $this->loadGroupConsumers();
59 31
        $this->loadRpcClients();
60 28
        $this->loadRpcServers();
61 28
62 28
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
63
            $channels = array();
64
            foreach (array_unique($this->channelIds) as $id) {
65 28
                $channels[] = new Reference($id);
66 28
            }
67
68 3
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
69
            $definition->replaceArgument(0, $channels);
70 31
        } else {
71
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
72 31
        }
73
    }
74 31
75
    public function getConfiguration(array $config, ContainerBuilder $container)
76
    {
77 31
        return new Configuration($this->getAlias());
78
    }
79 31
80 31
    protected function loadConnections()
81
    {
82 31
        foreach ($this->config['connections'] as $key => $connection) {
83 22
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
84 31
            $classParam =
85
                $connection['lazy']
86 31
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
87 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
88
89 31
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
90
                $classParam, $connection,
91
            ));
92
            if (isset($connection['connection_parameters_provider'])) {
93 31
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
94 31
                unset($connection['connection_parameters_provider']);
95 31
            }
96
            $definition->setPublic(false);
97 31
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
98 31
            $this->container->setDefinition($factoryName, $definition);
99
100 31
            $definition = new Definition($classParam);
101
            if (method_exists($definition, 'setFactory')) {
102
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
103
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
104
            } else {
105
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
106 31
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107 31
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

107
                $definition->/** @scrutinizer ignore-call */ 
108
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
108
            }
109 31
            $definition->addTag('old_sound_rabbit_mq.connection');
110
            $definition->setPublic(true);
111 31
112
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
113 31
        }
114
    }
115 31
116
    protected function loadBindings()
117
    {
118 31
        if ($this->config['sandbox']) {
119 22
            return;
120 22
        }
121 22
        foreach ($this->config['bindings'] as $binding) {
122 22
            ksort($binding);
123 22
            $definition = new Definition($binding['class']);
124 22
            $definition->addTag('old_sound_rabbit_mq.binding');
125 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
126 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
127 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
128 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
129 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
130 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
131
            $this->injectConnection($definition, $binding['connection']);
132 22
            $key = md5(json_encode($binding));
133
            if ($this->collectorEnabled) {
134 22
                // in the context of a binding, I don't thing logged channels are needed?
135
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
136 31
            }
137
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
138 31
        }
139
    }
140 31
141 31
    protected function loadProducers()
142 27
    {
143 27
        if ($this->config['sandbox'] == false) {
144 27
            foreach ($this->config['producers'] as $key => $producer) {
145 27
                $definition = new Definition($producer['class']);
146
                $definition->setPublic(true);
147 27
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
148 1
                $definition->addTag('old_sound_rabbit_mq.producer');
149
                //this producer doesn't define an exchange -> using AMQP Default
150 27
                if (!isset($producer['exchange_options'])) {
151
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
152 27
                }
153 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
154
                //this producer doesn't define a queue -> using AMQP Default
155 27
                if (!isset($producer['queue_options'])) {
156 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
157 27
                }
158 25
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
159
                $this->injectConnection($definition, $producer['connection']);
160 27
                if ($this->collectorEnabled) {
161
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
162
                }
163
                if (!$producer['auto_setup_fabric']) {
164 27
                    $definition->addMethodCall('disableAutoSetupFabric');
165
                }
166
167
                if ($producer['enable_logger']) {
168 27
                    $this->injectLogger($definition);
169
                }
170 27
171 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
172 22
173
                $this->container->setDefinition($producerServiceName, $definition);
174
                if (null !== $producer['service_alias']) {
175
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
176
                }
177
            }
178
        } else {
179
            foreach ($this->config['producers'] as $key => $producer) {
180
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
181 31
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
182
            }
183 31
        }
184
    }
185 31
186 26
    protected function loadConsumers()
187 26
    {
188 26
        foreach ($this->config['consumers'] as $key => $consumer) {
189 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
190
            $definition->setPublic(true);
191 26
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
192
            $definition->addTag('old_sound_rabbit_mq.consumer');
193
            foreach ($consumer['groups'] as $group) {
194 26
                $this->groups[] = $group;
195
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
196 26
            }
197
            //this consumer doesn't define an exchange -> using AMQP Default
198
            if (!isset($consumer['exchange_options'])) {
199 26
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
200 26
            }
201
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
202 26
            //this consumer doesn't define a queue -> using AMQP Default
203 22
            if (!isset($consumer['queue_options'])) {
204 22
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
205 22
            }
206 22
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
207
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
208
209
            if (array_key_exists('qos_options', $consumer)) {
210 26
                $definition->addMethodCall('setQosOptions', array(
211
                    $consumer['qos_options']['prefetch_size'],
212
                    $consumer['qos_options']['prefetch_count'],
213 26
                    $consumer['qos_options']['global']
214
                ));
215
            }
216 26
217
            if (isset($consumer['idle_timeout'])) {
218
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
219
            }
220
            if (isset($consumer['idle_timeout_exit_code'])) {
221
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
222
            }
223
            if (isset($consumer['timeout_wait'])) {
224
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
225
            }
226 26
            if (isset($consumer['graceful_max_execution'])) {
227
                $definition->addMethodCall(
228
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
229
                    array($consumer['graceful_max_execution']['timeout'])
230 26
                );
231 26
                $definition->addMethodCall(
232 25
                    'setGracefulMaxExecutionTimeoutExitCode',
233
                    array($consumer['graceful_max_execution']['exit_code'])
234
                );
235 26
            }
236 1
            if (!$consumer['auto_setup_fabric']) {
237
                $definition->addMethodCall('disableAutoSetupFabric');
238
            }
239 26
240 26
            $this->injectConnection($definition, $consumer['connection']);
241 26
            if ($this->collectorEnabled) {
242
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
243 31
            }
244
245 31
            if ($consumer['enable_logger']) {
246
                $this->injectLogger($definition);
247 31
            }
248 22
249 22
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
250
            $this->container->setDefinition($name, $definition);
251 22
            $this->addDequeuerAwareCall($consumer['callback'], $name);
252
        }
253
    }
254
255
    protected function loadMultipleConsumers()
256
    {
257
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
258 22
            $queues = array();
259 22
            $callbacks = array();
260 22
261 22
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
262
                throw new InvalidConfigurationException(
263
                    "Error on loading $key multiple consumer. " .
264 22
                    "Either 'queues' or 'queues_provider' parameters should be defined."
265
                );
266 22
            }
267 22
268 22
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
269 22
                $queues[$queueOptions['name']] = $queueOptions;
270 22
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
271
                $callbacks[] = $queueOptions['callback'];
272 22
            }
273 22
274 22
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
275 22
            $definition
276
                ->setPublic(true)
277
                ->addTag('old_sound_rabbit_mq.base_amqp')
278
                ->addTag('old_sound_rabbit_mq.multi_consumer')
279 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
280
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
281
282
            foreach ($consumer['groups'] as $group) {
283
                $this->groups[] = $group;
284
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
285
            }
286
287 22
            if ($consumer['queues_provider']) {
288
                $definition->addMethodCall(
289
                    'setQueuesProvider',
290 22
                    array(new Reference($consumer['queues_provider']))
291
                );
292
            }
293 22
294
            if (array_key_exists('qos_options', $consumer)) {
295
                $definition->addMethodCall('setQosOptions', array(
296
                    $consumer['qos_options']['prefetch_size'],
297
                    $consumer['qos_options']['prefetch_count'],
298
                    $consumer['qos_options']['global']
299
                ));
300
            }
301
302
            if (isset($consumer['idle_timeout'])) {
303 22
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
304
            }
305
            if (isset($consumer['idle_timeout_exit_code'])) {
306
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
307 22
            }
308 22
            if (isset($consumer['timeout_wait'])) {
309 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
310
            }
311
            if (isset($consumer['graceful_max_execution'])) {
312 22
                $definition->addMethodCall(
313
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
314
                    array($consumer['graceful_max_execution']['timeout'])
315
                );
316 22
                $definition->addMethodCall(
317 22
                    'setGracefulMaxExecutionTimeoutExitCode',
318 22
                    array($consumer['graceful_max_execution']['exit_code'])
319 22
                );
320
            }
321 22
            if (!$consumer['auto_setup_fabric']) {
322 22
                $definition->addMethodCall('disableAutoSetupFabric');
323
            }
324
325 31
            $this->injectConnection($definition, $consumer['connection']);
326
            if ($this->collectorEnabled) {
327 31
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
328
            }
329 31
330
            if ($consumer['enable_logger']) {
331 22
                $this->injectLogger($definition);
332
            }
333
334
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
335
            $this->container->setDefinition($name, $definition);
336
            if ($consumer['queues_provider']) {
337
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
338 22
            }
339
            foreach ($callbacks as $callback) {
340 22
                $this->addDequeuerAwareCall($callback, $name);
341 22
            }
342 22
        }
343 22
    }
344 22
345 22
    protected function loadDynamicConsumers()
346
    {
347 22
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
348
349
            if (empty($consumer['queue_options_provider'])) {
350
                throw new InvalidConfigurationException(
351
                    "Error on loading $key dynamic consumer. " .
352
                    "'queue_provider' parameter should be defined."
353
                );
354
            }
355 22
356 22
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
357 22
            $definition
358
                ->setPublic(true)
359
                ->addTag('old_sound_rabbit_mq.base_amqp')
360 22
                ->addTag('old_sound_rabbit_mq.consumer')
361
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
362
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
363 22
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
364
365
            foreach ($consumer['groups'] as $group) {
366 22
                $this->groups[] = $group;
367
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
368
            }
369
370
            if (array_key_exists('qos_options', $consumer)) {
371
                $definition->addMethodCall('setQosOptions', array(
372
                    $consumer['qos_options']['prefetch_size'],
373
                    $consumer['qos_options']['prefetch_count'],
374
                    $consumer['qos_options']['global']
375
                ));
376 22
            }
377
378
            $definition->addMethodCall(
379
                'setQueueOptionsProvider',
380 22
                array(new Reference($consumer['queue_options_provider']))
381 22
            );
382 22
383
            if (isset($consumer['idle_timeout'])) {
384
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
385 22
            }
386
            if (isset($consumer['idle_timeout_exit_code'])) {
387
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
388
            }
389 22
            if (isset($consumer['timeout_wait'])) {
390 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
391 22
            }
392 22
            if (isset($consumer['graceful_max_execution'])) {
393
                $definition->addMethodCall(
394 31
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
395
                    array($consumer['graceful_max_execution']['timeout'])
396 31
                );
397
                $definition->addMethodCall(
398 31
                    'setGracefulMaxExecutionTimeoutExitCode',
399
                    array($consumer['graceful_max_execution']['exit_code'])
400
                );
401
            }
402
            if (!$consumer['auto_setup_fabric']) {
403
                $definition->addMethodCall('disableAutoSetupFabric');
404
            }
405
406
            $this->injectConnection($definition, $consumer['connection']);
407
            if ($this->collectorEnabled) {
408
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
409
            }
410
411
            if ($consumer['enable_logger']) {
412
                $this->injectLogger($definition);
413
            }
414
415
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
416
            $this->container->setDefinition($name, $definition);
417
            $this->addDequeuerAwareCall($consumer['callback'], $name);
418
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
419
        }
420
    }
421
422
    protected function loadBatchConsumers()
423
    {
424
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
425
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
426
427
            if (!isset($consumer['exchange_options'])) {
428
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
429
            }
430
431
            $definition
432
                ->setPublic(true)
433
                ->addTag('old_sound_rabbit_mq.base_amqp')
434
                ->addTag('old_sound_rabbit_mq.batch_consumer')
435
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
436
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
437
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
438
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
439
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
440
                ->addMethodCall('setQosOptions', array(
441
                    $consumer['qos_options']['prefetch_size'],
442
                    $consumer['qos_options']['prefetch_count'],
443
                    $consumer['qos_options']['global']
444
                ))
445
            ;
446
447
            foreach ($consumer['groups'] as $group) {
448
                $this->groups[] = $group;
449
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
450
            }
451
452
            if (isset($consumer['idle_timeout_exit_code'])) {
453
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
454
            }
455 31
456
            if (isset($consumer['idle_timeout'])) {
457 31
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
458
            }
459 31
460 22
            if (isset($consumer['graceful_max_execution'])) {
461
                $definition->addMethodCall(
462 22
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
463 22
                    array($consumer['graceful_max_execution']['timeout'])
464 22
                );
465 22
            }
466 22
467 22
            if (!$consumer['auto_setup_fabric']) {
468 22
                $definition->addMethodCall('disableAutoSetupFabric');
469 22
            }
470
471
            if ($consumer['keep_alive']) {
472 22
                $definition->addMethodCall('keepAlive');
473 22
            }
474 22
475
            $this->injectConnection($definition, $consumer['connection']);
476 31
            if ($this->collectorEnabled) {
477
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
478
            }
479
480
            if ($consumer['enable_logger']) {
481
                $this->injectLogger($definition);
482
            }
483
484
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
485
        }
486 27
    }
487
488 27
    protected function loadAnonConsumers()
489 1
    {
490
        foreach ($this->config['anon_consumers'] as $key => $anon) {
491 1
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
492
            $definition
493
                ->setPublic(true)
494
                ->addTag('old_sound_rabbit_mq.base_amqp')
495 1
                ->addTag('old_sound_rabbit_mq.anon_consumer')
496 1
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
497 1
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
498
            $this->injectConnection($definition, $anon['connection']);
499
            if ($this->collectorEnabled) {
500 1
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
501
            }
502 1
503
            foreach ($anon['groups'] as $group) {
504 27
                $this->groups[] = $group;
505
                $definition->addTag('old_sound_rabbit_mq.consumer.group.' . $group);
506
            }
507
508
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
509
            $this->container->setDefinition($name, $definition);
510
            $this->addDequeuerAwareCall($anon['callback'], $name);
511
        }
512
    }
513
    
514
    protected function loadGroupConsumers()
515
    {
516
        $groupConsumers = $this->config['group_consumers'];
517
        $this->groups = array_unique(array_merge($this->groups, array_keys($groupConsumers)));
518
519
        $defaultConfig = array('connection' => 'default', 'auto_setup_fabric' => true);
520
        foreach ($this->groups as $key) {
521
            $group = isset($groupConsumers[$key]) ? $groupConsumers[$key] : $defaultConfig;
522
            $definition = new Definition('%old_sound_rabbit_mq.group_consumer.class%');
523
            $definition
524
                ->setPublic(true)
525
                ->addTag('old_sound_rabbit_mq.base_amqp')
526
                ->addTag('old_sound_rabbit_mq.group_consumer')
527
            ;
528
            $this->injectConnection($definition, $group['connection']);
529
            if ($this->collectorEnabled) {
530
                $this->injectLoggedChannel($definition, $key . '_group_consumer', $group['connection']);
531 31
            }
532
533 31
            $definition->addMethodCall('addConsumers', array(
534 25
                new TaggedIteratorArgument('old_sound_rabbit_mq.consumer.group.' . $key))
535 25
            );
536
537 25
            if (isset($group['timeout_wait'])) {
538 25
                $definition->addMethodCall('setTimeoutWait', array($group['timeout_wait']));
539 25
            }
540 25
541 25
            if (array_key_exists('qos_options', $group)) {
542
                $definition->addMethodCall('setQosOptions', array(
543 25
                    $consumer['qos_options']['prefetch_size'],
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $consumer seems to be never defined.
Loading history...
544 25
                    $consumer['qos_options']['prefetch_count'],
545
                    $consumer['qos_options']['global']
546 25
                ));
547 25
            }
548
549 25
            if (isset($consumer['idle_timeout'])) {
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $consumer seems to never exist and therefore isset should always be false.
Loading history...
550
                $definition->addMethodCall('setIdleTimeout', array($group['idle_timeout']));
551 25
            }
552
            if (isset($consumer['idle_timeout_exit_code'])) {
553 31
                $definition->addMethodCall('setIdleTimeoutExitCode', array($group['idle_timeout_exit_code']));
554
            }
555 31
556
            if (isset($consumer['graceful_max_execution'])) {
557 31
                $definition->addMethodCall(
558 22
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
559
                    array($consumer['graceful_max_execution']['timeout'])
560 22
                );
561 22
                $definition->addMethodCall(
562 22
                    'setGracefulMaxExecutionTimeoutExitCode',
563 22
                    array($consumer['graceful_max_execution']['exit_code'])
564 22
                );
565 22
            }
566 22
567 22
            if (isset($group['auto_setup_fabric'])) {
568
                if (!$group['auto_setup_fabric']) {
569 22
                    $definition->addMethodCall('disableAutoSetupFabric');
570
                } else {
571
                    // TODO disable grouped consumers
572
                }
573
            }
574
575
            $name = sprintf('old_sound_rabbit_mq.%s_group', $key);
576 22
            $this->container->setDefinition($name, $definition);
577 22
        }
578
    }
579 22
580 22
    /**
581
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
582 22
     * parameter. So we revert the change for right configurations.
583 22
     *
584
     * @param array $config
585 22
     *
586
     * @return array
587 31
     */
588
    private function normalizeArgumentKeys(array $config)
589 28
    {
590
        if (isset($config['arguments'])) {
591 28
            $arguments = $config['arguments'];
592 28
            // support for old configuration
593
            if (is_string($arguments)) {
594 28
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

594
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
595 28
            }
596 28
597
            $newArguments = array();
598 28
            foreach ($arguments as $key => $value) {
599
                if (strstr($key, '_')) {
600 28
                    $key = str_replace('_', '-', $key);
601 28
                }
602 28
                $newArguments[$key] = $value;
603
            }
604 30
            $config['arguments'] = $newArguments;
605
        }
606 30
        return $config;
607 30
    }
608
609 31
    /**
610
     * Support for arguments provided as string. Support for old configuration files.
611 31
     *
612
     * @deprecated
613
     * @param string $arguments
614
     * @return array
615
     */
616
    private function argumentsStringAsArray($arguments)
617
    {
618
        $argumentsArray = array();
619
620 26
        $argumentPairs = explode(',', $arguments);
621
        foreach ($argumentPairs as $argument) {
622 26
            $argumentPair = explode(':', $argument);
623 26
            $type = 'S';
624
            if (isset($argumentPair[2])) {
625
                $type = $argumentPair[2];
626
            }
627
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
628
        }
629
630
        return $argumentsArray;
631
    }
632
633 1
    protected function loadRpcClients()
634
    {
635 1
        foreach ($this->config['rpc_clients'] as $key => $client) {
636 1
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
637
            $definition->setLazy($client['lazy']);
638 1
            $definition
639 1
                ->addTag('old_sound_rabbit_mq.rpc_client')
640
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
641
            $this->injectConnection($definition, $client['connection']);
642
            if ($this->collectorEnabled) {
643
                $this->injectLoggedChannel($definition, $key, $client['connection']);
644
            }
645
            if (array_key_exists('unserializer', $client)) {
646 1
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
647
            }
648
            if (array_key_exists('direct_reply_to', $client)) {
649 1
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
650
            }
651
            $definition->setPublic(true);
652
653
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
654
        }
655
    }
656
657
    protected function loadRpcServers()
658
    {
659
        foreach ($this->config['rpc_servers'] as $key => $server) {
660
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
661 27
            $definition
662
                ->setPublic(true)
663
                ->addTag('old_sound_rabbit_mq.base_amqp')
664 27
                ->addTag('old_sound_rabbit_mq.rpc_server')
665
                ->addMethodCall('initServer', array($key))
666
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
667
            $this->injectConnection($definition, $server['connection']);
668
            if ($this->collectorEnabled) {
669
                $this->injectLoggedChannel($definition, $key, $server['connection']);
670
            }
671
            if (array_key_exists('qos_options', $server)) {
672
                $definition->addMethodCall('setQosOptions', array(
673
                    $server['qos_options']['prefetch_size'],
674
                    $server['qos_options']['prefetch_count'],
675
                    $server['qos_options']['global']
676
                ));
677
            }
678
            if (array_key_exists('exchange_options', $server)) {
679
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
680
            }
681
            if (array_key_exists('queue_options', $server)) {
682
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
683
            }
684
            if (array_key_exists('serializer', $server)) {
685
                $definition->addMethodCall('setSerializer', array($server['serializer']));
686
            }
687
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
688
        }
689
    }
690
691
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
692
    {
693
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
694
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
695
        $channel
696
            ->setPublic(false)
697
            ->addTag('old_sound_rabbit_mq.logged_channel');
698
        $this->injectConnection($channel, $connectionName);
699
700
        $this->container->setDefinition($id, $channel);
701
702
        $this->channelIds[] = $id;
703
        $definition->addArgument(new Reference($id));
704
    }
705
706
    protected function injectConnection(Definition $definition, $connectionName)
707
    {
708
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
709
    }
710
711
    public function getAlias()
712
    {
713
        return 'old_sound_rabbit_mq';
714
    }
715
716
    /**
717
     * Add proper dequeuer aware call
718
     *
719
     * @param string $callback
720
     * @param string $name
721
     */
722
    protected function addDequeuerAwareCall($callback, $name)
723
    {
724
        if (!$this->container->has($callback)) {
725
            return;
726
        }
727
728
        $callbackDefinition = $this->container->findDefinition($callback);
729
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
730
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
731
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
732
        }
733
    }
734
735
    private function injectLogger(Definition $definition)
736
    {
737
        $definition->addTag('monolog.logger', array(
738
            'channel' => 'phpamqplib'
739
        ));
740
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
741
    }
742
743
    /**
744
     * Get default AMQP exchange options
745
     *
746
     * @return array
747
     */
748
    protected function getDefaultExchangeOptions()
749
    {
750
        return array(
751
            'name' => '',
752
            'type' => 'direct',
753
            'passive' => true,
754
            'declare' => false
755
        );
756
    }
757
758
    /**
759
     * Get default AMQP queue options
760
     *
761
     * @return array
762
     */
763
    protected function getDefaultQueueOptions()
764
    {
765
        return array(
766
            'name' => '',
767
            'declare' => false
768
        );
769
    }
770
}
771