Passed
Pull Request — master (#38)
by Aleksandr
07:19 queued 02:12
created

OldSoundRabbitMqExtension::loadProducers()   B

Complexity

Conditions 10
Paths 67

Size

Total Lines 41
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 10.6352

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 27
c 4
b 0
f 0
nc 67
nop 0
dl 0
loc 41
ccs 22
cts 27
cp 0.8148
crap 10.6352
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\ContainerInterface;
7
use Symfony\Component\DependencyInjection\Extension\Extension;
8
use Symfony\Component\DependencyInjection\ContainerBuilder;
9
use Symfony\Component\DependencyInjection\Definition;
10
use Symfony\Component\DependencyInjection\Reference;
11
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
12
use Symfony\Component\Config\FileLocator;
13
14
/**
15
 * OldSoundRabbitMqExtension.
16
 *
17
 * @author Alvaro Videla
18
 * @author Marc Weistroff <[email protected]>
19
 */
20
class OldSoundRabbitMqExtension extends Extension
21
{
22
    /**
23
     * @var ContainerBuilder
24
     */
25
    private $container;
26
27
    /**
28
     * @var Boolean Whether the data collector is enabled
29
     */
30
    private $collectorEnabled;
31
32
    private $channelIds = array();
33
34
    private $config = array();
35
36 31
    public function load(array $configs, ContainerBuilder $container)
37
    {
38 31
        $this->container = $container;
39
40 31
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
41 31
        $loader->load('rabbitmq.xml');
42
43 31
        $configuration = $this->getConfiguration($configs, $container);
44 31
        $this->config = $this->processConfiguration($configuration, $configs);
45
46 31
        $this->collectorEnabled = $this->config['enable_collector'];
47
48 31
        $this->loadConnections();
49 31
        $this->loadBindings();
50 31
        $this->loadProducers();
51 31
        $this->loadConsumers();
52 31
        $this->loadMultipleConsumers();
53 31
        $this->loadDynamicConsumers();
54 31
        $this->loadBatchConsumers();
55 31
        $this->loadAnonConsumers();
56 31
        $this->loadRpcClients();
57 31
        $this->loadRpcServers();
58
59 31
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
60 28
            $channels = array();
61 28
            foreach (array_unique($this->channelIds) as $id) {
62 28
                $channels[] = new Reference($id);
63
            }
64
65 28
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
66 28
            $definition->replaceArgument(0, $channels);
67
        } else {
68 3
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
69
        }
70 31
    }
71
72 31
    public function getConfiguration(array $config, ContainerBuilder $container)
73
    {
74 31
        return new Configuration($this->getAlias());
75
    }
76
77 31
    protected function loadConnections()
78
    {
79 31
        foreach ($this->config['connections'] as $key => $connection) {
80 31
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
81
            $classParam =
82 31
                $connection['lazy']
83 22
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
84 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
85
86 31
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
87 31
                $classParam, $connection,
88
            ));
89 31
            if (isset($connection['connection_parameters_provider'])) {
90
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
91
                unset($connection['connection_parameters_provider']);
92
            }
93 31
            $definition->setPublic(false);
94 31
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
95 31
            $this->container->setDefinition($factoryName, $definition);
96
97 31
            $definition = new Definition($classParam);
98 31
            if (method_exists($definition, 'setFactory')) {
99
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
100 31
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
101
            } else {
102
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
103
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

103
                $definition->/** @scrutinizer ignore-call */ 
104
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
104
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

104
                $definition->/** @scrutinizer ignore-call */ 
105
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
105
            }
106 31
            $definition->addTag('old_sound_rabbit_mq.connection');
107 31
            $definition->setPublic(true);
108
109 31
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
110
        }
111 31
    }
112
113 31
    protected function loadBindings()
114
    {
115 31
        if ($this->config['sandbox']) {
116
            return;
117
        }
118 31
        foreach ($this->config['bindings'] as $binding) {
119 22
            ksort($binding);
120 22
            $definition = new Definition($binding['class']);
121 22
            $definition->addTag('old_sound_rabbit_mq.binding');
122 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
123 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
124 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
125 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
126 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
127 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
128 22
            $this->injectConnection($definition, $binding['connection']);
129 22
            $key = md5(json_encode($binding));
130 22
            if ($this->collectorEnabled) {
131
                // in the context of a binding, I don't thing logged channels are needed?
132 22
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
133
            }
134 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
135
        }
136 31
    }
