Passed
Push — master ( e10511...d2a17a )
by Mihai
09:05
created

OldSoundRabbitMqExtension::loadBatchConsumers()   F

Complexity

Conditions 11
Paths 513

Size

Total Lines 65
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 34.4744

Importance

Changes 7
Bugs 1 Features 1
Metric Value
cc 11
eloc 39
c 7
b 1
f 1
nc 513
nop 0
dl 0
loc 65
ccs 16
cts 38
cp 0.4211
crap 34.4744
rs 3.8263

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\ConfigurationInterface;
8
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
9
use Symfony\Component\Config\FileLocator;
10
use Symfony\Component\DependencyInjection\ContainerBuilder;
11
use Symfony\Component\DependencyInjection\ContainerInterface;
12
use Symfony\Component\DependencyInjection\Definition;
13
use Symfony\Component\DependencyInjection\Extension\Extension;
14
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
15
use Symfony\Component\DependencyInjection\Reference;
16
17
/**
18
 * OldSoundRabbitMqExtension.
19
 *
20
 * @author Alvaro Videla
21
 * @author Marc Weistroff <[email protected]>
22
 */
23
class OldSoundRabbitMqExtension extends Extension
24
{
25
    /**
26
     * @var ContainerBuilder
27
     */
28
    private $container;
29
30
    /**
31
     * @var Boolean Whether the data collector is enabled
32
     */
33
    private $collectorEnabled;
34
35
    private $channelIds = [];
36
37
    private $config = [];
38
39 34
    public function load(array $configs, ContainerBuilder $container): void
40
    {
41 34
        $this->container = $container;
42
43 34
        $loader = new XmlFileLoader($this->container, new FileLocator([__DIR__ . '/../Resources/config']));
44 34
        $loader->load('rabbitmq.xml');
45
46 34
        $configuration = $this->getConfiguration($configs, $container);
47 34
        $this->config = $this->processConfiguration($configuration, $configs);
48
49 34
        $this->collectorEnabled = $this->config['enable_collector'];
50
51 34
        $this->loadConnections();
52 34
        $this->loadBindings();
53 34
        $this->loadProducers();
54 34
        $this->loadConsumers();
55 34
        $this->loadMultipleConsumers();
56 34
        $this->loadDynamicConsumers();
57 34
        $this->loadBatchConsumers();
58 34
        $this->loadAnonConsumers();
59 34
        $this->loadRpcClients();
60 34
        $this->loadRpcServers();
61
62 34
        if ($this->collectorEnabled && $this->channelIds) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->channelIds of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
63 31
            $channels = [];
64 31
            foreach (array_unique($this->channelIds) as $id) {
65 31
                $channels[] = new Reference($id);
66
            }
67
68 31
            $definition = $container->getDefinition('old_sound_rabbit_mq.data_collector');
69 31
            $definition->replaceArgument(0, $channels);
70
        } else {
71 3
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');
72
        }
73 34
    }
74
75 34
    public function getConfiguration(array $config, ContainerBuilder $container): ?ConfigurationInterface
76
    {
77 34
        return new Configuration($this->getAlias());
78
    }
79
80 34
    protected function loadConnections()
