Passed
Push — master ( 5a8d35...554209 )
by Mihai
07:44
created

OldSoundRabbitMqExtension::loadDynamicConsumers()   C

Complexity

Conditions 11
Paths 258

Size

Total Lines 69
Code Lines 45

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 19.2846

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 11
eloc 45
c 2
b 0
f 0
nc 258
nop 0
dl 0
loc 69
ccs 26
cts 44
cp 0.5909
crap 19.2846
rs 5.7583

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\ContainerInterface;
7
use Symfony\Component\DependencyInjection\Extension\Extension;
8
use Symfony\Component\DependencyInjection\ContainerBuilder;
9
use Symfony\Component\DependencyInjection\Definition;
10
use Symfony\Component\DependencyInjection\Reference;
11
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
12
use Symfony\Component\Config\FileLocator;
13
14
/**
15
 * OldSoundRabbitMqExtension.
16
 *
17
 * @author Alvaro Videla
18
 * @author Marc Weistroff <[email protected]>
19
 */
20
class OldSoundRabbitMqExtension extends Extension
21
{
22
    /**
23
     * @var ContainerBuilder
24
     */
25
    private $container;
26
27
    /**
28
     * @var bool Whether the data collector is enabled
29
     */
30
    private $collectorEnabled;
31
32
    private $channelIds = array();
33
34
    private $config = array();
35
36 31
    /**
     * Processes the bundle configuration and registers every RabbitMQ service definition.
     *
     * Loads rabbitmq.xml, processes the given configuration, then registers
     * connections, bindings, producers, every consumer flavour and the RPC
     * clients/servers. Finally the data collector receives references to the
     * logged channels, or is removed when the collector is disabled or no
     * channel was registered.
     *
     * @param array            $configs   Raw configuration arrays to process
     * @param ContainerBuilder $container Container being built
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check rather than an implicit array-to-boolean conversion.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        // The same channel id may have been recorded more than once; reference each only once.
        $channels = array();
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $this->container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channels);
    }
71
72 31
    /**
     * Creates the configuration object for this bundle.
     *
     * Neither argument is read here; the object is built from the extension alias only.
     *
     * @param array            $config    Not used
     * @param ContainerBuilder $container Not used
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
76
77 31
    /**
     * Registers, for every configured connection, a private connection factory
     * service and a public, tagged connection service created by that factory.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            // Pick the class parameter: socket vs. stream connection, lazy vs. eager.
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $parameterPrefix = $connection['lazy'] ? 'old_sound_rabbit_mq.lazy.' : 'old_sound_rabbit_mq.';
            $classParam = '%'.$parameterPrefix.$connectionSuffix.'%';

            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $factory = new Definition(
                '%old_sound_rabbit_mq.connection_factory.class%',
                array($classParam, $connection)
            );
            if (isset($connection['connection_parameters_provider'])) {
                $factory->addArgument(new Reference($connection['connection_parameters_provider']));
                // NOTE(review): $connection was already handed to the Definition above by value,
                // so this unset only changes the local copy - confirm whether that is intended.
                unset($connection['connection_parameters_provider']);
            }
            $factory->setPublic(false);
            $this->container->setDefinition($factoryName, $factory);

            $connectionDefinition = new Definition($classParam);
            if (method_exists($connectionDefinition, 'setFactory')) {
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactory(array(new Reference($factoryName), 'createConnection'));
            } else {
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
                $connectionDefinition->setFactoryService($factoryName);
                $connectionDefinition->setFactoryMethod('createConnection');
            }
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $key);
            $this->container->setDefinition($connectionId, $connectionDefinition);
        }
    }
112
113 31
    /**
     * Registers one tagged service definition per configured binding.
     *
     * Nothing is registered in sandbox mode. The service id is derived from an
     * md5 hash of the key-sorted binding configuration.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => binding option, in the order the calls are registered.
        // NOTE(review): 'isNowait' reads like a getter name - confirm the binding
        // class really expects the nowait flag through this call.
        $methodCalls = array(
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        );

        foreach ($this->config['bindings'] as $binding) {
            // Sort the keys so the hash below does not depend on the order of the options.
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($methodCalls as $method => $option) {
                $definition->addMethodCall($method, array($binding[$option]));
            }
            $this->injectConnection($definition, $binding['connection']);

            $key = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
        }
    }
137
138 31
    /**
     * Registers one service definition per configured producer.
     *
     * In sandbox mode every producer is registered with the fallback class and
     * nothing else is configured. Otherwise the producer gets its exchange and
     * queue options (AMQP defaults when missing), its connection, optionally a
     * logged channel and a logger, and an alias when 'service_alias' is not null.
     */
    protected function loadProducers()
    {
        if ($this->config['sandbox']) {
            foreach (array_keys($this->config['producers']) as $key) {
                $fallback = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $fallback);
            }

            return;
        }

        foreach ($this->config['producers'] as $key => $producer) {
            // A producer that defines no exchange / no queue uses the AMQP defaults.
            $exchangeOptions = isset($producer['exchange_options'])
                ? $producer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $queueOptions = isset($producer['queue_options'])
                ? $producer['queue_options']
                : $this->getDefaultQueueOptions();

            $definition = new Definition($producer['class']);
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.producer');
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));
            $definition->addMethodCall('setQueueOptions', array($queueOptions));

            $this->injectConnection($definition, $producer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $producer['connection']);
            }
            if (!$producer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($producer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
            $this->container->setDefinition($producerServiceName, $definition);

            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }
        }
    }