137
138 31
    protected function loadProducers()
139
    {
140 31
        if ($this->config['sandbox'] == false) {
141 31
            foreach ($this->config['producers'] as $key => $producer) {
142 27
                $definition = new Definition($producer['class']);
143 27
                $definition->setPublic(true);
144 27
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
145 27
                $definition->addTag('old_sound_rabbit_mq.producer');
146
                //this producer doesn't define an exchange -> using AMQP Default
147 27
                if (!isset($producer['exchange_options'])) {
148 1
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
149
                }
150 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
151
                //this producer doesn't define a queue -> using AMQP Default
152 27
                if (!isset($producer['queue_options'])) {
153 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
154
                }
155 27
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
156 27
                $this->injectConnection($definition, $producer['connection']);
157 27
                if ($this->collectorEnabled) {
158 25
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
159
                }
160 27
                if (!$producer['auto_setup_fabric']) {
161
                    $definition->addMethodCall('disableAutoSetupFabric');
162
                }
163
164 27
                if ($producer['enable_logger']) {
165
                    $this->injectLogger($definition);
166
                }
167
168 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
169
170 27
                $this->container->setDefinition($producerServiceName, $definition);
171 27
                if (null !== $producer['service_alias']) {
172 31
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
173
                }
174
            }
175
        } else {
176
            foreach ($this->config['producers'] as $key => $producer) {
177
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
178
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
179
            }
180
        }
181 31
    }
182
183 31
    protected function loadConsumers()
184
    {
185 31
        foreach ($this->config['consumers'] as $key => $consumer) {
186 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
187 26
            $definition->setPublic(true);
188 26
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
189 26
            $definition->addTag('old_sound_rabbit_mq.consumer');
190
            //this consumer doesn't define an exchange -> using AMQP Default
191 26
            if (!isset($consumer['exchange_options'])) {
192
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
193
            }
194 26
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
195
            //this consumer doesn't define a queue -> using AMQP Default
196 26
            if (!isset($consumer['queue_options'])) {
197
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
198
            }
199 26
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
200 26
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
201
202 26
            if (array_key_exists('qos_options', $consumer)) {
203 22
                $definition->addMethodCall('setQosOptions', array(
204 22
                    $consumer['qos_options']['prefetch_size'],
205 22
                    $consumer['qos_options']['prefetch_count'],
206 22
                    $consumer['qos_options']['global']
207
                ));
208
            }
209
210 26
            if (isset($consumer['idle_timeout'])) {
211
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
212
            }
213 26
            if (isset($consumer['idle_timeout_exit_code'])) {
214
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
215
            }
216 26
            if (isset($consumer['groups'])) {
217
                $definition->addMethodCall('setGroups', $consumer['groups']);
218
            }
219 26
            if (isset($consumer['graceful_max_execution'])) {
220
                $definition->addMethodCall(
221
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
222
                    array($consumer['graceful_max_execution']['timeout'])
223
                );
224
                $definition->addMethodCall(
225
                    'setGracefulMaxExecutionTimeoutExitCode',
226
                    array($consumer['graceful_max_execution']['exit_code'])
227
                );
228
            }
229 26
            if (!$consumer['auto_setup_fabric']) {
230
                $definition->addMethodCall('disableAutoSetupFabric');
231
            }
232
233 26
            $this->injectConnection($definition, $consumer['connection']);
234 26
            if ($this->collectorEnabled) {
235 25
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
236
            }
237
238 26
            if ($consumer['enable_logger']) {
239 1
                $this->injectLogger($definition);
240
            }
241
242 26
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
243 26
            $this->container->setDefinition($name, $definition);
244 26
            $this->addDequeuerAwareCall($consumer['callback'], $name);
245
        }
246 31
    }
247
248 31
    protected function loadMultipleConsumers()
249
    {
250 31
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
251 22
            $queues = array();
252 22
            $callbacks = array();
253
254 22
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
255
                throw new InvalidConfigurationException(
256
                    "Error on loading $key multiple consumer. " .
257
                    "Either 'queues' or 'queues_provider' parameters should be defined."
258
                );
259
            }
