Passed
Pull Request — master (#662)
by
unknown
09:20
created

OldSoundRabbitMqExtension::loadProducers()   F

Complexity

Conditions 16
Paths 773

Size

Total Lines 75
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 18.8966

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 16
eloc 52
c 3
b 0
f 0
nc 773
nop 0
dl 0
loc 75
ccs 38
cts 49
cp 0.7755
crap 18.8966
rs 1.7152

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
8
use Symfony\Component\Config\FileLocator;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\ContainerInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Extension\Extension;
13
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
14
use Symfony\Component\DependencyInjection\Reference;
15
16
/**
17
 * OldSoundRabbitMqExtension.
18
 *
19
 * @author Alvaro Videla
20
 * @author Marc Weistroff <[email protected]>
21
 */
22
class OldSoundRabbitMqExtension extends Extension
23
{
24
    /**
25
     * @var ContainerBuilder
26
     */
27
    private $container;
28
29
    /**
30
     * @var Boolean Whether the data collector is enabled
31
     */
32
    private $collectorEnabled;
33
34
    private $channelIds = array();
35
36
    private $config = array();
37
38 34
    /**
     * Processes the bundle configuration and registers all RabbitMQ service definitions.
     *
     * Loads "rabbitmq.xml", validates the merged configuration, runs every load*()
     * step, and finally either feeds the collected channel references to the data
     * collector definition or removes that definition.
     *
     * @param array            $configs   Raw configuration arrays to merge and validate
     * @param ContainerBuilder $container Container the definitions are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;

        $loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check instead of relying on the implicit array-to-bool
        // conversion: the collector is only kept when at least one channel was logged.
        if ($this->collectorEnabled && !empty($this->channelIds)) {
            $channels = array();
            foreach (array_unique($this->channelIds) as $id) {
                $channels[] = new Reference($id);
            }

            $this->container
                ->getDefinition('old_sound_rabbit_mq.data_collector')
                ->replaceArgument(0, $channels);
        } else {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
        }
    }
73
74 34
    /**
     * Creates the configuration definition for this extension.
     *
     * Neither argument is read here; the Configuration only receives the
     * extension alias.
     *
     * @param array            $config
     * @param ContainerBuilder $container
     *
     * @return Configuration
     */
    public function getConfiguration(array $config, ContainerBuilder $container)
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
78
79 34
    /**
     * Registers, for every configured connection, a private connection factory
     * service and a public, tagged connection service created by that factory.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $name => $settings) {
            // Pick the class parameter: socket vs. stream, lazy vs. eager.
            $suffix = 'connection.class';
            if ($settings['use_socket']) {
                $suffix = 'socket_connection.class';
            }
            $prefix = $settings['lazy'] ? 'old_sound_rabbit_mq.lazy.' : 'old_sound_rabbit_mq.';
            $classParam = '%' . $prefix . $suffix . '%';

            $factory = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [$classParam, $settings]);
            if (isset($settings['connection_parameters_provider'])) {
                $factory->addArgument(new Reference($settings['connection_parameters_provider']));
                // Only the local copy changes here: the factory arguments above were
                // already built from the array and still contain this key.
                unset($settings['connection_parameters_provider']);
            }
            $factory->setPublic(false);

            $factoryId = 'old_sound_rabbit_mq.connection_factory.' . $name;
            $this->container->setDefinition($factoryId, $factory);

            $connection = new Definition($classParam);
            if (!method_exists($connection, 'setFactory')) {
                // Legacy API, to be removed when the Symfony DependencyInjection
                // dependency is bumped to 2.6.
                $connection->setFactoryService($factoryId);
                $connection->setFactoryMethod('createConnection');
            } else {
                // To be inlined in services.xml when the Symfony DependencyInjection
                // dependency is bumped to 2.6.
                $connection->setFactory([new Reference($factoryId), 'createConnection']);
            }
            $connection->addTag('old_sound_rabbit_mq.connection');
            $connection->setPublic(true);

            $this->container->setDefinition('old_sound_rabbit_mq.connection.' . $name, $connection);
        }
    }
114
115 34
    /**
     * Registers one tagged service definition per configured binding.
     *
     * Nothing is registered in sandbox mode. The service id is derived from an
     * md5 hash of the key-sorted, JSON-encoded binding configuration.
     */
    protected function loadBindings()
    {
        if (!$this->config['sandbox']) {
            // Method name => binding option passed as its single argument.
            // NOTE(review): 'isNowait' reads like a getter name; confirm against the
            // binding class whether a setter was intended.
            $methodCalls = [
                'setArguments' => 'arguments',
                'setDestination' => 'destination',
                'setDestinationIsExchange' => 'destination_is_exchange',
                'setExchange' => 'exchange',
                'isNowait' => 'nowait',
                'setRoutingKey' => 'routing_key',
            ];

            foreach ($this->config['bindings'] as $binding) {
                // Sort keys so the hash below does not depend on declaration order.
                ksort($binding);

                $definition = new Definition($binding['class']);
                $definition->addTag('old_sound_rabbit_mq.binding');
                foreach ($methodCalls as $method => $option) {
                    $definition->addMethodCall($method, [$binding[$option]]);
                }
                $this->injectConnection($definition, $binding['connection']);

                $hash = md5(json_encode($binding));
                if ($this->collectorEnabled) {
                    // in the context of a binding, I don't think logged channels are needed?
                    $this->injectLoggedChannel($definition, $hash, $binding['connection']);
                }

                $this->container->setDefinition('old_sound_rabbit_mq.binding.' . $hash, $definition);
            }
        }
    }
139
140 34
    /**
     * Registers one service per configured producer.
     *
     * In sandbox mode every producer is replaced by the fallback class and only
     * the ProducerInterface autowiring alias is registered. Otherwise the full
     * producer definition is built, optionally aliased via "service_alias", and
     * autowiring aliases are registered for both ProducerInterface and the
     * configured producer class.
     */
    protected function loadProducers()
    {
        $sandboxed = (bool) $this->config['sandbox'];

        foreach ($this->config['producers'] as $key => $producer) {
            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            if ($sandboxed) {
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $this->container->setDefinition($producerServiceName, $definition);
                $this->registerProducerArgumentAliases($key, $producerServiceName, array(ProducerInterface::class));

                continue;
            }

            $definition = $this->createProducerDefinition($key, $producer);
            $this->container->setDefinition($producerServiceName, $definition);

            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }

            $this->registerProducerArgumentAliases(
                $key,
                $producerServiceName,
                array(ProducerInterface::class, $producer['class'])
            );
        }
    }

    /**
     * Builds the (non-sandbox) service definition for a single producer.
     *
     * @param string $key      Producer name from the configuration
     * @param array  $producer Processed configuration of that producer
     *
     * @return Definition
     */
    private function createProducerDefinition($key, array $producer)
    {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        // this producer doesn't define an exchange -> using AMQP Default
        if (!isset($producer['exchange_options'])) {
            $producer['exchange_options'] = $this->getDefaultExchangeOptions();
        }
        $definition->addMethodCall(
            'setExchangeOptions',
            array($this->normalizeArgumentKeys($producer['exchange_options']))
        );

        // this producer doesn't define a queue -> using AMQP Default
        if (!isset($producer['queue_options'])) {
            $producer['queue_options'] = $this->getDefaultQueueOptions();
        }
        $definition->addMethodCall('setQueueOptions', array($producer['queue_options']));

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }

        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if (isset($producer['confirm_select'])) {
            // A null placeholder argument is appended before the confirm-select flag.
            $definition->addArgument(null);
            $definition->addArgument((bool) $producer['confirm_select']);
        }

        $confirmationTimeout = isset($producer['confirm_timeout']) ? (int) $producer['confirm_timeout'] : 0;
        $definition->addMethodCall('setConfirmationTimeout', array($confirmationTimeout));

        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $definition->addMethodCall('setDefaultRoutingKey', array($producer['default_routing_key']));
        $definition->addMethodCall('setContentType', array($producer['default_content_type']));
        $definition->addMethodCall('setDeliveryMode', array($producer['default_delivery_mode']));

        return $definition;
    }

    /**
     * Registers private argument-autowiring aliases for a producer service.
     *
     * Does nothing when the container has no registerAliasForArgument() method.
     * The argument name is the producer key, suffixed with "Producer" unless the
     * key already ends with it (case-insensitively).
     *
     * @param string   $key         Producer name from the configuration
     * @param string   $serviceName Id of the producer service
     * @param string[] $types       Types to register an alias for
     */
    private function registerProducerArgumentAliases($key, $serviceName, array $types)
    {
        if (!method_exists($this->container, 'registerAliasForArgument')) {
            return;
        }

        $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
        foreach ($types as $type) {
            $this->container
                ->registerAliasForArgument($serviceName, $type, $argName)
                ->setPublic(false);
        }
    }