182
183 31
    /**
     * Registers one public service definition per configured consumer.
     *
     * Exchange and queue options fall back to the AMQP defaults when missing.
     * QoS, timeouts and graceful-max-execution settings are only applied when
     * present in the configuration.
     */
    protected function loadConsumers()
    {
        // Optional scalar settings and the setter each one maps to, in call order.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['consumers'] as $key => $consumer) {
            // A consumer that defines no exchange / no queue uses the AMQP defaults.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();

            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)))
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($queueOptions)))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($graceful['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($graceful['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
        }
    }
247 31
248 22
    /**
     * Registers one public service definition per configured multiple consumer.
     *
     * Each consumer must configure 'queues', 'queues_provider', or both. Queues
     * are indexed by their 'name' option and each gets an [callback, 'execute']
     * callable. QoS, timeouts and graceful-max-execution settings are only
     * applied when present in the configuration.
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is configured
     */
    protected function loadMultipleConsumers()
    {
        // Optional scalar settings and the setter each one maps to, in call order.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            // Same emptiness semantics everywhere below, so a missing key cannot raise a notice.
            $hasQueues = !empty($consumer['queues']);
            $hasQueuesProvider = !empty($consumer['queues_provider']);

            if (!$hasQueues && !$hasQueuesProvider) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            $queues = array();
            $callbacks = array();
            if ($hasQueues) {
                foreach ($consumer['queues'] as $queueOptions) {
                    $queueName = $queueOptions['name'];
                    $queues[$queueName] = $queueOptions;
                    $queues[$queueName]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
                    $callbacks[] = $queueOptions['callback'];
                }
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));

            if ($hasQueuesProvider) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    array(new Reference($consumer['queues_provider']))
                );
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);

            if ($hasQueuesProvider) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }
