OldSoundRabbitMqExtension::injectLoggedChannel()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1.2441

Importance

Changes 0
Metric Value
cc 1
eloc 9
c 0
b 0
f 0
nc 1
nop 3
dl 0
loc 13
rs 9.9666
ccs 3
cts 8
cp 0.375
crap 1.2441
1
<?php
2
3
namespace OldSound\RabbitMqBundle\DependencyInjection;
4
5
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
7
use Symfony\Component\Config\Definition\ConfigurationInterface;
8
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
9
use Symfony\Component\Config\FileLocator;
10
use Symfony\Component\DependencyInjection\ContainerBuilder;
11
use Symfony\Component\DependencyInjection\ContainerInterface;
12
use Symfony\Component\DependencyInjection\Definition;
13
use Symfony\Component\DependencyInjection\Extension\Extension;
14
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
15
use Symfony\Component\DependencyInjection\Reference;
16
17
/**
18
 * OldSoundRabbitMqExtension.
19
 *
20
 * @author Alvaro Videla
21
 * @author Marc Weistroff <[email protected]>
22
 */
23
class OldSoundRabbitMqExtension extends Extension
24
{
25
    /**
26
     * @var ContainerBuilder
27
     */
28
    private $container;
29
30
    /**
31
     * @var Boolean Whether the data collector is enabled
32
     */
33
    private $collectorEnabled;
34
35
    private $channelIds = [];
36
37
    private $config = [];
38
39 34
    /**
     * Loads the bundle configuration and registers all service definitions.
     *
     * Loads Resources/config/rabbitmq.xml, processes the given configuration and
     * delegates to the individual load*() methods. Finally the data collector
     * definition either receives references to the collected channel ids or is
     * removed from the container.
     *
     * @param array            $configs   Configuration arrays to process
     * @param ContainerBuilder $container Container receiving the definitions
     */
    public function load(array $configs, ContainerBuilder $container): void
    {
        $this->container = $container;

        $xmlLoader = new XmlFileLoader($this->container, new FileLocator([__DIR__ . '/../Resources/config']));
        $xmlLoader->load('rabbitmq.xml');

        $this->config = $this->processConfiguration(
            $this->getConfiguration($configs, $container),
            $configs
        );
        $this->collectorEnabled = $this->config['enable_collector'];

        $this->loadConnections();
        $this->loadBindings();
        $this->loadProducers();
        $this->loadConsumers();
        $this->loadMultipleConsumers();
        $this->loadDynamicConsumers();
        $this->loadBatchConsumers();
        $this->loadAnonConsumers();
        $this->loadRpcClients();
        $this->loadRpcServers();

        // Without the collector, or without any registered channel, the
        // collector service is dropped altogether.
        if (!$this->collectorEnabled || empty($this->channelIds)) {
            $this->container->removeDefinition('old_sound_rabbit_mq.data_collector');

            return;
        }

        // Each distinct channel id is handed to the collector exactly once.
        $channels = array_map(
            static function ($id) {
                return new Reference($id);
            },
            array_values(array_unique($this->channelIds))
        );

        $container
            ->getDefinition('old_sound_rabbit_mq.data_collector')
            ->replaceArgument(0, $channels);
    }
74
75 34
    /**
     * Builds the configuration object for this extension.
     *
     * Neither argument is read; the configuration is created from the
     * extension alias only.
     *
     * @param array            $config    Unused
     * @param ContainerBuilder $container Unused
     *
     * @return ConfigurationInterface|null
     */
    public function getConfiguration(array $config, ContainerBuilder $container): ?ConfigurationInterface
    {
        $alias = $this->getAlias();

        return new Configuration($alias);
    }
79
80 34
    /**
     * Registers a connection factory and a connection service for every configured connection.
     *
     * For each entry of config['connections'] two definitions are created:
     *  - a private "old_sound_rabbit_mq.connection_factory.<key>" service that receives the
     *    connection class parameter and the connection configuration array (plus a
     *    reference to the connection parameters provider when one is configured);
     *  - a public "old_sound_rabbit_mq.connection.<key>" service, tagged
     *    "old_sound_rabbit_mq.connection", created through the factory's createConnection().
     */
    protected function loadConnections()
    {
        foreach ($this->config['connections'] as $key => $connection) {
            $connectionSuffix = $connection['use_socket'] ? 'socket_connection.class' : 'connection.class';
            $classParam = $connection['lazy']
                ? '%old_sound_rabbit_mq.lazy.' . $connectionSuffix . '%'
                : '%old_sound_rabbit_mq.' . $connectionSuffix . '%';

            // The connection array is copied into the definition here, so the
            // provider id (if any) stays part of the parameters given to the factory.
            $factoryDefinition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', [
                $classParam,
                $connection,
            ]);
            if (isset($connection['connection_parameters_provider'])) {
                $factoryDefinition->addArgument(new Reference($connection['connection_parameters_provider']));
            }
            $factoryDefinition->setPublic(false);

            $factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
            $this->container->setDefinition($factoryName, $factoryDefinition);

            // The legacy setFactoryService()/setFactoryMethod() fallback was removed:
            // those methods do not exist on Definition, so the branch could never run.
            $connectionDefinition = new Definition($classParam);
            $connectionDefinition->setFactory([new Reference($factoryName), 'createConnection']);
            $connectionDefinition->addTag('old_sound_rabbit_mq.connection');
            $connectionDefinition->setPublic(true);

            $this->container->setDefinition(
                sprintf('old_sound_rabbit_mq.connection.%s', $key),
                $connectionDefinition
            );
        }
    }
