OldSoundRabbitMqExtension::loadProducers()   B
last analyzed

Complexity

Conditions 10
Paths 67

Size

Total Lines 41
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 10.6352

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 27
c 4
b 0
f 0
nc 67
nop 0
dl 0
loc 41
ccs 22
cts 27
cp 0.8148
crap 10.6352
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use Symfony\Component\DependencyInjection\ContainerInterface;
7
use Symfony\Component\DependencyInjection\Extension\Extension;
8
use Symfony\Component\DependencyInjection\ContainerBuilder;
9
use Symfony\Component\DependencyInjection\Definition;
10
use Symfony\Component\DependencyInjection\Reference;
11
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
12
use Symfony\Component\Config\FileLocator;
13
14
/**
15
 * OldSoundRabbitMqExtension.
16
 *
17
 * @author Alvaro Videla
18
 * @author Marc Weistroff <[email protected]>
19
 */
20
class OldSoundRabbitMqExtension extends Extension
21
{
22
    /**
23
     * @var ContainerBuilder
24
     */
25
    private $container;
26
27
    /**
28
     * @var Boolean Whether the data collector is enabled
29
     */
30
    private $collectorEnabled;
31
32
    private $channelIds = array();
33
34
    private $config = array();
35
36 31
    public function load(array $configs, ContainerBuilder $container)
37
    {
38 31
        $this->container = $container;
39
40 31
        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
41 31
        $loader->load('rabbitmq.xml');
42
43 31
        $configuration = $this->getConfiguration($configs, $container);
44 31
        $this->config = $this->processConfiguration($configuration, $configs);
45
46 31
        $this->collectorEnabled = $this->config['enable_collector'];
47
48 31
        $this->loadConnections();
49 31
        $this->loadBindings();
50 31
        $this->loadProducers();
51 31
        $this->loadConsumers();
52 31
        $this->loadMultipleConsumers();
53 31
        $this->loadDynamicConsumers();
54 31
        $this->loadBatchConsumers();
55 31
        $this->loadAnonConsumers();
56 31
        $this->loadRpcClients();
57 31
        $this->loadRpcServers();
58
59 31
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
60 28
            $channels = array();
61 28
            foreach (array_unique($this->channelIds) as $id) {
62 28
                $channels[] = new Reference($id);
63
            }
64
65 28
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
66 28
            $definition->replaceArgument(0, $channels);
67
        } else {
68 3
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
69
        }
70 31
    }
71
72 31
    public function getConfiguration(array $config, ContainerBuilder $container)
73
    {
74 31
        return new Configuration($this->getAlias());
75
    }
76
77 31
    protected function loadConnections()
78
    {
79 31
        foreach ($this->config['connections'] as $key => $connection) {
80 31
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
81
            $classParam =
82 31
                $connection['lazy']
83 22
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
84 31
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
85
86 31
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
87 31
                $classParam, $connection,
88
            ));
89 31
            if (isset($connection['connection_parameters_provider'])) {
90
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
91
                unset($connection['connection_parameters_provider']);
92
            }
93 31
            $definition->setPublic(false);
94 31
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
95 31
            $this->container->setDefinition($factoryName, $definition);
96
97 31
            $definition = new Definition($classParam);
98 31
            if (method_exists($definition, 'setFactory')) {
99
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
100 31
                $definition->setFactory(array(new Reference($factoryName), 'createConnection'));
101
            } else {
102
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
103
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

103
                $definition->/** @scrutinizer ignore-call */ 
104
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
104
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

104
                $definition->/** @scrutinizer ignore-call */ 
105
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
105
            }
106 31
            $definition->addTag('old_sound_rabbit_mq.connection');
107 31
            $definition->setPublic(true);
108
109 31
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
110
        }
111 31
    }
112
113 31
    protected function loadBindings()
114
    {
115 31
        if ($this->config['sandbox']) {
116
            return;
117
        }
118 31
        foreach ($this->config['bindings'] as $binding) {
119 22
            ksort($binding);
120 22
            $definition = new Definition($binding['class']);
121 22
            $definition->addTag('old_sound_rabbit_mq.binding');
122 22
            $definition->addMethodCall('setArguments', array($binding['arguments']));
123 22
            $definition->addMethodCall('setDestination', array($binding['destination']));
124 22
            $definition->addMethodCall('setDestinationIsExchange', array($binding['destination_is_exchange']));
125 22
            $definition->addMethodCall('setExchange', array($binding['exchange']));
126 22
            $definition->addMethodCall('isNowait', array($binding['nowait']));
127 22
            $definition->addMethodCall('setRoutingKey', array($binding['routing_key']));
128 22
            $this->injectConnection($definition, $binding['connection']);
129 22
            $key = md5(json_encode($binding));
130 22
            if ($this->collectorEnabled) {
131
                // in the context of a binding, I don't thing logged channels are needed?
132 22
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
133
            }
134 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
135
        }
136 31
    }