81
    {
82 34
        foreach ($this->config['connections'] as $key => $connection) {
83 34
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
84 34
            $classParam =
85 34
                $connection['lazy']
86 25
                    ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
87 34
                    : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';
88
89 34
            $definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [
90 34
                $classParam, $connection,
91
            ]);
92 34
            if (isset($connection['connection_parameters_provider'])) {
93
                $definition->addArgument(new Reference($connection['connection_parameters_provider']));
94
                unset($connection['connection_parameters_provider']);
95
            }
96 34
            $definition->setPublic(false);
97 34
            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
98 34
            $this->container->setDefinition($factoryName, $definition);
99
100 34
            $definition = new Definition($classParam);
101 34
            if (method_exists($definition, 'setFactory')) {
102
                // to be inlined in services.xml when dependency on Symfony DependencyInjection is bumped to 2.6
103 34
                $definition->setFactory([new Reference($factoryName), 'createConnection']);
104
            } else {
105
                // to be removed when dependency on Symfony DependencyInjection is bumped to 2.6
106
                $definition->setFactoryService($factoryName);
0 ignored issues
show
Bug introduced by
The method setFactoryService() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

106
                $definition->/** @scrutinizer ignore-call */ 
107
                             setFactoryService($factoryName);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
107
                $definition->setFactoryMethod('createConnection');
0 ignored issues
show
Bug introduced by
The method setFactoryMethod() does not exist on Symfony\Component\DependencyInjection\Definition. Did you maybe mean setFactory()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

107
                $definition->/** @scrutinizer ignore-call */ 
108
                             setFactoryMethod('createConnection');

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
108
            }
109 34
            $definition->addTag('old_sound_rabbit_mq.connection');
110 34
            $definition->setPublic(true);
111
112 34
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.connection.%s', $key), $definition);
113
        }
114 34
    }
115
116 34
    protected function loadBindings()
117
    {
118 34
        if ($this->config['sandbox']) {
119
            return;
120
        }
121 34
        foreach ($this->config['bindings'] as $binding) {
122 25
            ksort($binding);
123 25
            $definition = new Definition($binding['class']);
124 25
            $definition->addTag('old_sound_rabbit_mq.binding');
125 25
            $definition->addMethodCall('setArguments', [$binding['arguments']]);
126 25
            $definition->addMethodCall('setDestination', [$binding['destination']]);
127 25
            $definition->addMethodCall('setDestinationIsExchange', [$binding['destination_is_exchange']]);
128 25
            $definition->addMethodCall('setExchange', [$binding['exchange']]);
129 25
            $definition->addMethodCall('isNowait', [$binding['nowait']]);
130 25
            $definition->addMethodCall('setRoutingKey', [$binding['routing_key']]);
131 25
            $this->injectConnection($definition, $binding['connection']);
132 25
            $key = md5(json_encode($binding));
133 25
            if ($this->collectorEnabled) {
134
                // in the context of a binding, I don't thing logged channels are needed?
135 25
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
136
            }
137 25
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
138
        }
139 34
    }
140
141 34
    protected function loadProducers()
142
    {
143 34
        if ($this->config['sandbox'] == false) {
144 34
            foreach ($this->config['producers'] as $key => $producer) {
145 30
                $definition = new Definition($producer['class']);
146 30
                $definition->setPublic(true);
147 30
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
148 30
                $definition->addTag('old_sound_rabbit_mq.producer');
149
                //this producer doesn't define an exchange -> using AMQP Default
150 30
                if (!isset($producer['exchange_options'])) {
151 1
                    $producer['exchange_options'] = $this->getDefaultExchangeOptions();
152
                }
153 30
                $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($producer['exchange_options'])]);
154
                //this producer doesn't define a queue -> using AMQP Default
155 30
                if (!isset($producer['queue_options'])) {
156 30
                    $producer['queue_options'] = $this->getDefaultQueueOptions();
157
                }
158 30
                $definition->addMethodCall('setQueueOptions', [$producer['queue_options']]);
159 30
                $this->injectConnection($definition, $producer['connection']);
160 30
                if ($this->collectorEnabled) {
161 28
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
162
                }
163 30
                if (!$producer['auto_setup_fabric']) {
164
                    $definition->addMethodCall('disableAutoSetupFabric');
165
                }
166
167 30
                if ($producer['enable_logger']) {
168
                    $this->injectLogger($definition);
169
                }
170
171 30
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
172
173 30
                $this->container->setDefinition($producerServiceName, $definition);
174 30
                if (null !== $producer['service_alias']) {
175 25
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
176
                }
177
178
                // register alias for argument auto wiring
179 30
                if (method_exists($this->container, 'registerAliasForArgument')) {
180 30
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
181 30
                    $this->container
182 30
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
183 30
                        ->setPublic(false);
184
185 30
                    $this->container
186 30
                        ->registerAliasForArgument($producerServiceName, $producer['class'], $argName)
187 30
                        ->setPublic(false);
188
                }
189
190 30
                $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
191 30
                $definition->addMethodCall('setContentType', [$producer['default_content_type']]);
192 30
                $definition->addMethodCall('setDeliveryMode', [$producer['default_delivery_mode']]);
193
            }