115
116 34
    /**
     * Registers one service definition per configured binding.
     *
     * Nothing is registered in sandbox mode. The service id is derived from an
     * md5 hash of the key-sorted, JSON-encoded binding configuration.
     */
    protected function loadBindings()
    {
        if ($this->config['sandbox']) {
            return;
        }

        // Method name => binding option passed as its only argument (order is preserved).
        // NOTE(review): 'isNowait' reads like a getter while every sibling is a setter —
        // confirm against the binding class whether a setter was intended.
        $methodCalls = [
            'setArguments' => 'arguments',
            'setDestination' => 'destination',
            'setDestinationIsExchange' => 'destination_is_exchange',
            'setExchange' => 'exchange',
            'isNowait' => 'nowait',
            'setRoutingKey' => 'routing_key',
        ];

        foreach ($this->config['bindings'] as $binding) {
            ksort($binding);

            $definition = new Definition($binding['class']);
            $definition->addTag('old_sound_rabbit_mq.binding');
            foreach ($methodCalls as $method => $option) {
                $definition->addMethodCall($method, [$binding[$option]]);
            }
            $this->injectConnection($definition, $binding['connection']);

            $key = md5(json_encode($binding));
            if ($this->collectorEnabled) {
                // in the context of a binding, I don't think logged channels are needed?
                $this->injectLoggedChannel($definition, $key, $binding['connection']);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.binding.%s', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
140
141 34
    /**
     * Registers one "old_sound_rabbit_mq.<key>_producer" service per configured producer.
     *
     * Outside sandbox mode the producer is built from its configured class and fully
     * set up (exchange/queue options, connection, optional logged channel and logger,
     * default routing key, content type and delivery mode). In sandbox mode the
     * fallback class is registered instead and no connection is injected.
     *
     * When the container supports registerAliasForArgument(), autowiring aliases are
     * registered under "<key>Producer" (or "<key>" if it already ends with "producer",
     * case-insensitively).
     */
    protected function loadProducers()
    {
        if (!$this->config['sandbox']) {
            foreach ($this->config['producers'] as $key => $producer) {
                $definition = new Definition($producer['class']);
                $definition->setPublic(true);
                $definition->addTag('old_sound_rabbit_mq.base_amqp');
                $definition->addTag('old_sound_rabbit_mq.producer');

                // this producer doesn't define an exchange -> using AMQP Default
                $exchangeOptions = $producer['exchange_options'] ?? $this->getDefaultExchangeOptions();
                $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

                // this producer doesn't define a queue -> using AMQP Default
                $queueOptions = $producer['queue_options'] ?? $this->getDefaultQueueOptions();
                // Queue arguments are normalized exactly like the consumers' queue options,
                // so both sides describe the same queue with the same argument keys.
                $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

                $this->injectConnection($definition, $producer['connection']);
                if ($this->collectorEnabled) {
                    $this->injectLoggedChannel($definition, $key, $producer['connection']);
                }
                if (!$producer['auto_setup_fabric']) {
                    $definition->addMethodCall('disableAutoSetupFabric');
                }

                if ($producer['enable_logger']) {
                    $this->injectLogger($definition);
                }

                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);

                $this->container->setDefinition($producerServiceName, $definition);
                if (null !== $producer['service_alias']) {
                    $this->container->setAlias($producer['service_alias'], $producerServiceName);
                }

                // register alias for argument auto wiring
                if (method_exists($this->container, 'registerAliasForArgument')) {
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
                    $this->container
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
                        ->setPublic(false);

                    $this->container
                        ->registerAliasForArgument($producerServiceName, $producer['class'], $argName)
                        ->setPublic(false);
                }

                // The definition object is already registered; these calls still apply to it.
                $definition->addMethodCall('setDefaultRoutingKey', [$producer['default_routing_key']]);
                $definition->addMethodCall('setContentType', [$producer['default_content_type']]);
                $definition->addMethodCall('setDeliveryMode', [$producer['default_delivery_mode']]);
            }
        } else {
            foreach ($this->config['producers'] as $key => $producer) {
                $definition = new Definition('%old_sound_rabbit_mq.fallback.class%');
                $producerServiceName = sprintf('old_sound_rabbit_mq.%s_producer', $key);
                $this->container->setDefinition($producerServiceName, $definition);

                // register alias for argument auto wiring
                if (method_exists($this->container, 'registerAliasForArgument')) {
                    $argName = !str_ends_with(strtolower($key), 'producer') ? sprintf('%sProducer', $key) : $key;
                    $this->container
                        ->registerAliasForArgument($producerServiceName, ProducerInterface::class, $argName)
                        ->setPublic(false);
                }
            }
        }
    }
210
211 34
    /**
     * Registers one public "old_sound_rabbit_mq.<key>_consumer" service per configured consumer.
     *
     * Exchange and queue options fall back to the default options when not configured.
     * Optional settings (QoS, idle timeout, wait timeout, graceful max execution,
     * consumer options) only produce method calls when present in the configuration.
     */
    protected function loadConsumers()
    {
        // Option key => setter, applied in this order when the option is set.
        $optionalSetters = [
            'idle_timeout' => 'setIdleTimeout',
            'idle_timeout_exit_code' => 'setIdleTimeoutExitCode',
            'timeout_wait' => 'setTimeoutWait',
        ];

        foreach ($this->config['consumers'] as $key => $consumer) {
            $definition = new Definition('%old_sound_rabbit_mq.consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.consumer');

            // this consumer doesn't define an exchange -> using AMQP Default
            $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);

            // this consumer doesn't define a queue -> using AMQP Default
            $queueOptions = $consumer['queue_options'] ?? $this->getDefaultQueueOptions();
            $definition->addMethodCall('setQueueOptions', [$this->normalizeArgumentKeys($queueOptions)]);

            $definition->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $qos = $consumer['qos_options'];
                $definition->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (isset($consumer[$option])) {
                    $definition->addMethodCall($setter, [$consumer[$option]]);
                }
            }

            if (isset($consumer['graceful_max_execution'])) {
                $graceful = $consumer['graceful_max_execution'];
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$graceful['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$graceful['exit_code']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if (isset($consumer['options'])) {
                $definition->addMethodCall(
                    'setConsumerOptions',
                    [$this->normalizeArgumentKeys($consumer['options'])]
                );
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_consumer', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);

            // register alias for argument auto wiring
            if (!method_exists($this->container, 'registerAliasForArgument')) {
                continue;
            }

            $argName = !str_ends_with(strtolower($key), 'consumer') ? sprintf('%sConsumer', $key) : $key;
            $this->container
                ->registerAliasForArgument($name, ConsumerInterface::class, $argName)
                ->setPublic(false);

            // NOTE(review): the type passed here is the unresolved parameter placeholder,
            // not a class name — confirm this is the intended alias type.
            $this->container
                ->registerAliasForArgument($name, '%old_sound_rabbit_mq.consumer.class%', $argName)
                ->setPublic(false);
        }
    }
293
294 25
    /**
     * Registers one public "old_sound_rabbit_mq.<key>_multiple" service per multiple consumer.
     *
     * Each configured queue is indexed by its name and gets its callback replaced by a
     * [Reference, 'execute'] callable. A queues provider, when configured, is injected
     * through setQueuesProvider().
     *
     * @throws InvalidConfigurationException When neither 'queues' nor 'queues_provider' is configured
     */
    protected function loadMultipleConsumers()
    {
        foreach ($this->config['multiple_consumers'] as $key => $consumer) {
            // Read both options defensively: the guard below already treats them as
            // optional, so later accesses must not assume the keys exist.
            $configuredQueues = $consumer['queues'] ?? [];
            $queuesProvider = $consumer['queues_provider'] ?? null;

            if (empty($configuredQueues) && empty($queuesProvider)) {
                throw new InvalidConfigurationException(
                    "Error on loading $key multiple consumer. " .
                    "Either 'queues' or 'queues_provider' parameters should be defined."
                );
            }

            $queues = [];
            $callbacks = [];
            foreach ($configuredQueues as $queueOptions) {
                $queueName = $queueOptions['name'];
                $queues[$queueName] = $queueOptions;
                $queues[$queueName]['callback'] = [new Reference($queueOptions['callback']), 'execute'];
                $callbacks[] = $queueOptions['callback'];
            }

            $definition = new Definition('%old_sound_rabbit_mq.multi_consumer.class%');
            // NOTE(review): normalizeArgumentKeys() only inspects a top-level 'arguments'
            // key, while $queues is keyed by queue name — confirm per-queue arguments
            // are meant to be passed through unchanged.
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.multi_consumer')
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
                ->addMethodCall('setQueues', [$this->normalizeArgumentKeys($queues)]);

            if ($queuesProvider) {
                $definition->addMethodCall(
                    'setQueuesProvider',
                    [new Reference($queuesProvider)]
                );
            }

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', [
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ]);
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$consumer['graceful_max_execution']['exit_code']]
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if (isset($consumer['options'])) {
                $definition->addMethodCall(
                    'setConsumerOptions',
                    [$this->normalizeArgumentKeys($consumer['options'])]
                );
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_multiple', $key);
            $this->container->setDefinition($name, $definition);
            if ($queuesProvider) {
                $this->addDequeuerAwareCall($queuesProvider, $name);
            }
            foreach ($callbacks as $callback) {
                $this->addDequeuerAwareCall($callback, $name);
            }
        }
    }
