Passed
Pull Request — master (#662)
by
unknown
13:11
created

OldSoundRabbitMqExtension::loadProducers()   F

Complexity

Conditions 16
Paths 773

Size

Total Lines 75
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 18.8966

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 16
eloc 52
c 2
b 0
f 0
nc 773
nop 0
dl 0
loc 75
ccs 38
cts 49
cp 0.7755
crap 18.8966
rs 1.7152

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\ConfigurationInterface;
8
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
9
use Symfony\Component\Config\FileLocator;
10
use Symfony\Component\DependencyInjection\ContainerBuilder;
11
use Symfony\Component\DependencyInjection\ContainerInterface;
12
use Symfony\Component\DependencyInjection\Definition;
13
use Symfony\Component\DependencyInjection\Extension\Extension;
14
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
15
use Symfony\Component\DependencyInjection\Reference;
16
17
/**
18
 * OldSoundRabbitMqExtension.
19
 *
20
 * @author Alvaro Videla
21
 * @author Marc Weistroff <[email protected]>
22
 */
23
class OldSoundRabbitMqExtension extends Extension
24
{
25
    /**
26
     * @var ContainerBuilder
27
     */
28
    private $container;
29
30
    /**
31
     * @var bool Whether the data collector is enabled
32
     */
33
    private $collectorEnabled;
34
35
    private $channelIds = [];
36
37
    private $config = [];
38
39 34
    /**
     * Loads the bundle configuration and registers the RabbitMQ service definitions.
     *
     * Loads rabbitmq.xml, processes the given configuration, runs every load*()
     * step and finally either hands the collected channel references to the data
     * collector definition or removes that definition.
     *
     * @param array            $configs   Configuration arrays to process
     * @param ContainerBuilder $container Container the definitions are registered in
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $this->container = $container;
        // Start from a clean slate so a second load() on the same instance does
        // not reuse channel ids collected for a previous container.
        $this->channelIds = [];

        $loader = new XmlFileLoader($this->container, new FileLocator([__DIR__ . '/../Resources/config']));
        $loader->load('rabbitmq.xml');

        $configuration = $this->getConfiguration($configs, $this->container);
        $this->config = $this->processConfiguration($configuration, $configs);

        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Explicit emptiness check instead of an implicit array-to-bool conversion.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        $channels = [];
        foreach (array_unique($this->channelIds) as $id) {
            $channels[] = new Reference($id);
        }

        $this->container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channels);
    }
74
75 34
    /**
     * Builds the bundle's Configuration object, passing the extension alias to its constructor.
     *
     * @param array            $config    Not used by this implementation
     * @param ContainerBuilder $container Not used by this implementation
     */
    public function getConfiguration(array $config, ContainerBuilder $container): ?ConfigurationInterface
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
79
80 34
    /**
     * Registers two services for every configured connection: a private
     * connection factory and a public, tagged connection created through the
     * factory's createConnection() method.
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            // The class parameter depends on the socket and lazy flags.
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.'.$connectionSuffix.'%'
                : '%old_sound_rabbit_mq.'.$connectionSuffix.'%';

            $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [
                $classParam,
                $connection,
            ]);
            if (isset($connection['connection_parameters_provider'])) {
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);

            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            // setFactory() replaces the removed setFactoryService()/setFactoryMethod() pair.
            $connectionDefinition = new Definition($classParam);
            $connectionDefinition->setFactory([new Reference($factoryName), 'createConnection']);
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
115
116 34
    /**
     * Registers one tagged binding service per entry of the 'bindings'
     * configuration. Nothing is registered when the 'sandbox' option is set.
     *
     * The service id suffix is an md5 hash of the key-sorted, JSON-encoded binding.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        foreach ($this->config['bindings'] as $binding) {
            // Sort by key so the hash does not depend on the order of the options.
            ksort($binding);
            $key = md5(json_encode($binding));

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            $definition->addMethodCall('setArguments', [$binding['arguments']]);
            $definition->addMethodCall('setDestination', [$binding['destination']]);
            $definition->addMethodCall('setDestinationIsExchange', [$binding['destination_is_exchange']]);
            $definition->addMethodCall('setExchange', [$binding['exchange']]);
            // NOTE(review): this used to call the getter-named 'isNowait' with an
            // argument; 'setNowait' is assumed to be the matching setter on the
            // binding class - confirm against the configured class.
            $definition->addMethodCall('setNowait', [$binding['nowait']]);
            $definition->addMethodCall('setRoutingKey', [$binding['routing_key']]);

            $this->injectConnection($definition, $binding['connection']);
            if ($this->collectorEnabled) {
                // In the context of a binding, logged channels may not be needed.
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
            }

            $this->container->setDefinition(sprintf('old_sound_rabbit_mq.binding.%s', $key), $definition);
        }
    }
140
141 34
    /**
     * Registers one service per entry of the 'producers' configuration.
     *
     * When the 'sandbox' option is set, each producer is registered with the
     * fallback class only; otherwise a fully configured definition is built.
     * In both cases argument aliases are registered when the container supports them.
     */
    protected function loadProducers()
    {
        $sandbox = (bool) $this->config['sandbox'];

        foreach ($this->config['producers'] as $key => $producer) {
            $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

            if ($sandbox) {
                $this->container->setDefinition(
                    $producerServiceName,
                    new Definition('%old_sound_rabbit_mq.fallback.class%')
                );
                $this->registerProducerArgumentAliases($producerServiceName, $key, [ProducerInterface::class]);

                continue;
            }

            $this->container->setDefinition(
                $producerServiceName,
                $this->buildProducerDefinition($key, $producer)
            );
            if (null !== $producer['service_alias']) {
                $this->container->setAlias($producer['service_alias'], $producerServiceName);
            }
            $this->registerProducerArgumentAliases(
                $producerServiceName,
                $key,
                [ProducerInterface::class, $producer['class']]
            );
        }
    }

    /**
     * Builds the service definition of a single (non-sandbox) producer.
     *
     * @param int|string $key      Producer name as used in the configuration
     * @param array      $producer Configuration of that producer
     */
    private function buildProducerDefinition($key, array $producer): Definition
    {
        $definition = new Definition($producer['class']);
        $definition->setPublic(true);
        $definition->addTag('old_sound_rabbit_mq.base_amqp');
        $definition->addTag('old_sound_rabbit_mq.producer');

        // This producer doesn't define an exchange -> using the default exchange options.
        $exchangeOptions = $producer['exchange_options'] ?? $this->getDefaultExchangeOptions();
        $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

        // This producer doesn't define a queue -> using the default queue options.
        $queueOptions = $producer['queue_options'] ?? $this->getDefaultQueueOptions();
        $definition->addMethodCall('setQueueOptions', [$queueOptions]);

        $this->injectConnection($definition, $producer['connection']);
        if ($this->collectorEnabled) {
            $this->injectLoggedChannel($definition, $key, $producer['connection']);
        }

        if (!$producer['auto_setup_fabric']) {
            $definition->addMethodCall('disableAutoSetupFabric');
        }

        if (isset($producer['confirm_select'])) {
            // A null argument is appended before the confirm-select flag.
            $definition->addArgument(null);
            $definition->addArgument((bool) $producer['confirm_select']);
        }

        $confirmationTimeout = isset($producer['confirm_timeout']) ? (int) $producer['confirm_timeout'] : 0;
        $definition->addMethodCall('setConfirmationTimeout', [$confirmationTimeout]);

        if ($producer['enable_logger']) {
            $this->injectLogger($definition);
        }

        $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
        $definition->addMethodCall('setContentType', [$producer['default_content_type']]);
        $definition->addMethodCall('setDeliveryMode', [$producer['default_delivery_mode']]);

        return $definition;
    }

    /**
     * Registers private argument aliases for a producer service, one per given
     * type, when the container offers registerAliasForArgument().
     *
     * @param string     $serviceName Id of the producer service
     * @param int|string $key         Producer name; 'Producer' is appended unless it already ends with it
     * @param string[]   $types       Types to register an alias for
     */
    private function registerProducerArgumentAliases(string $serviceName, $key, array $types): void
    {
        if (!method_exists($this->container, 'registerAliasForArgument')) {
            return;
        }

        $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
        foreach ($types as $type) {
            $this->container
                ->registerAliasForArgument($serviceName, $type, $argName)
                ->setPublic(false);
        }
    }
220
221
    /**
     * Builds and registers a public consumer service for every entry of the
     * 'consumers' configuration, plus argument aliases when the container
     * supports them.
     */
    protected function loadConsumers()
    {
        // Optional settings forwarded to a setter only when they are set.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = (new Definition('%old_sound_rabbit_mq.consumer.class%'))
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer');

            // No exchange configured -> default exchange options.
            $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // No queue configured -> default queue options.
            $queueOptions = $consumer['queue_options'] ?? $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall('setGracefulMaxExecutionTimeoutExitCode', [$graceful['exit_code']]);
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $serviceId);

            // Register aliases for argument autowiring when the container supports it.
            if (!method_exists($this->container, 'registerAliasForArgument')) {
                continue;
            }

            $argName = str_ends_with(strtolower($key), 'consumer') ? $key : sprintf('%sConsumer', $key);
            // NOTE(review): the second type is an unresolved parameter placeholder - confirm this is intended.
            foreach ([ConsumerInterface::class, '%old_sound_rabbit_mq.consumer.class%'] as $type) {
                $this->container
                    ->registerAliasForArgument($serviceId, $type, $argName)
                    ->setPublic(false);
            }
        }
    }