137
138 31
    protected function loadProducers()
139
    {
140 31
        if ($this->config['sandbox'] == false) {
141 31
            foreach ($this->config['producers'] as $key => $producer) {
142 27
                $definition = new Definition($producer['class']);
143 27
                $definition->setPublic(true);
144 27
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
145 27
                $definition->addTag('old_sound_rabbit_mq.producer');
146
                //this producer doesn't define an exchange -> using AMQP Default
147 27
                if (!isset($producer['exchange_options'])) {
148 1
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
149
                }
150 27
                $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($producer['exchange_options'])));
151
                //this producer doesn't define a queue -> using AMQP Default
152 27
                if (!isset($producer['queue_options'])) {
153 27
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
154
                }
155 27
                $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));
156 27
                $this->injectConnection($definition, $producer['connection']);
157 27
                if ($this->collectorEnabled) {
158 25
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
159
                }
160 27
                if (!$producer['auto_setup_fabric']) {
161
                    $definition->addMethodCall('disableAutoSetupFabric');
162
                }
163
164 27
                if ($producer['enable_logger']) {
165
                    $this->injectLogger($definition);
166
                }
167
168 27
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
169
170 27
                $this->container->setDefinition($producerServiceName, $definition);
171 27
                if (null !== $producer['service_alias']) {
172 22
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
173
                }
174
            }
175
        } else {
176
            foreach ($this->config['producers'] as $key => $producer) {
177
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
178
                $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_producer', $key), $definition);
179
            }
180
        }
181 31
    }
182
183 31
    protected function loadConsumers()
184
    {
185 31
        foreach ($this->config['consumers'] as $key => $consumer) {
186 26
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
187 26
            $definition->setPublic(true);
188 26
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
189 26
            $definition->addTag('old_sound_rabbit_mq.consumer');
190
            //this consumer doesn't define an exchange -> using AMQP Default
191 26
            if (!isset($consumer['exchange_options'])) {
192
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
193
            }
194 26
            $definition->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])));
195
            //this consumer doesn't define a queue -> using AMQP Default
196 26
            if (!isset($consumer['queue_options'])) {
197
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
198
            }
199 26
            $definition->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])));
200 26
            $definition->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
201
202 26
            if (array_key_exists('qos_options', $consumer)) {
203 22
                $definition->addMethodCall('setQosOptions', array(
204 22
                    $consumer['qos_options']['prefetch_size'],
205 22
                    $consumer['qos_options']['prefetch_count'],
206 22
                    $consumer['qos_options']['global']
207
                ));
208
            }
209
210 26
            if (isset($consumer['idle_timeout'])) {
211
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
212
            }
213 26
            if (isset($consumer['idle_timeout_exit_code'])) {
214
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
215
            }
216 26
            if (isset($consumer['timeout_wait'])) {
217 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
218
            }
219 26
            if (isset($consumer['graceful_max_execution'])) {
220
                $definition->addMethodCall(
221
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
222
                    array($consumer['graceful_max_execution']['timeout'])
223
                );
224
                $definition->addMethodCall(
225
                    'setGracefulMaxExecutionTimeoutExitCode',
226
                    array($consumer['graceful_max_execution']['exit_code'])
227
                );
228
            }
229 26
            if (!$consumer['auto_setup_fabric']) {
230
                $definition->addMethodCall('disableAutoSetupFabric');
231
            }
232
233 26
            $this->injectConnection($definition, $consumer['connection']);
234 26
            if ($this->collectorEnabled) {
235 25
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
236
            }
237
238 26
            if ($consumer['enable_logger']) {
239 1
                $this->injectLogger($definition);
240
            }
241
242 26
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
243 26
            $this->container->setDefinition($name, $definition);
244 26
            $this->addDequeuerAwareCall($consumer['callback'], $name);
245
        }
246 31
    }
247
248 31
    protected function loadMultipleConsumers()
249
    {
250 31
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
251 22
            $queues = array();
252 22
            $callbacks = array();
253
254 22
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
255
                throw new InvalidConfigurationException(
256
                    "Error on loading $key multiple consumer. " .
257
                    "Either 'queues' or 'queues_provider' parameters should be defined."
258
                );
259
            }