260
261 22
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
262 22
                $queues[$queueOptions['name']] = $queueOptions;
263 22
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
264 22
                $callbacks[] = $queueOptions['callback'];
265
            }
266
267 22
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
268
            $definition
269 22
                ->setPublic(true)
270 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
271 22
                ->addTag('old_sound_rabbit_mq.multi_consumer')
272 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
273 22
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
274
275 22
            if ($consumer['queues_provider']) {
276 22
                $definition->addMethodCall(
277 22
                    'setQueuesProvider',
278 22
                    array(new Reference($consumer['queues_provider']))
279
                );
280
            }
281
282 22
            if (array_key_exists('qos_options', $consumer)) {
283
                $definition->addMethodCall('setQosOptions', array(
284
                    $consumer['qos_options']['prefetch_size'],
285
                    $consumer['qos_options']['prefetch_count'],
286
                    $consumer['qos_options']['global']
287
                ));
288
            }
289
290 22
            if (isset($consumer['idle_timeout'])) {
291
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
292
            }
293 22
            if (isset($consumer['idle_timeout_exit_code'])) {
294
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
295
            }
296 22
            if (isset($consumer['groups'])) {
297
                $definition->addMethodCall('setGroups', $consumer['groups']);
298
            }
299 22
            if (isset($consumer['graceful_max_execution'])) {
300
                $definition->addMethodCall(
301
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
302
                    array($consumer['graceful_max_execution']['timeout'])
303
                );
304
                $definition->addMethodCall(
305
                    'setGracefulMaxExecutionTimeoutExitCode',
306
                    array($consumer['graceful_max_execution']['exit_code'])
307
                );
308
            }
309 22
            if (!$consumer['auto_setup_fabric']) {
310
                $definition->addMethodCall('disableAutoSetupFabric');
311
            }
312
313 22
            $this->injectConnection($definition, $consumer['connection']);
314 22
            if ($this->collectorEnabled) {
315 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
316
            }
317
318 22
            if ($consumer['enable_logger']) {
319
                $this->injectLogger($definition);
320
            }
321
322 22
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
323 22
            $this->container->setDefinition($name, $definition);
324 22
            if ($consumer['queues_provider']) {
325 22
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
326
            }
327 22
            foreach ($callbacks as $callback) {
328 22
                $this->addDequeuerAwareCall($callback, $name);
329
            }
330
        }
331 31
    }
332
333 31
    protected function loadDynamicConsumers()
334
    {
335 31
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
336
337 22
            if (empty($consumer['queue_options_provider'])) {
338
                throw new InvalidConfigurationException(
339
                    "Error on loading $key dynamic consumer. " .
340
                    "'queue_provider' parameter should be defined."
341
                );
342
            }
343
344 22
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
345
            $definition
346 22
                ->setPublic(true)
347 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
348 22
                ->addTag('old_sound_rabbit_mq.consumer')
349 22
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
350 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
351 22
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
352
353 22
            if (array_key_exists('qos_options', $consumer)) {
354
                $definition->addMethodCall('setQosOptions', array(
355
                    $consumer['qos_options']['prefetch_size'],
356
                    $consumer['qos_options']['prefetch_count'],
357
                    $consumer['qos_options']['global']
358
                ));
359
            }
360
361 22
            $definition->addMethodCall(
362 22
                'setQueueOptionsProvider',
363 22
                array(new Reference($consumer['queue_options_provider']))
364
            );
365
366 22
            if (isset($consumer['idle_timeout'])) {
367
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
368
            }
369 22
            if (isset($consumer['idle_timeout_exit_code'])) {
370
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
371
            }
372 22
            if (isset($consumer['groups'])) {
373
                $definition->addMethodCall('setGroups', $consumer['groups']);
374
            }
375 22
            if (isset($consumer['graceful_max_execution'])) {
376
                $definition->addMethodCall(
377
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
378
                    array($consumer['graceful_max_execution']['timeout'])
379
                );
380
                $definition->addMethodCall(
381
                    'setGracefulMaxExecutionTimeoutExitCode',
382
                    array($consumer['graceful_max_execution']['exit_code'])
383
                );
384
            }
385 22
            if (!$consumer['auto_setup_fabric']) {
386
                $definition->addMethodCall('disableAutoSetupFabric');
387
            }
388
389 22
            $this->injectConnection($definition, $consumer['connection']);
390 22
            if ($this->collectorEnabled) {
391 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
392
            }
393
394 22
            if ($consumer['enable_logger']) {
395
                $this->injectLogger($definition);
396
            }
397
398 22
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
399 22
            $this->container->setDefinition($name, $definition);
400 22
            $this->addDequeuerAwareCall($consumer['callback'], $name);
401 22
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
402
        }