297
298
    /**
     * Builds and registers a public multi-consumer service for every entry of
     * the 'multiple_consumers' configuration.
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is set
     */
    protected function loadMultipleConsumers()
    {
        // Optional settings forwarded to a setter only when they are set.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            if (empty($consumer['queues']) && empty($consumer['queues_provider'])) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            // Index the queues by name and turn each callback id into a callable reference.
            $queues = [];
            $callbacks = [];
            foreach ($consumer['queues'] as $queueOptions) {
                $queues[$queueOptions['name']] = array_replace(
                    $queueOptions,
                    ['callback' => [new Reference($queueOptions['callback']), 'execute']]
                );
                $callbacks[] = $queueOptions['callback'];
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.multi_consumer');
            $definition->addMethodCall(
                'setExchangeOptions',
                [$this->normalizeArgumentKeys($consumer['exchange_options'])]
            );
            $definition->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            if ($consumer['queues_provider']) {
                $definition->addMethodCall('setQueuesProvider', [new Reference($consumer['queues_provider'])]);
            }

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall(
                    'setQosOptions',
                    [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
                );
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall('setGracefulMaxExecutionTimeoutExitCode', [$graceful['exit_code']]);
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($serviceId, $definition);

            if ($consumer['queues_provider']) {
                $this->addDequeuerAwareCall($consumer['queues_provider'], $serviceId);
            }
            foreach ($callbacks as $callbackId) {
                $this->addDequeuerAwareCall($callbackId, $serviceId);
            }
        }
    }
382
383 25
    /**
     * Builds and registers a public dynamic consumer service for every entry of
     * the 'dynamic_consumers' configuration.
     *
     * @throws InvalidConfigurationException When 'queue_options_provider' is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
                ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', [
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ]);
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                [new Reference($consumer['queue_options_provider'])]
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$consumer['graceful_max_execution']['exit_code']]
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
453
454
    /**
     * Builds and registers a public batch consumer service for every entry of
     * the 'batch_consumers' configuration.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            // No exchange configured -> default exchange options.
            $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];

            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');
            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'batchExecute']]);
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
            $definition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $definition->addMethodCall(
                'setQosOptions',
                [$qos['prefetch_size'], $qos['prefetch_count'], $qos['global']]
            );

            // Optional settings, forwarded only when they are set.
            $optionalSetters = [
                'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
                'idle_timeout' => 'setIdleTimeout',
            ];
            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            // Only the timeout of the graceful max execution settings is forwarded here.
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
514 25
515 25
    /**
     * Builds and registers a public anonymous consumer service for every entry
     * of the 'anon_consumers' configuration.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = [new Reference($anon['callback']), 'execute'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);
            $definition->addMethodCall('setCallback', [$callback]);

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($serviceId, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $serviceId);
        }
    }
535
536 30
    /**
     * Replaces every '_' with '-' in the keys of $config['arguments'].
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks
     * arguments such as x-ha-policy; this reverts that conversion. Arguments
     * given as a string (old configuration format) are first converted to an array.
     *
     * @param array $config Options that may contain an 'arguments' entry
     *
     * @return array The options with normalized argument keys; returned as-is when 'arguments' is not set
     */
    private function normalizeArgumentKeys(array $config): array
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $rawArguments = $config['arguments'];
        if (is_string($rawArguments)) {
            // Old configuration format; the deprecated parser is used on purpose.
            $rawArguments = $this->argumentsStringAsArray($rawArguments);
        }

        $normalized = [];
        foreach ($rawArguments as $name => $value) {
            // Keys without '_' come out of str_replace() unchanged.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
564
565
    /**
     * Converts arguments provided as a string into an array. Support for old
     * configuration files.
     *
     * The string is a comma separated list of "name:value" or "name:value:type"
     * items; the type defaults to 'S'. Each item becomes
     * $result[name] = [type, value]. Empty items (empty input, stray commas)
     * are skipped and an item without a value gets null as its value.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments): array
    {
        $argumentsArray = [];

        foreach (explode(',', $arguments) as $argument) {
            // explode() yields an empty item for '' and for trailing or doubled
            // commas; registering an argument with an empty name is never wanted.
            if ('' === trim($argument)) {
                continue;
            }

            $argumentPair = explode(':', $argument);
            // A missing value becomes null instead of reading an undefined offset.
            $value = $argumentPair[1] ?? null;
            $type = $argumentPair[2] ?? 'S';

            $argumentsArray[$argumentPair[0]] = [$type, $value];
        }

        return $argumentsArray;
    }
588 28
589 28
    /**
     * Register one service definition per entry of the 'rpc_clients' configuration.
     *
     * Each definition is stored in the container as "old_sound_rabbit_mq.<key>_rpc".
     */
    protected function loadRpcClients()
    {
        // optional configuration key => setter added to the definition when the key is present
        $optionalSetters = [
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        ];

        foreach ($this->config['rpc_clients'] as $clientName => $clientConfig) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setLazy($clientConfig['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', [$clientConfig['expect_serialized_response']]);

            // definition arguments: the connection first, then the logged channel if collecting
            $this->injectConnection($rpcClient, $clientConfig['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $clientName, $clientConfig['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $clientConfig)) {
                    $rpcClient->addMethodCall($setter, [$clientConfig[$option]]);
                }
            }

            $rpcClient->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $clientName);
            $this->container->setDefinition($serviceId, $rpcClient);
        }
    }