332
333
    /**
     * Registers one public service definition per configured dynamic consumer.
     *
     * Each dynamic consumer must configure 'queue_options_provider'; it is
     * injected through setQueueOptionsProvider(). QoS, timeouts and
     * graceful-max-execution settings are only applied when present in the
     * configuration.
     *
     * @throws InvalidConfigurationException When 'queue_options_provider' is empty or missing
     */
    protected function loadDynamicConsumers()
    {
        // Optional scalar settings and the setter each one maps to, in call order.
        $optionalSetters = array(
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        );

        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, array($consumer[$option]));
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
404
405
    /**
     * Registers one public service definition per configured batch consumer.
     *
     * The callback is bound to its 'batchExecute' method. Exchange options fall
     * back to the AMQP defaults when missing; idle-timeout and
     * graceful-max-execution settings are only applied when present.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];

            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            $definition->addMethodCall('setPrefetchCount', array($qos['prefetch_count']));
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')));
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($exchangeOptions)));
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
            $definition->addMethodCall(
                'setQosOptions',
                array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
            );

            // The exit code is registered before the timeout itself, as in the original call order.
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
465 22
466 22
    /**
     * Registers one public service definition per configured anonymous consumer,
     * with its exchange options, callback, connection and (when the collector is
     * enabled) a logged channel.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callable = array(new Reference($anon['callback']), 'execute');

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', array($exchangeOptions));
            $definition->addMethodCall('setCallback', array($callable));

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $name);
        }
    }
486 27
487
    /**
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
     * parameter. So we revert the change for right configurations.
     *
     * Only the keys of the 'arguments' entry are rewritten ('_' becomes '-');
     * a configuration without 'arguments' is returned untouched. An 'arguments'
     * value given as a string (old configuration format) is first converted to
     * an array.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];
        // support for old configuration
        if (is_string($arguments)) {
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = array();
        foreach ($arguments as $name => $value) {
            // str_replace() leaves keys without an underscore unchanged.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
515
516
    /**
     * Support for arguments provided as string. Support for old configuration files.
     *
     * The string is a comma-separated list of "name:value" or "name:value:type"
     * items. Each item becomes name => array(type, value); the type defaults to
     * 'S' when it is not given. An item without a value yields a null value
     * instead of reading an undefined offset.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $argumentsArray = array();

        foreach (explode(',', $arguments) as $argument) {
            // Pad to three slots so a missing value or type never reads an undefined offset.
            list($name, $value, $type) = array_pad(explode(':', $argument), 3, null);
            $argumentsArray[$name] = array(null === $type ? 'S' : $type, $value);
        }

        return $argumentsArray;
    }
539 25
540 25
    /**
     * Registers one public service definition per configured RPC client.
     *
     * The 'unserializer' and 'direct_reply_to' settings are only applied when
     * the corresponding key exists in the client configuration.
     */
    protected function loadRpcClients()
    {
        // Optional settings and the setter each one maps to, in call order.
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $key => $client) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $definition->setLazy($client['lazy']);
            $definition->addTag('old_sound_rabbit_mq.rpc_client');
            $definition->addMethodCall('initClient', array($client['expect_serialized_response']));

            $this->injectConnection($definition, $client['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $client['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $client)) {
                    $definition->addMethodCall($setter, array($client[$option]));
                }
            }
            $definition->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
563 22
564 22
    /**
     * Registers one public service definition per configured RPC server.
     *
     * QoS, exchange, queue and serializer settings are only applied when the
     * corresponding key exists in the server configuration.
     */
    protected function loadRpcServers()
    {
        // Optional settings and the setter each one maps to, in call order.
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $key => $server) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.rpc_server');
            $definition->addMethodCall('initServer', array($key));
            $definition->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));

            $this->injectConnection($definition, $server['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $server['connection']);
            }

            if (array_key_exists('qos_options', $server)) {
                $qos = $server['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $server)) {
                    $definition->addMethodCall($setter, array($server[$option]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
597
598 28
    /**
     * Registers a private logged-channel service bound to the given connection
     * and appends a reference to it as an argument of $definition.
     *
     * The channel id is also recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Suffix of the channel service id
     * @param string     $connectionName Name of the connection the channel uses
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $channelDefinition = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $channelDefinition->setPublic(false);
        $channelDefinition->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($channelDefinition, $connectionName);

        $this->container->setDefinition($channelId, $channelDefinition);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
612
613
    /**
     * Appends a reference to the named connection service as an argument of $definition.
     *
     * @param Definition $definition     Definition that receives the connection reference
     * @param string     $connectionName Name of the configured connection
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
617
618
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        return 'old_sound_rabbit_mq';
    }
622 26
623 26
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is known to the container and its class
     * implements DequeuerAwareInterface, a setDequeuer() call referencing the
     * consumer service is added to the callback definition. Otherwise nothing
     * happens.
     *
     * @param string $callback Id of the callback service
     * @param string $name     Id of the consumer service to inject as dequeuer
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if ($this->container->has($callback)) {
            $callbackDefinition = $this->container->findDefinition($callback);

            // NOTE(review): getClass() is passed to ReflectionClass as-is; presumably it is
            // always a loadable class name here - confirm for parameterised or class-less definitions.
            $reflection = new \ReflectionClass($callbackDefinition->getClass());
            $isDequeuerAware = $reflection->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface');

            if ($isDequeuerAware) {
                $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
            }
        }
    }
641
642
    /**
     * Tags the definition for the 'phpamqplib' monolog channel and adds a
     * setLogger() call with a 'logger' reference that is ignored when invalid.
     *
     * @param Definition $definition Definition that receives the logger
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', array('channel' => 'phpamqplib'));
        $definition->addMethodCall('setLogger', array($loggerReference));
    }
649 1
650
    /**
     * Get default AMQP exchange options
     *
     * Used when a producer or consumer does not define 'exchange_options'.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
664 27
665
    /**
     * Get default AMQP queue options
     *
     * Used when a producer or consumer does not define 'queue_options'.
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $defaults = array();
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
677
}
678