403 31
    }
404
405 31
    protected function loadBatchConsumers()
406
    {
407 31
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
408
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
409
410
            if (!isset($consumer['exchange_options'])) {
411
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
412
            }
413
414
            $definition
415
                ->setPublic(true)
416
                ->addTag('old_sound_rabbit_mq.base_amqp')
417
                ->addTag('old_sound_rabbit_mq.batch_consumer')
418
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
419
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
420
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
421
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
422
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
423
                ->addMethodCall('setQosOptions', array(
424
                    $consumer['qos_options']['prefetch_size'],
425
                    $consumer['qos_options']['prefetch_count'],
426
                    $consumer['qos_options']['global']
427
                ))
428
            ;
429
430
            if (isset($consumer['idle_timeout_exit_code'])) {
431
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
432
            }
433
            if (isset($consumer['groups'])) {
434
                $definition->addMethodCall('setGroups', $consumer['groups']);
435
            }
436
437
            if (isset($consumer['idle_timeout'])) {
438
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
439
            }
440
441
            if (isset($consumer['graceful_max_execution'])) {
442
                $definition->addMethodCall(
443
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
444
                    array($consumer['graceful_max_execution']['timeout'])
445
                );
446
            }
447
448
            if (!$consumer['auto_setup_fabric']) {
449
                $definition->addMethodCall('disableAutoSetupFabric');
450
            }
451
452
            if ($consumer['keep_alive']) {
453
                $definition->addMethodCall('keepAlive');
454
            }
455
456
            $this->injectConnection($definition, $consumer['connection']);
457
            if ($this->collectorEnabled) {
458
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
459
            }
460
461
            if ($consumer['enable_logger']) {
462
                $this->injectLogger($definition);
463
            }
464
465
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
466
        }
467 31
    }
468
469 31
    protected function loadAnonConsumers()
470
    {
471 31
        foreach ($this->config['anon_consumers'] as $key => $anon) {
472 22
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');     
473
            $definition
474 22
                ->setPublic(true)
475 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
476 22
                ->addTag('old_sound_rabbit_mq.anon_consumer')
477 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
478 22
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
479 22
            $this->injectConnection($definition, $anon['connection']);
480 22
            if ($this->collectorEnabled) {
481 22
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
482
            }
483
484 22
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
485 22
            $this->container->setDefinition($name, $definition);
486 22
            $this->addDequeuerAwareCall($anon['callback'], $name);
487
        }
488 31
    }
489
490
    /**
491
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
492
     * parameter. So we revert the change for right configurations.
493
     *
494
     * @param array $config
495
     *
496
     * @return array
497
     */
498 27
    private function normalizeArgumentKeys(array $config)
499
    {
500 27
        if (isset($config['arguments'])) {
501 1
            $arguments = $config['arguments'];
502
            // support for old configuration
503 1
            if (is_string($arguments)) {
504
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

504
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
505
            }
506
507 1
            $newArguments = array();
508 1
            foreach ($arguments as $key => $value) {
509 1
                if (strstr($key, '_')) {
510
                    $key = str_replace('_', '-', $key);
511
                }
512 1
                $newArguments[$key] = $value;
513
            }
514 1
            $config['arguments'] = $newArguments;
515
        }
516 27
        return $config;
517
    }
518
519
    /**
520
     * Support for arguments provided as string. Support for old configuration files.
521
     *
522
     * @deprecated
523
     * @param string $arguments
524
     * @return array
525
     */
526
    private function argumentsStringAsArray($arguments)