612 25
613 25
    /**
     * Register one service definition per entry of the 'rpc_servers' configuration.
     *
     * Each definition is stored in the container as "old_sound_rabbit_mq.<key>_server" and
     * uses the 'execute' method of the configured callback service as its callback.
     */
    protected function loadRpcServers()
    {
        // optional configuration key => setter added to the definition when the key is present
        $optionalSetters = [
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        ];

        foreach ($this->config['rpc_servers'] as $serverName => $serverConfig) {
            $callback = [new Reference($serverConfig['callback']), 'execute'];

            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', [$serverName]);
            $rpcServer->addMethodCall('setCallback', [$callback]);

            // definition arguments: the connection first, then the logged channel if collecting
            $this->injectConnection($rpcServer, $serverConfig['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $serverName, $serverConfig['connection']);
            }

            if (array_key_exists('qos_options', $serverConfig)) {
                $qos = $serverConfig['qos_options'];
                $rpcServer->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $serverConfig)) {
                    $rpcServer->addMethodCall($setter, [$serverConfig[$option]]);
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $serverName);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
646 31
647
    /**
     * Register a private logged-channel definition and append it to $definition's arguments.
     *
     * The channel is stored as "old_sound_rabbit_mq.channel.<name>", receives the given
     * connection as its argument, and its id is recorded in $this->channelIds.
     *
     * @param Definition $definition     definition that receives the channel reference
     * @param string     $name           suffix of the channel service id
     * @param string     $connectionName name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
661
662
    /**
     * Append a reference to the named connection service to $definition's arguments.
     *
     * @param Definition $definition
     * @param string     $connectionName suffix of the "old_sound_rabbit_mq.connection.<name>" id
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        $definition->addArgument(new Reference($connectionId));
    }
666
667
    /**
     * Alias of this extension.
     *
     * @return string always 'old_sound_rabbit_mq'
     */
    public function getAlias(): string
    {
        return 'old_sound_rabbit_mq';
    }
671 29
672 29
    /**
     * Add a 'setDequeuer' call to the callback service when its class is dequeuer aware.
     *
     * Nothing happens when the container does not know the callback service, or when the
     * class of its definition does not implement DequeuerAwareInterface.
     *
     * @param string $callback id of the callback service
     * @param string $name     id of the service referenced in the 'setDequeuer' call
     * @throws \ReflectionException
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if ($this->container->has($callback)) {
            $callbackService = $this->container->findDefinition($callback);
            $callbackClass = new \ReflectionClass($callbackService->getClass());

            $isDequeuerAware = $callbackClass->implementsInterface(
                'OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface'
            );
            if (!$isDequeuerAware) {
                return;
            }

            $callbackService->addMethodCall('setDequeuer', [new Reference($name)]);
        }
    }
691
692
    /**
     * Tag the definition for the 'phpamqplib' monolog channel and add a 'setLogger' call.
     *
     * The 'logger' reference uses IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', ['channel' => 'phpamqplib']);
        $definition->addMethodCall('setLogger', [$loggerReference]);
    }
699
700
    /**
     * Default AMQP exchange options.
     *
     * @return array the keys 'name', 'type', 'passive' and 'declare', in that order
     */
    protected function getDefaultExchangeOptions(): array
    {
        $defaults = [
            'name' => '',
            'type' => 'direct',
            'passive' => true,
            'declare' => false,
        ];

        return $defaults;
    }
714
715
    /**
     * Default AMQP queue options.
     *
     * @return array the keys 'name' and 'declare', in that order
     */
    protected function getDefaultQueueOptions(): array
    {
        $defaults = [
            'name' => '',
            'declare' => false,
        ];

        return $defaults;
    }
727
}
728