194
        } else {
195
            foreach ($this->config['producers'] as $key => $producer) {
196
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
197
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
198
                $this->container->setDefinition($producerServiceName, $definition);
199
200
                // register alias for argumen auto wiring
201
                if (method_exists($this->container, 'registerAliasForArgument')) {
202
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
203
                    $this->container
204
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
205
                        ->setPublic(false);
206
                }
207
            }
208
        }
209 34
    }
210
211 34
    protected function loadConsumers()
212
    {
213 34
        foreach ($this->config['consumers'] as $key => $consumer) {
214 29
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
215 29
            $definition->setPublic(true);
216 29
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
217 29
            $definition->addTag('old_sound_rabbit_mq.consumer');
218
            //this consumer doesn't define an exchange -> using AMQP Default
219 29
            if (!isset($consumer['exchange_options'])) {
220
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
221
            }
222 29
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])]);
223
            //this consumer doesn't define a queue -> using AMQP Default
224 29
            if (!isset($consumer['queue_options'])) {
225
                $consumer['queue_options'] = $this->getDefaultQueueOptions();
226
            }
227 29
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($consumer['queue_options'])]);
228 29
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);
229
230 29
            if (array_key_exists('qos_options', $consumer)) {
231 25
                $definition->addMethodCall('setQosOptions', [
232 25
                    $consumer['qos_options']['prefetch_size'],
233 25
                    $consumer['qos_options']['prefetch_count'],
234 25
                    $consumer['qos_options']['global'],
235
                ]);
236
            }
237
238 29
            if (isset($consumer['idle_timeout'])) {
239
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
240
            }
241 29
            if (isset($consumer['idle_timeout_exit_code'])) {
242
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
243
            }
244 29
            if (isset($consumer['timeout_wait'])) {
245 25
                $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
246
            }
247 29
            if (isset($consumer['graceful_max_execution'])) {
248
                $definition->addMethodCall(
249
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
250
                    [$consumer['graceful_max_execution']['timeout']]
251
                );
252
                $definition->addMethodCall(
253
                    'setGracefulMaxExecutionTimeoutExitCode',
254
                    [$consumer['graceful_max_execution']['exit_code']]
255
                );
256
            }
257 29
            if (!$consumer['auto_setup_fabric']) {
258
                $definition->addMethodCall('disableAutoSetupFabric');
259
            }
260
            if (isset($consumer['options'])) {
261 29
                $definition->addMethodCall(
262 29
                    'setConsumerOptions',
263 28
                    [$this->normalizeArgumentKeys($consumer['options'])]
264
                );
265
            }
266 29
267 1
            $this->injectConnection($definition, $consumer['connection']);
268
            if ($this->collectorEnabled) {
269
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
270 29
            }
271 29
272 29
            if ($consumer['enable_logger']) {
273
                $this->injectLogger($definition);
274
            }
275 29
276 29
            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
277 29
            $this->container->setDefinition($name, $definition);
278 29
            $this->addDequeuerAwareCall($consumer['callback'], $name);
279 29
280
            // register alias for argument auto wiring
281 29
            if (method_exists($this->container, 'registerAliasForArgument')) {
282 29
                $argName = !str_ends_with(strtolower($key), 'consumer') ? sprintf('%sConsumer', $key) : $key;
283 29
                $this->container
284
                    ->registerAliasForArgument($name, ConsumerInterface::class, $argName)
285
                    ->setPublic(false);
286 34
287
                $this->container
288 34
                    ->registerAliasForArgument($name, '%old_sound_rabbit_mq.consumer.class%', $argName)
289
                    ->setPublic(false);
290 34
            }
291 25
        }
292 25
    }