384
385 25
    /**
     * Registers one public "old_sound_rabbit_mq.<key>_dynamic" service per dynamic consumer.
     *
     * A dynamic consumer receives its queue options from the service referenced by
     * 'queue_options_provider', which is therefore mandatory.
     *
     * @throws InvalidConfigurationException When 'queue_options_provider' is empty
     */
    protected function loadDynamicConsumers()
    {
        foreach ($this->config['dynamic_consumers'] as $key => $consumer) {
            if (empty($consumer['queue_options_provider'])) {
                // The message names the option that is actually checked.
                throw new InvalidConfigurationException(
                    "Error on loading $key dynamic consumer. " .
                    "'queue_options_provider' parameter should be defined."
                );
            }

            $definition = new Definition('%old_sound_rabbit_mq.dynamic_consumer.class%');
            $definition
                ->setPublic(true)
                ->addTag('old_sound_rabbit_mq.base_amqp')
                ->addTag('old_sound_rabbit_mq.consumer')
                ->addTag('old_sound_rabbit_mq.dynamic_consumer')
                ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($consumer['exchange_options'])])
                ->addMethodCall('setCallback', [[new Reference($consumer['callback']), 'execute']]);

            if (array_key_exists('qos_options', $consumer)) {
                $definition->addMethodCall('setQosOptions', [
                    $consumer['qos_options']['prefetch_size'],
                    $consumer['qos_options']['prefetch_count'],
                    $consumer['qos_options']['global'],
                ]);
            }

            $definition->addMethodCall(
                'setQueueOptionsProvider',
                [new Reference($consumer['queue_options_provider'])]
            );

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }
            if (isset($consumer['timeout_wait'])) {
                $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            }
            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
                $definition->addMethodCall(
                    'setGracefulMaxExecutionTimeoutExitCode',
                    [$consumer['graceful_max_execution']['exit_code']]
                );
            }
            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }
            if (isset($consumer['options'])) {
                $definition->addMethodCall(
                    'setConsumerOptions',
                    [$this->normalizeArgumentKeys($consumer['options'])]
                );
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_dynamic', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($consumer['callback'], $name);
            $this->addDequeuerAwareCall($consumer['queue_options_provider'], $name);
        }
    }