260
261 22
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
262 22
                $queues[$queueOptions['name']] = $queueOptions;
263 22
                $queues[$queueOptions['name']]['callback'] = array(new Reference($queueOptions['callback']), 'execute');
264 22
                $callbacks[] = $queueOptions['callback'];
265
            }
266
267 22
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
268
            $definition
269 22
                ->setPublic(true)
270 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
271 22
                ->addTag('old_sound_rabbit_mq.multi_consumer')
272 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
273 22
                ->addMethodCall('setQueues', array($this->normalizeArgumentKeys($queues)));
274
275 22
            if ($consumer['queues_provider']) {
276 22
                $definition->addMethodCall(
277 22
                    'setQueuesProvider',
278 22
                    array(new Reference($consumer['queues_provider']))
279
                );
280
            }
281
282 22
            if (array_key_exists('qos_options', $consumer)) {
283
                $definition->addMethodCall('setQosOptions', array(
284
                    $consumer['qos_options']['prefetch_size'],
285
                    $consumer['qos_options']['prefetch_count'],
286
                    $consumer['qos_options']['global']
287
                ));
288
            }
289
290 22
            if (isset($consumer['idle_timeout'])) {
291
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
292
            }
293 22
            if (isset($consumer['idle_timeout_exit_code'])) {
294
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
295
            }
296 22
            if (isset($consumer['timeout_wait'])) {
297 22
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
298
            }
299 22
            if (isset($consumer['graceful_max_execution'])) {
300
                $definition->addMethodCall(
301
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
302
                    array($consumer['graceful_max_execution']['timeout'])
303
                );
304
                $definition->addMethodCall(
305
                    'setGracefulMaxExecutionTimeoutExitCode',
306
                    array($consumer['graceful_max_execution']['exit_code'])
307
                );
308
            }
309 22
            if (!$consumer['auto_setup_fabric']) {
310
                $definition->addMethodCall('disableAutoSetupFabric');
311
            }
312
313 22
            $this->injectConnection($definition, $consumer['connection']);
314 22
            if ($this->collectorEnabled) {
315 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
316
            }
317
318 22
            if ($consumer['enable_logger']) {
319
                $this->injectLogger($definition);
320
            }
321
322 22
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
323 22
            $this->container->setDefinition($name, $definition);
324 22
            if ($consumer['queues_provider']) {
325 22
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
326
            }
327 22
            foreach ($callbacks as $callback) {
328 22
                $this->addDequeuerAwareCall($callback, $name);
329
            }
330
        }
331 31
    }
332
333 31
    protected function loadDynamicConsumers()
334
    {
335 31
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
336
337 22
            if (empty($consumer['queue_options_provider'])) {
338
                throw new InvalidConfigurationException(
339
                    "Error on loading $key dynamic consumer. " .
340
                    "'queue_provider' parameter should be defined."
341
                );
342
            }
343
344 22
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
345
            $definition
346 22
                ->setPublic(true)
347 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
348 22
                ->addTag('old_sound_rabbit_mq.consumer')
349 22
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
350 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
351 22
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
352
353 22
            if (array_key_exists('qos_options', $consumer)) {
354
                $definition->addMethodCall('setQosOptions', array(
355
                    $consumer['qos_options']['prefetch_size'],
356
                    $consumer['qos_options']['prefetch_count'],
357
                    $consumer['qos_options']['global']
358
                ));
359
            }
360
361 22
            $definition->addMethodCall(
362 22
                'setQueueOptionsProvider',
363 22
                array(new Reference($consumer['queue_options_provider']))
364
            );
365
366 22
            if (isset($consumer['idle_timeout'])) {
367
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
368
            }
369 22
            if (isset($consumer['idle_timeout_exit_code'])) {
370
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
371
            }
372 22
            if (isset($consumer['timeout_wait'])) {
373
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
374
            }
375 22
            if (isset($consumer['graceful_max_execution'])) {
376
                $definition->addMethodCall(
377
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
378
                    array($consumer['graceful_max_execution']['timeout'])
379
                );
380
                $definition->addMethodCall(
381
                    'setGracefulMaxExecutionTimeoutExitCode',
382
                    array($consumer['graceful_max_execution']['exit_code'])
383
                );
384
            }
385 22
            if (!$consumer['auto_setup_fabric']) {
386
                $definition->addMethodCall('disableAutoSetupFabric');
387
            }
388
389 22
            $this->injectConnection($definition, $consumer['connection']);
390 22
            if ($this->collectorEnabled) {
391 22
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
392
            }
393
394 22
            if ($consumer['enable_logger']) {
395
                $this->injectLogger($definition);
396
            }
397
398 22
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
399 22
            $this->container->setDefinition($name, $definition);
400 22
            $this->addDequeuerAwareCall($consumer['callback'], $name);
401 22
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
402
        }