293
294 25
    protected function loadMultipleConsumers()
295
    {
296
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
297
            $queues = [];
298
            $callbacks = [];
299
300
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
301 25
                throw new InvalidConfigurationException(
302 25
                    "Error on loading $key multiple consumer. " .
303 25
                    "Either 'queues' or 'queues_provider' parameters should be defined."
304 25
                );
305
            }
306
307 25
            foreach ($consumer['queues'] as $queueName => $queueOptions) {
308
                $queues[$queueOptions['name']] = $queueOptions;
309 25
                $queues[$queueOptions['name']]['callback'] = [new Reference($queueOptions['callback']), 'execute'];
310 25
                $callbacks[] = $queueOptions['callback'];
311 25
            }
312 25
313 25
            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
314
            $definition
315 25
                ->setPublic(true)
316 25
                ->addTag('old_sound_rabbit_mq.base_amqp')
317 25
                ->addTag('old_sound_rabbit_mq.multi_consumer')
318 25
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
319
                ->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);
320
321
            if ($consumer['queues_provider']) {
322 25
                $definition->addMethodCall(
323
                    'setQueuesProvider',
324
                    [new Reference($consumer['queues_provider'])]
325
                );
326
            }
327
328
            if (array_key_exists('qos_options', $consumer)) {
329
                $definition->addMethodCall('setQosOptions', [
330 25
                    $consumer['qos_options']['prefetch_size'],
331
                    $consumer['qos_options']['prefetch_count'],
332
                    $consumer['qos_options']['global'],
333 25
                ]);
334
            }
335
336 25
            if (isset($consumer['idle_timeout'])) {
337 25
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
338
            }
339 25
            if (isset($consumer['idle_timeout_exit_code'])) {
340
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
341
            }
342
            if (isset($consumer['timeout_wait'])) {
343
                $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
344
            }
345
            if (isset($consumer['graceful_max_execution'])) {
346
                $definition->addMethodCall(
347
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
348
                    [$consumer['graceful_max_execution']['timeout']]
349 25
                );
350
                $definition->addMethodCall(
351
                    'setGracefulMaxExecutionTimeoutExitCode',
352
                    [$consumer['graceful_max_execution']['exit_code']]
353 25
                );
354 25
            }
355 25
            if (!$consumer['auto_setup_fabric']) {
356
                $definition->addMethodCall('disableAutoSetupFabric');
357
            }
358 25
            if (isset($consumer['options'])) {
359
                $definition->addMethodCall(
360
                    'setConsumerOptions',
361
                    [$this->normalizeArgumentKeys($consumer['options'])]
362 25
                );
363 25
            }
364 25
365 25
            $this->injectConnection($definition, $consumer['connection']);
366
            if ($this->collectorEnabled) {
367 25
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
368 25
            }
369
370
            if ($consumer['enable_logger']) {
371 34
                $this->injectLogger($definition);
372
            }
373 34
374
            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
375 34
            $this->container->setDefinition($name, $definition);
376 25
            if ($consumer['queues_provider']) {
377
                $this->addDequeuerAwareCall($consumer['queues_provider'], $name);
378
            }
379
            foreach ($callbacks as $callback) {
380
                $this->addDequeuerAwareCall($callback, $name);
381
            }
382
        }
383 25
    }
384
385 25
    protected function loadDynamicConsumers()
386 25
    {
387 25
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
388 25
            if (empty($consumer['queue_options_provider'])) {
389 25
                throw new InvalidConfigurationException(
390 25
                    "Error on loading $key dynamic consumer. " .
391
                    "'queue_provider' parameter should be defined."
392 25
                );
393
            }
394
395
            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
396
            $definition
397
                ->setPublic(true)
398
                ->addTag('old_sound_rabbit_mq.base_amqp')
399
                ->addTag('old_sound_rabbit_mq.consumer')
400 25
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
401 25
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
402 25
                ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);