527
    {
528
        $argumentsArray = array();
529
530
        $argumentPairs = explode(',', $arguments);
531
        foreach ($argumentPairs as $argument) {
532
            $argumentPair = explode(':', $argument);
533
            $type = 'S';
534
            if (isset($argumentPair[2])) {
535
                $type = $argumentPair[2];
536
            }
537
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
538
        }
539
540
        return $argumentsArray;
541
    }
542
543 31
    protected function loadRpcClients()
544
    {
545 31
        foreach ($this->config['rpc_clients'] as $key => $client) {
546 25
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
547 25
            $definition->setLazy($client['lazy']);
548
            $definition
549 25
                ->addTag('old_sound_rabbit_mq.rpc_client')
550 25
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
551 25
            $this->injectConnection($definition, $client['connection']);
552 25
            if ($this->collectorEnabled) {
553 25
                $this->injectLoggedChannel($definition, $key, $client['connection']);
554
            }
555 25
            if (array_key_exists('unserializer', $client)) {
556 25
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
557
            }
558 25
            if (array_key_exists('direct_reply_to', $client)) {
559 25
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
560
            }
561 25
            $definition->setPublic(true);
562
563 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
564
        }
565 31
    }
566
567 31
    protected function loadRpcServers()
568
    {
569 31
        foreach ($this->config['rpc_servers'] as $key => $server) {
570 22
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
571
            $definition
572 22
                ->setPublic(true)
573 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
574 22
                ->addTag('old_sound_rabbit_mq.rpc_server')
575 22
                ->addMethodCall('initServer', array($key))
576 22
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
577 22
            $this->injectConnection($definition, $server['connection']);
578 22
            if ($this->collectorEnabled) {
579 22
                $this->injectLoggedChannel($definition, $key, $server['connection']);
580
            }
581 22
            if (array_key_exists('qos_options', $server)) {
582
                $definition->addMethodCall('setQosOptions', array(
583
                    $server['qos_options']['prefetch_size'],
584
                    $server['qos_options']['prefetch_count'],
585
                    $server['qos_options']['global']
586
                ));
587
            }
588 22
            if (array_key_exists('exchange_options', $server)) {
589 22
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
590
            }
591 22
            if (array_key_exists('queue_options', $server)) {
592 22
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
593
            }
594 22
            if (array_key_exists('serializer', $server)) {
595 22
                $definition->addMethodCall('setSerializer', array($server['serializer']));
596
            }
597 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
598
        }
599 31
    }
600
601 28
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
602
    {
603 28
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
604 28
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
605
        $channel
606 28
            ->setPublic(false)
607 28
            ->addTag('old_sound_rabbit_mq.logged_channel');
608 28
        $this->injectConnection($channel, $connectionName);
609
610 28
        $this->container->setDefinition($id, $channel);
611
612 28
        $this->channelIds[] = $id;
613 28
        $definition->addArgument(new Reference($id));
614 28
    }
615
616 30
    protected function injectConnection(Definition $definition, $connectionName)
617
    {
618 30
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
619 30
    }
620
621 31
    public function getAlias()
622
    {
623 31
        return 'old_sound_rabbit_mq';
624
    }
625
626
    /**
627
     * Add proper dequeuer aware call
628
     *
629
     * @param string $callback
630
     * @param string $name
631
     */
632 26
    protected function addDequeuerAwareCall($callback, $name)
633
    {
634 26
        if (!$this->container->has($callback)) {
635 26
            return;
636
        }
637
638
        $callbackDefinition = $this->container->findDefinition($callback);
639
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
640
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
641
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
642
        }
643
    }
644
645 1
    private function injectLogger(Definition $definition)
646
    {
647 1
        $definition->addTag('monolog.logger', array(
648 1
            'channel' => 'phpamqplib'
649
        ));
650 1
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
651 1
    }
652
653
    /**
654
     * Get default AMQP exchange options
655
     *
656
     * @return array
657
     */
658 1
    protected function getDefaultExchangeOptions()
659
    {
660
        return array(
661 1
            'name' => '',
662
            'type' => 'direct',
663
            'passive' => true,
664
            'declare' => false
665
        );
666
    }
667
668
    /**
669
     * Get default AMQP queue options
670
     *
671
     * @return array
672
     */
673 27
    protected function getDefaultQueueOptions()
674
    {
675
        return array(
676 27
            'name' => '',
677
            'declare' => false
678
        );
679
    }
680
}
681