403 31
    }
404
405 31
    protected function loadBatchConsumers()
406
    {
407 31
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
408
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
409
410
            if (!isset($consumer['exchange_options'])) {
411
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
412
            }
413
414
            $definition
415
                ->setPublic(true)
416
                ->addTag('old_sound_rabbit_mq.base_amqp')
417
                ->addTag('old_sound_rabbit_mq.batch_consumer')
418
                ->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
419
                ->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
420
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
421
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
422
                ->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
423
                ->addMethodCall('setQosOptions', array(
424
                    $consumer['qos_options']['prefetch_size'],
425
                    $consumer['qos_options']['prefetch_count'],
426
                    $consumer['qos_options']['global']
427
                ))
428
            ;
429
430
            if (isset($consumer['idle_timeout_exit_code'])) {
431
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
432
            }
433
434
            if (isset($consumer['idle_timeout'])) {
435
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
436
            }
437
438
            if (isset($consumer['graceful_max_execution'])) {
439
                $definition->addMethodCall(
440
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
441
                    array($consumer['graceful_max_execution']['timeout'])
442
                );
443
            }
444
445
            if (!$consumer['auto_setup_fabric']) {
446
                $definition->addMethodCall('disableAutoSetupFabric');
447
            }
448
449
            if ($consumer['keep_alive']) {
450
                $definition->addMethodCall('keepAlive');
451
            }
452
453
            $this->injectConnection($definition, $consumer['connection']);
454
            if ($this->collectorEnabled) {
455
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
456
            }
457
458
            if ($consumer['enable_logger']) {
459
                $this->injectLogger($definition);
460
            }
461
462
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
463
        }
464 31
    }
465
466 31
    protected function loadAnonConsumers()
467
    {
468 31
        foreach ($this->config['anon_consumers'] as $key => $anon) {
469 22
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
470
            $definition
471 22
                ->setPublic(true)
472 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
473 22
                ->addTag('old_sound_rabbit_mq.anon_consumer')
474 22
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($anon['exchange_options'])))
475 22
                ->addMethodCall('setCallback', array(array(new Reference($anon['callback']), 'execute')));
476 22
            $this->injectConnection($definition, $anon['connection']);
477 22
            if ($this->collectorEnabled) {
478 22
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
479
            }
480
481 22
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
482 22
            $this->container->setDefinition($name, $definition);
483 22
            $this->addDequeuerAwareCall($anon['callback'], $name);
484
        }
485 31
    }
486
487
    /**
488
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
489
     * parameter. So we revert the change for right configurations.
490
     *
491
     * @param array $config
492
     *
493
     * @return array
494
     */
495 27
    private function normalizeArgumentKeys(array $config)
496
    {
497 27
        if (isset($config['arguments'])) {
498 1
            $arguments = $config['arguments'];
499
            // support for old configuration
500 1
            if (is_string($arguments)) {
501
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

501
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
502
            }
503
504 1
            $newArguments = array();
505 1
            foreach ($arguments as $key => $value) {
506 1
                if (strstr($key, '_')) {
507
                    $key = str_replace('_', '-', $key);
508
                }
509 1
                $newArguments[$key] = $value;
510
            }
511 1
            $config['arguments'] = $newArguments;
512
        }
513 27
        return $config;
514
    }
515
516
    /**
517
     * Support for arguments provided as string. Support for old configuration files.
518
     *
519
     * @deprecated
520
     * @param string $arguments
521
     * @return array
522
     */
523
    private function argumentsStringAsArray($arguments)
524
    {
525
        $argumentsArray = array();
526
527
        $argumentPairs = explode(',', $arguments);
528
        foreach ($argumentPairs as $argument) {
529
            $argumentPair = explode(':', $argument);
530
            $type = 'S';
531
            if (isset($argumentPair[2])) {
532
                $type = $argumentPair[2];
533
            }
534
            $argumentsArray[$argumentPair[0]] = array($type, $argumentPair[1]);
535
        }
536
537
        return $argumentsArray;
538
    }
539
540 31
    protected function loadRpcClients()