403
404
            if (array_key_exists('qos_options', $consumer)) {
405 25
                $definition->addMethodCall('setQosOptions', [
406
                    $consumer['qos_options']['prefetch_size'],
407
                    $consumer['qos_options']['prefetch_count'],
408 25
                    $consumer['qos_options']['global'],
409
                ]);
410
            }
411 25
412
            $definition->addMethodCall(
413
                'setQueueOptionsProvider',
414 25
                [new Reference($consumer['queue_options_provider'])]
415
            );
416
417
            if (isset($consumer['idle_timeout'])) {
418
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
419
            }
420
            if (isset($consumer['idle_timeout_exit_code'])) {
421
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
422
            }
423
            if (isset($consumer['timeout_wait'])) {
424 25
                $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
425
            }
426
            if (isset($consumer['graceful_max_execution'])) {
427
                $definition->addMethodCall(
428 25
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
429 25
                    [$consumer['graceful_max_execution']['timeout']]
430 25
                );
431
                $definition->addMethodCall(
432
                    'setGracefulMaxExecutionTimeoutExitCode',
433 25
                    [$consumer['graceful_max_execution']['exit_code']]
434
                );
435
            }
436
            if (!$consumer['auto_setup_fabric']) {
437 25
                $definition->addMethodCall('disableAutoSetupFabric');
438 25
            }
439 25
            if (isset($consumer['options'])) {
440 25
                $definition->addMethodCall(
441
                    'setConsumerOptions',
442 34
                    [$this->normalizeArgumentKeys($consumer['options'])]
443
                );
444 34
            }
445
446 34
            $this->injectConnection($definition, $consumer['connection']);
447
            if ($this->collectorEnabled) {
448
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
449
            }
450
451
            if ($consumer['enable_logger']) {
452
                $this->injectLogger($definition);
453
            }
454
455
            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
456
            $this->container->setDefinition($name, $definition);
457
            $this->addDequeuerAwareCall($consumer['callback'], $name);
458
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
459
        }
460
    }
461
462
    protected function loadBatchConsumers()
463
    {
464
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
465
            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
466
467
            if (!isset($consumer['exchange_options'])) {
468
                $consumer['exchange_options'] = $this->getDefaultExchangeOptions();
469
            }
470
471
            $definition
472
                ->setPublic(true)
473
                ->addTag('old_sound_rabbit_mq.base_amqp')
474
                ->addTag('old_sound_rabbit_mq.batch_consumer')
475
                ->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']])
476
                ->addMethodCall('setPrefetchCount', [$consumer['qos_options']['prefetch_count']])
477
                ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']])
478
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
479
                ->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($consumer['queue_options'])])
480
                ->addMethodCall('setQosOptions', [
481
                    $consumer['qos_options']['prefetch_size'],
482
                    $consumer['qos_options']['prefetch_count'],
483
                    $consumer['qos_options']['global'],
484
                ])
485
            ;
486
487
            if (isset($consumer['idle_timeout_exit_code'])) {
488
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
489
            }
490
491
            if (isset($consumer['idle_timeout'])) {
492
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
493
            }
494
495
            if (isset($consumer['graceful_max_execution'])) {
496
                $definition->addMethodCall(
497
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
498
                    [$consumer['graceful_max_execution']['timeout']]
499
                );
500
            }
501
502
            if (!$consumer['auto_setup_fabric']) {
503 34
                $definition->addMethodCall('disableAutoSetupFabric');
504
            }
505 34
506
            if (isset($consumer['options'])) {
507 34
                $definition->addMethodCall(
508 25
                    'setConsumerOptions',
509
                    [$this->normalizeArgumentKeys($consumer['options'])]
510 25
                );
511 25
            }
512 25
513 25
            if ($consumer['keep_alive']) {
514 25
                $definition->addMethodCall('keepAlive');
515 25
            }
516 25
517 25
            $this->injectConnection($definition, $consumer['connection']);
518
            if ($this->collectorEnabled) {
519
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
520 25
            }
521 25
522 25
            if ($consumer['enable_logger']) {
523
                $this->injectLogger($definition);
524 34
            }
525
526
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
527
        }