219
220
    /**
     * Registers a service definition for every entry under the "consumers" key.
     *
     * Each service is named "old_sound_rabbit_mq.<key>_consumer" and, when the
     * container supports it, gets private aliases for argument autowiring.
     */
    protected function loadConsumers()
    {
        // Option name => setter invoked only when the option is set.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer');

            // No exchange configured for this consumer -> AMQP default exchange options.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // No queue configured for this consumer -> AMQP default queue options.
            $queueOptions = isset($consumer['queue_options'])
                ? $consumer['queue_options']
                : $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // Register aliases for argument auto wiring.
            if (!method_exists($this->container, 'registerAliasForArgument')) {
                continue;
            }

            $argName = $key;
            if (!str_ends_with(strtolower($key), 'consumer')) {
                $argName = sprintf('%sConsumer', $key);
            }

            // NOTE(review): the second alias uses the parameter placeholder string as
            // the type; confirm that is intended.
            foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $type) {
                $this->container
                    ->registerAliasForArgument($serviceId, $type, $argName)
                    ->setPublic(false);
            }
        }
    }
296
297
    /**
     * Registers a service definition for every entry under "multiple_consumers".
     *
     * Each service is named "old_sound_rabbit_mq.<key>_multiple".
     *
     * @throws InvalidConfigurationException When a consumer defines neither
     *                                       "queues" nor "queues_provider"
     */
    protected function loadMultipleConsumers()
    {
        // Option name => setter invoked only when the option is set.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            // Index the queues by their configured name and turn each callback
            // service id into a [Reference, 'execute'] callable.
            $queues = [];
            $callbacks = [];
            foreach ($consumer['queues'] as $queueOptions) {
                $queueKey = $queueOptions['name'];
                $callbackId = $queueOptions['callback'];

                $queueOptions['callback'] = [new Reference($callbackId), 'execute'];
                $queues[$queueKey] = $queueOptions;
                $callbacks[] = $callbackId;
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.multi_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            if ($consumer['queues_provider']) {
                $definition->addMethodCall('setQueuesProvider', [new Reference($consumer['queues_provider'])]);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($serviceId, $definition);

            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
            }
            foreach ($callbacks as $callbackId) {
                $this->addDequeuerAwareCall($callbackId, $serviceId);
            }
        }
    }