541
    {
542 31
        foreach ($this->config['rpc_clients'] as $key => $client) {
543 25
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
544 25
            $definition->setLazy($client['lazy']);
545
            $definition
546 25
                ->addTag('old_sound_rabbit_mq.rpc_client')
547 25
                ->addMethodCall('initClient', array($client['expect_serialized_response']));
548 25
            $this->injectConnection($definition, $client['connection']);
549 25
            if ($this->collectorEnabled) {
550 25
                $this->injectLoggedChannel($definition, $key, $client['connection']);
551
            }
552 25
            if (array_key_exists('unserializer', $client)) {
553 25
                $definition->addMethodCall('setUnserializer', array($client['unserializer']));
554
            }
555 25
            if (array_key_exists('direct_reply_to', $client)) {
556 25
                $definition->addMethodCall('setDirectReplyTo', array($client['direct_reply_to']));
557
            }
558 25
            $definition->setPublic(true);
559
560 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
561
        }
562 31
    }
563
564 31
    protected function loadRpcServers()
565
    {
566 31
        foreach ($this->config['rpc_servers'] as $key => $server) {
567 22
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
568
            $definition
569 22
                ->setPublic(true)
570 22
                ->addTag('old_sound_rabbit_mq.base_amqp')
571 22
                ->addTag('old_sound_rabbit_mq.rpc_server')
572 22
                ->addMethodCall('initServer', array($key))
573 22
                ->addMethodCall('setCallback', array(array(new Reference($server['callback']), 'execute')));
574 22
            $this->injectConnection($definition, $server['connection']);
575 22
            if ($this->collectorEnabled) {
576 22
                $this->injectLoggedChannel($definition, $key, $server['connection']);
577
            }
578 22
            if (array_key_exists('qos_options', $server)) {
579
                $definition->addMethodCall('setQosOptions', array(
580
                    $server['qos_options']['prefetch_size'],
581
                    $server['qos_options']['prefetch_count'],
582
                    $server['qos_options']['global']
583
                ));
584
            }
585 22
            if (array_key_exists('exchange_options', $server)) {
586 22
                $definition->addMethodCall('setExchangeOptions', array($server['exchange_options']));
587
            }
588 22
            if (array_key_exists('queue_options', $server)) {
589 22
                $definition->addMethodCall('setQueueOptions', array($server['queue_options']));
590
            }
591 22
            if (array_key_exists('serializer', $server)) {
592 22
                $definition->addMethodCall('setSerializer', array($server['serializer']));
593
            }
594 22
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
595
        }
596 31
    }
597
598 28
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
599
    {
600 28
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
601 28
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
602
        $channel
603 28
            ->setPublic(false)
604 28
            ->addTag('old_sound_rabbit_mq.logged_channel');
605 28
        $this->injectConnection($channel, $connectionName);
606
607 28
        $this->container->setDefinition($id, $channel);
608
609 28
        $this->channelIds[] = $id;
610 28
        $definition->addArgument(new Reference($id));
611 28
    }
612
613 30
    protected function injectConnection(Definition $definition, $connectionName)
614
    {
615 30
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
616 30
    }
617
618 31
    public function getAlias()
619
    {
620 31
        return 'old_sound_rabbit_mq';
621
    }
622
623
    /**
624
     * Add proper dequeuer aware call
625
     *
626
     * @param string $callback
627
     * @param string $name
628
     */
629 26
    protected function addDequeuerAwareCall($callback, $name)
630
    {
631 26
        if (!$this->container->has($callback)) {
632 26
            return;
633
        }
634
635
        $callbackDefinition = $this->container->findDefinition($callback);
636
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
637
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
638
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
639
        }
640
    }
641
642 1
    private function injectLogger(Definition $definition)
643
    {
644 1
        $definition->addTag('monolog.logger', array(
645 1
            'channel' => 'phpamqplib'
646
        ));
647 1
        $definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
648 1
    }
649
650
    /**
651
     * Get default AMQP exchange options
652
     *
653
     * @return array
654
     */
655 1
    protected function getDefaultExchangeOptions()
656
    {
657
        return array(
658 1
            'name' => '',
659
            'type' => 'direct',
660
            'passive' => true,
661
            'declare' => false
662
        );
663
    }
664
665
    /**
666
     * Get default AMQP queue options
667
     *
668
     * @return array
669
     */
670 27
    protected function getDefaultQueueOptions()
671
    {
672
        return array(
673 27
            'name' => '',
674
            'declare' => false
675
        );
676
    }
677
}
678