528
    }
529
530
    protected function loadAnonConsumers()
531
    {
532
        foreach ($this->config['anon_consumers'] as $key => $anon) {
533
            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
534 30
            $definition
535
                ->setPublic(true)
536 30
                ->addTag('old_sound_rabbit_mq.base_amqp')
537 1
                ->addTag('old_sound_rabbit_mq.anon_consumer')
538
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($anon['exchange_options'])])
539 1
                ->addMethodCall('setCallback', [[new Reference($anon['callback']), 'execute']]);
540
541
            if (isset($anon['options'])) {
542
                $definition->addMethodCall(
543 1
                    'setConsumerOptions',
544 1
                    [$this->normalizeArgumentKeys($anon['options'])]
545 1
                );
546
            }
547
548 1
            $this->injectConnection($definition, $anon['connection']);
549
            if ($this->collectorEnabled) {
550 1
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
551
            }
552 30
553
            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
554
            $this->container->setDefinition($name, $definition);
555
            $this->addDequeuerAwareCall($anon['callback'], $name);
556
        }
557
    }
558
559
    /**
560
     * Symfony 2 converts '-' to '_' when defined in the configuration. This leads to problems when using x-ha-policy
561
     * parameter. So we revert the change for right configurations.
562
     *
563
     * @param array $config
564
     *
565
     * @return array
566
     */
567
    private function normalizeArgumentKeys(array $config): array
568
    {
569
        if (isset($config['arguments'])) {
570
            $arguments = $config['arguments'];
571
            // support for old configuration
572
            if (is_string($arguments)) {
573
                $arguments = $this->argumentsStringAsArray($arguments);
0 ignored issues
show
Deprecated Code introduced by
The function OldSound\RabbitMqBundle\...rgumentsStringAsArray() has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

573
                $arguments = /** @scrutinizer ignore-deprecated */ $this->argumentsStringAsArray($arguments);
Loading history...
574
            }
575
576
            $newArguments = [];
577
            foreach ($arguments as $key => $value) {
578
                if (strstr($key, '_')) {
579 34
                    $key = str_replace('_', '-', $key);
580
                }
581 34
                $newArguments[$key] = $value;
582 28
            }
583 28
            $config['arguments'] = $newArguments;
584
        }
585 28
        return $config;
586 28
    }
587 28
588 28
    /**
589 28
     * Support for arguments provided as string. Support for old configuration files.
590
     *
591 28
     * @deprecated
592 28
     * @param string $arguments
593
     * @return array
594 28
     */
595 28
    private function argumentsStringAsArray($arguments): array
596
    {
597 28
        $argumentsArray = [];
598
599 28
        $argumentPairs = explode(',', $arguments);
600
        foreach ($argumentPairs as $argument) {
601 34
            $argumentPair = explode(':', $argument);
602
            $type = 'S';
603 34
            if (isset($argumentPair[2])) {
604
                $type = $argumentPair[2];
605 34
            }
606 25
            $argumentsArray[$argumentPair[0]] = [$type, $argumentPair[1]];
607
        }
608 25
609 25
        return $argumentsArray;
610 25
    }
611 25
612 25
    protected function loadRpcClients()
613 25
    {
614 25
        foreach ($this->config['rpc_clients'] as $key => $client) {
615 25
            $definition = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
616
            $definition->setLazy($client['lazy']);
617 25
            $definition
618
                ->addTag('old_sound_rabbit_mq.rpc_client')
619
                ->addMethodCall('initClient', [$client['expect_serialized_response']]);
620
            $this->injectConnection($definition, $client['connection']);
621
            if ($this->collectorEnabled) {
622
                $this->injectLoggedChannel($definition, $key, $client['connection']);
623
            }
624 25
            if (array_key_exists('unserializer', $client)) {
625 25
                $definition->addMethodCall('setUnserializer', [$client['unserializer']]);
626
            }
627 25
            if (array_key_exists('direct_reply_to', $client)) {
628 25
                $definition->addMethodCall('setDirectReplyTo', [$client['direct_reply_to']]);
629
            }
630 25
            $definition->setPublic(true);
631 25
632
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
633 25
        }
634
    }