461
462
    /**
     * Registers one public "old_sound_rabbit_mq.<key>_batch" service per batch consumer.
     *
     * Exchange options fall back to the default exchange options; queue options,
     * QoS options and the wait timeout are read from the configuration as-is.
     * The callback is invoked through its batchExecute() method.
     */
    protected function loadBatchConsumers()
    {
        foreach ($this->config['batch_consumers'] as $key => $consumer) {
            $exchangeOptions = $consumer['exchange_options'] ?? $this->getDefaultExchangeOptions();
            $qos = $consumer['qos_options'];
            $callback = [new Reference($consumer['callback']), 'batchExecute'];

            $definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.batch_consumer');

            $definition->addMethodCall('setTimeoutWait', [$consumer['timeout_wait']]);
            $definition->addMethodCall('setPrefetchCount', [$qos['prefetch_count']]);
            $definition->addMethodCall('setCallback', [$callback]);
            $definition->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($exchangeOptions)]);
            $definition->addMethodCall(
                'setQueueOptions',
                [$this->normalizeArgumentKeys($consumer['queue_options'])]
            );
            $definition->addMethodCall('setQosOptions', [
                $qos['prefetch_size'],
                $qos['prefetch_count'],
                $qos['global'],
            ]);

            // Optional settings: each one adds a method call only when configured.
            if (isset($consumer['idle_timeout_exit_code'])) {
                $definition->addMethodCall('setIdleTimeoutExitCode', [$consumer['idle_timeout_exit_code']]);
            }

            if (isset($consumer['idle_timeout'])) {
                $definition->addMethodCall('setIdleTimeout', [$consumer['idle_timeout']]);
            }

            if (isset($consumer['graceful_max_execution'])) {
                $definition->addMethodCall(
                    'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
                    [$consumer['graceful_max_execution']['timeout']]
                );
            }

            if (!$consumer['auto_setup_fabric']) {
                $definition->addMethodCall('disableAutoSetupFabric');
            }

            if (isset($consumer['options'])) {
                $definition->addMethodCall(
                    'setConsumerOptions',
                    [$this->normalizeArgumentKeys($consumer['options'])]
                );
            }

            if ($consumer['keep_alive']) {
                $definition->addMethodCall('keepAlive');
            }

            $this->injectConnection($definition, $consumer['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $consumer['connection']);
            }

            if ($consumer['enable_logger']) {
                $this->injectLogger($definition);
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_batch', $key);
            $this->container->setDefinition($serviceId, $definition);
        }
    }