381
382
    /**
     * Registers a service definition for every entry under "dynamic_consumers".
     *
     * Each service is named "old_sound_rabbit_mq.<key>_dynamic" and receives the
     * configured queue options provider through setQueueOptionsProvider().
     *
     * @throws InvalidConfigurationException When "queue_options_provider" is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked above.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
                ->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', array(
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global']
                ));
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                array(new Reference($consumer['queue_options_provider']))
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']));
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    array($consumer['graceful_max_execution']['timeout'])
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    array($consumer['graceful_max_execution']['exit_code'])
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
453
454
    /**
     * Registers a service definition for every entry under "batch_consumers".
     *
     * Each service is named "old_sound_rabbit_mq.<key>_batch"; its callback is
     * the "batchExecute" method of the configured callback service.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

            // No exchange configured -> fall back to the default exchange options.
            $exchangeOptions = isset($consumer['exchange_options'])
                ? $consumer['exchange_options']
                : $this->getDefaultExchangeOptions();

            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$consumer['qos_options']['prefetch_count']]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
            $definition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $definition->addMethodCall('setQosOptions', [
                $consumer['qos_options']['prefetch_size'],
                $consumer['qos_options']['prefetch_count'],
                $consumer['qos_options']['global'],
            ]);

            // Optional settings, forwarded only when configured.
            $optionalSetters = [
                'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
                'idle_timeout' => 'setIdleTimeout',
            ];
            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            // Only the timeout is forwarded here; no exit-code call is added.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
514 25
515 25
    /**
     * Registers a service definition for every entry under "anon_consumers".
     *
     * Each service is named "old_sound_rabbit_mq.<key>_anon".
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = [new Reference($anon['callback']), 'execute'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);
            $definition->addMethodCall('setCallback', [$callback]);

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
535
536 30
    /**
     * Restores '-' in the keys of the "arguments" option.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks options
     * such as x-ha-policy, so every '_' in an argument key is turned back into
     * '-'. A configuration without "arguments" is returned unchanged.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config)
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];
        if (is_string($arguments)) {
            // Support for the old string-based configuration format.
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = [];
        foreach ($arguments as $name => $value) {
            // Keys without '_' come out of str_replace() unchanged.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
564
565
    /**
     * Parses arguments given as a string (old configuration files).
     *
     * The string is a comma-separated list of "name:value" or "name:value:type"
     * segments; the type defaults to 'S'. Each entry of the result maps the name
     * to array(type, value).
     *
     * NOTE(review): a segment without ':' has no value part, so the value index
     * is read without being checked - confirm whether such input can occur.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments)
    {
        $parsed = [];

        foreach (explode(',', $arguments) as $segment) {
            $parts = explode(':', $segment);
            $type = isset($parts[2]) ? $parts[2] : 'S';

            $parsed[$parts[0]] = [$type, $parts[1]];
        }

        return $parsed;
    }
588 28
589 28
    /**
     * Register one service definition per entry of the "rpc_clients" configuration.
     *
     * Every client is registered as a public service named "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // optional configuration keys and the setter each of them feeds, in call order
        $optionalSetters = array(
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        );

        foreach ($this->config['rpc_clients'] as $key => $client) {
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $definition->setLazy($client['lazy']);
            $definition->addTag('old_sound_rabbit_mq.rpc_client');
            $definition->addMethodCall('initClient', array($client['expect_serialized_response']));

            // arguments: the connection first, then the logged channel when the collector is enabled
            $this->injectConnection($definition, $client['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $client['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $client)) {
                    $definition->addMethodCall($setter, array($client[$option]));
                }
            }

            $definition->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
612 25
613 25
    /**
     * Register one service definition per entry of the "rpc_servers" configuration.
     *
     * Every server is registered as a public service named "old_sound_rabbit_mq.<key>_server" whose
     * callback is the "execute" method of the configured callback service.
     */
    protected function loadRpcServers()
    {
        // optional configuration keys passed as-is to a single-argument setter, in call order
        $optionalSetters = array(
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        );

        foreach ($this->config['rpc_servers'] as $key => $server) {
            $callback = array(new Reference($server['callback']), 'execute');

            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.rpc_server');
            $definition->addMethodCall('initServer', array($key));
            $definition->addMethodCall('setCallback', array($callback));

            // arguments: the connection first, then the logged channel when the collector is enabled
            $this->injectConnection($definition, $server['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $server['connection']);
            }

            if (array_key_exists('qos_options', $server)) {
                $qos = $server['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    array($qos['prefetch_size'], $qos['prefetch_count'], $qos['global'])
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $server)) {
                    $definition->addMethodCall($setter, array($server[$option]));
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
646 31
647
    /**
     * Register a private logged channel service and append a reference to it to the given definition.
     *
     * The channel is registered as "old_sound_rabbit_mq.channel.<name>", receives the named connection
     * as argument and its id is recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference as argument
     * @param string     $name           Suffix of the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
661
662
    /**
     * Append a reference to the service "old_sound_rabbit_mq.connection.<connectionName>" to the
     * arguments of the given definition.
     *
     * @param Definition $definition
     * @param string     $connectionName
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $connection = new Reference($connectionId);

        $definition->addArgument($connection);
    }
666
667
    /**
     * Get the alias of this extension.
     *
     * @return string
     */
    public function getAlias()
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
671 29
672
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is known to the container and its class implements
     * DequeuerAwareInterface, a "setDequeuer" call referencing the service $name is added to the
     * callback definition. Nothing happens when the callback service is unknown or when its class
     * cannot be determined or loaded.
     *
     * @param string $callback Id of the callback service
     * @param string $name     Id of the service passed to setDequeuer
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);
        $callbackClass = $callbackDefinition->getClass();
        // A definition may carry no class at all, or a name that cannot be loaded as written;
        // ReflectionClass would throw on those, so the optional setter call is skipped instead.
        if (!is_string($callbackClass) || !class_exists($callbackClass)) {
            return;
        }

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', array(new Reference($name)));
        }
    }
690
691
    /**
     * Tag the definition for the "phpamqplib" monolog channel and add a "setLogger" call to it.
     *
     * The "logger" service is referenced with ContainerInterface::IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $tagAttributes = array(
            'channel' => 'phpamqplib'
        );
        $definition->addTag('monolog.logger', $tagAttributes);

        $logger = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);
        $definition->addMethodCall('setLogger', array($logger));
    }
698
699
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an unnamed, passive exchange of type "direct" that is not declared.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions()
    {
        $options = array();
        $options['name'] = '';
        $options['type'] = 'direct';
        $options['passive'] = true;
        $options['declare'] = false;

        return $options;
    }
713
714
    /**
     * Get default AMQP queue options
     *
     * The defaults are an unnamed queue that is not declared.
     *
     * @return array
     */
    protected function getDefaultQueueOptions()
    {
        $options = array();
        $options['name'] = '';
        $options['declare'] = false;

        return $options;
    }
726
}
727