635 34
636
    protected function loadRpcServers()
637 31
    {
638
        foreach ($this->config['rpc_servers'] as $key => $server) {
639 31
            $definition = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
640 31
            $definition
641
                ->setPublic(true)
642 31
                ->addTag('old_sound_rabbit_mq.base_amqp')
643 31
                ->addTag('old_sound_rabbit_mq.rpc_server')
644 31
                ->addMethodCall('initServer', [$key])
645
                ->addMethodCall('setCallback', [[new Reference($server['callback']), 'execute']]);
646 31
            $this->injectConnection($definition, $server['connection']);
647
            if ($this->collectorEnabled) {
648 31
                $this->injectLoggedChannel($definition, $key, $server['connection']);
649 31
            }
650 31
            if (array_key_exists('qos_options', $server)) {
651
                $definition->addMethodCall('setQosOptions', [
652 33
                    $server['qos_options']['prefetch_size'],
653
                    $server['qos_options']['prefetch_count'],
654 33
                    $server['qos_options']['global'],
655 33
                ]);
656
            }
657 34
            if (array_key_exists('exchange_options', $server)) {
658
                $definition->addMethodCall('setExchangeOptions', [$server['exchange_options']]);
659 34
            }
660
            if (array_key_exists('queue_options', $server)) {
661
                $definition->addMethodCall('setQueueOptions', [$server['queue_options']]);
662
            }
663
            if (array_key_exists('serializer', $server)) {
664
                $definition->addMethodCall('setSerializer', [$server['serializer']]);
665
            }
666
            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
667
        }
668
    }
669 29
670
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
671 29
    {
672 29
        $id = sprintf('old_sound_rabbit_mq.channel.%s', $name);
673
        $channel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
674
        $channel
675
            ->setPublic(false)
676
            ->addTag('old_sound_rabbit_mq.logged_channel');
677
        $this->injectConnection($channel, $connectionName);
678
679
        $this->container->setDefinition($id, $channel);
680
681
        $this->channelIds[] = $id;
682 1
        $definition->addArgument(new Reference($id));
683
    }
684 1
685 1
    protected function injectConnection(Definition $definition, $connectionName)
686
    {
687 1
        $definition->addArgument(new Reference(sprintf('old_sound_rabbit_mq.connection.%s', $connectionName)));
688 1
    }
689
690
    public function getAlias(): string
691
    {
692
        return 'old_sound_rabbit_mq';
693
    }
694
695 1
    /**
696
     * Add proper dequeuer aware call
697
     *
698 1
     * @param string $callback
699
     * @param string $name
700
     * @throws \ReflectionException
701
     */
702
    protected function addDequeuerAwareCall($callback, $name)
703
    {
704
        if (!$this->container->has($callback)) {
705
            return;
706
        }
707
708
        $callbackDefinition = $this->container->findDefinition($callback);
709
        $refClass = new \ReflectionClass($callbackDefinition->getClass());
710 30
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
711
            $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]);
712
        }
713 30
    }
714
715
    private function injectLogger(Definition $definition)
716
    {
717
        $definition->addTag('monolog.logger', [
718
            'channel' => 'phpamqplib',
719
        ]);
720
        $definition->addMethodCall('setLogger', [new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)]);
721
    }
722
723
    /**
724
     * Get default AMQP exchange options
725
     *
726
     * @return array
727
     */
728
    protected function getDefaultExchangeOptions(): array
729
    {
730
        return [
731
            'name' => '',
732
            'type' => 'direct',
733
            'passive' => true,
734
            'declare' => false,
735
        ];
736
    }
737
738
    /**
739
     * Get default AMQP queue options
740
     *
741
     * @return array
742
     */
743
    protected function getDefaultQueueOptions(): array
744
    {
745
        return [
746
            'name' => '',
747
            'declare' => false,
748
        ];
749
    }
750
}
751