529
530
    /**
     * Registers one public "old_sound_rabbit_mq.<key>_anon" service per anonymous consumer.
     *
     * Only exchange options, the callback and optional consumer options are
     * configured; no queue options are set on the definition.
     */
    protected function loadAnonConsumers()
    {
        foreach ($this->config['anon_consumers'] as $key => $anon) {
            $exchangeOptions = $this->normalizeArgumentKeys($anon['exchange_options']);
            $callback = [new Reference($anon['callback']), 'execute'];

            $definition = new Definition('%old_sound_rabbit_mq.anon_consumer.class%');
            $definition->setPublic(true);
            $definition->addTag('old_sound_rabbit_mq.base_amqp');
            $definition->addTag('old_sound_rabbit_mq.anon_consumer');
            $definition->addMethodCall('setExchangeOptions', [$exchangeOptions]);
            $definition->addMethodCall('setCallback', [$callback]);

            if (isset($anon['options'])) {
                $consumerOptions = $this->normalizeArgumentKeys($anon['options']);
                $definition->addMethodCall('setConsumerOptions', [$consumerOptions]);
            }

            $this->injectConnection($definition, $anon['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($definition, $key, $anon['connection']);
            }

            $name = sprintf('old_sound_rabbit_mq.%s_anon', $key);
            $this->container->setDefinition($name, $definition);
            $this->addDequeuerAwareCall($anon['callback'], $name);
        }
    }
558
559
    /**
     * Restores '-' in the keys of the 'arguments' option.
     *
     * Symfony 2 converts '-' to '_' in configuration keys, which breaks arguments
     * such as x-ha-policy. Every '_' in an argument key is therefore turned back
     * into '-'. Arguments given as a string (old configuration format) are first
     * converted to an array. Configurations without 'arguments' are returned as-is.
     *
     * @param array $config
     *
     * @return array
     */
    private function normalizeArgumentKeys(array $config): array
    {
        if (!isset($config['arguments'])) {
            return $config;
        }

        $arguments = $config['arguments'];
        // support for old configuration
        if (is_string($arguments)) {
            $arguments = $this->argumentsStringAsArray($arguments);
        }

        $normalized = [];
        foreach ($arguments as $name => $value) {
            // Keys without '_' come out unchanged.
            $normalized[str_replace('_', '-', $name)] = $value;
        }
        $config['arguments'] = $normalized;

        return $config;
    }
587 28
588 28
    /**
     * Support for arguments provided as string. Support for old configuration files.
     *
     * The string is a comma-separated list of "name:value" or "name:value:type"
     * pairs. Each pair becomes name => [type, value]; the type defaults to 'S'
     * when omitted. A pair without a value yields a null value instead of
     * triggering an undefined-offset warning.
     *
     * @deprecated
     * @param string $arguments
     * @return array
     */
    private function argumentsStringAsArray($arguments): array
    {
        $argumentsArray = [];

        foreach (explode(',', $arguments) as $argument) {
            $parts = explode(':', $argument);
            $type = $parts[2] ?? 'S';
            $value = $parts[1] ?? null;

            $argumentsArray[$parts[0]] = [$type, $value];
        }

        return $argumentsArray;
    }
611 25
612 25
    /**
     * Registers one public service definition, named "old_sound_rabbit_mq.<key>_rpc", for every
     * entry under the 'rpc_clients' configuration key.
     *
     * Each definition receives the client's connection as an argument and, when the data
     * collector is enabled, a logged channel as well.
     */
    protected function loadRpcClients()
    {
        // Option key => setter that receives the option's value as its single argument.
        $optionalSetters = [
            'unserializer' => 'setUnserializer',
            'direct_reply_to' => 'setDirectReplyTo',
        ];

        foreach ($this->config['rpc_clients'] as $key => $client) {
            $rpcClient = new Definition('%old_sound_rabbit_mq.rpc_client.class%');
            $rpcClient->setLazy($client['lazy']);
            $rpcClient->addTag('old_sound_rabbit_mq.rpc_client');
            $rpcClient->addMethodCall('initClient', [$client['expect_serialized_response']]);

            $this->injectConnection($rpcClient, $client['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcClient, $key, $client['connection']);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $client)) {
                    $rpcClient->addMethodCall($setter, [$client[$option]]);
                }
            }

            $rpcClient->setPublic(true);

            $serviceId = sprintf('old_sound_rabbit_mq.%s_rpc', $key);
            $this->container->setDefinition($serviceId, $rpcClient);
        }
    }
635 34
636
    /**
     * Registers one public service definition, named "old_sound_rabbit_mq.<key>_server", for
     * every entry under the 'rpc_servers' configuration key.
     *
     * The server callback is wired as the 'execute' method of the configured callback service.
     * QoS, exchange, queue and serializer settings are applied only when present in the
     * server's configuration.
     */
    protected function loadRpcServers()
    {
        // Option key => setter that receives the option's value as its single argument.
        $optionalSetters = [
            'exchange_options' => 'setExchangeOptions',
            'queue_options' => 'setQueueOptions',
            'serializer' => 'setSerializer',
        ];

        foreach ($this->config['rpc_servers'] as $key => $server) {
            $rpcServer = new Definition('%old_sound_rabbit_mq.rpc_server.class%');
            $rpcServer->setPublic(true);
            $rpcServer->addTag('old_sound_rabbit_mq.base_amqp');
            $rpcServer->addTag('old_sound_rabbit_mq.rpc_server');
            $rpcServer->addMethodCall('initServer', [$key]);
            $rpcServer->addMethodCall('setCallback', [[new Reference($server['callback']), 'execute']]);

            $this->injectConnection($rpcServer, $server['connection']);
            if ($this->collectorEnabled) {
                $this->injectLoggedChannel($rpcServer, $key, $server['connection']);
            }

            if (array_key_exists('qos_options', $server)) {
                $qos = $server['qos_options'];
                $rpcServer->addMethodCall('setQosOptions', [
                    $qos['prefetch_size'],
                    $qos['prefetch_count'],
                    $qos['global'],
                ]);
            }

            foreach ($optionalSetters as $option => $setter) {
                if (array_key_exists($option, $server)) {
                    $rpcServer->addMethodCall($setter, [$server[$option]]);
                }
            }

            $serviceId = sprintf('old_sound_rabbit_mq.%s_server', $key);
            $this->container->setDefinition($serviceId, $rpcServer);
        }
    }
669 29
670
    /**
     * Registers a private logged-channel definition and appends a reference to it as the next
     * constructor argument of the given definition.
     *
     * The channel is registered as "old_sound_rabbit_mq.channel.<name>", is given the named
     * connection as its own argument, and its id is recorded in $this->channelIds.
     *
     * @param Definition $definition     Definition that receives the channel reference
     * @param string     $name           Suffix used to build the channel service id
     * @param string     $connectionName Name of the connection injected into the channel
     */
    protected function injectLoggedChannel(Definition $definition, $name, $connectionName)
    {
        $channelId = sprintf('old_sound_rabbit_mq.channel.%s', $name);

        $loggedChannel = new Definition('%old_sound_rabbit_mq.logged.channel.class%');
        $loggedChannel->setPublic(false);
        $loggedChannel->addTag('old_sound_rabbit_mq.logged_channel');
        $this->injectConnection($loggedChannel, $connectionName);

        $this->container->setDefinition($channelId, $loggedChannel);
        $this->channelIds[] = $channelId;

        $definition->addArgument(new Reference($channelId));
    }
684 1
685 1
    /**
     * Appends a reference to the "old_sound_rabbit_mq.connection.<connectionName>" service as
     * the next argument of the given definition.
     *
     * @param Definition $definition
     * @param string     $connectionName
     */
    protected function injectConnection(Definition $definition, $connectionName)
    {
        $connectionId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);

        $definition->addArgument(new Reference($connectionId));
    }
689
690
    /**
     * Returns the alias of this extension.
     *
     * @return string
     */
    public function getAlias(): string
    {
        $alias = 'old_sound_rabbit_mq';

        return $alias;
    }
694
695 1
    /**
     * Add proper dequeuer aware call
     *
     * When the callback service is known to the container and its class implements
     * DequeuerAwareInterface, a 'setDequeuer' method call referencing the service $name is
     * added to the callback's definition. Nothing happens when the container does not know
     * the callback service.
     *
     * @param string $callback Id of the callback service
     * @param string $name     Id of the service passed to setDequeuer
     * @throws \ReflectionException When the callback class cannot be reflected
     */
    protected function addDequeuerAwareCall($callback, $name)
    {
        if (!$this->container->has($callback)) {
            return;
        }

        $callbackDefinition = $this->container->findDefinition($callback);

        // A definition's class may be a "%parameter%" placeholder rather than a class name;
        // resolve it first, otherwise reflection fails on the literal placeholder string.
        $callbackClass = $this->container->getParameterBag()->resolveValue($callbackDefinition->getClass());

        $refClass = new \ReflectionClass($callbackClass);
        if ($refClass->implementsInterface('OldSound\RabbitMqBundle\RabbitMq\DequeuerAwareInterface')) {
            $callbackDefinition->addMethodCall('setDequeuer', [new Reference($name)]);
        }
    }
714
715
    /**
     * Tags the definition for the 'phpamqplib' monolog channel and adds a 'setLogger' method
     * call that receives the 'logger' service.
     *
     * The logger reference is created with IGNORE_ON_INVALID_REFERENCE.
     *
     * @param Definition $definition
     */
    private function injectLogger(Definition $definition)
    {
        $loggerReference = new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE);

        $definition->addTag('monolog.logger', ['channel' => 'phpamqplib']);
        $definition->addMethodCall('setLogger', [$loggerReference]);
    }
722
723
    /**
     * Get default AMQP exchange options
     *
     * The defaults are an empty name, type 'direct', passive set to true and declare set
     * to false.
     *
     * @return array
     */
    protected function getDefaultExchangeOptions(): array
    {
        $defaults = [];
        $defaults['name'] = '';
        $defaults['type'] = 'direct';
        $defaults['passive'] = true;
        $defaults['declare'] = false;

        return $defaults;
    }
737
738
    /**
     * Get default AMQP queue options
     *
     * The defaults are an empty name and declare set to false.
     *
     * @return array
     */
    protected function getDefaultQueueOptions(): array
    {
        $defaults = [];
        $defaults['name'] = '';
        $defaults['declare'] = false;

        return $defaults;
    }
750
}
751