EnqueueSimpleBusExtension::configureQueue()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 36
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 36
rs 9.552
c 0
b 0
f 0
cc 2
nc 2
nop 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Enqueue\SimpleBus\Bridge\Symfony\Bundle\DependencyInjection;
6
7
use Enqueue\SimpleBus\Consumption\Extension\LongRunningExtension;
8
use Enqueue\SimpleBus\Routing\FixedQueueNameResolver;
9
use Enqueue\SimpleBus\Routing\MappedQueueNameResolver;
10
use Enqueue\SimpleBus\SimpleBusProcessor;
11
use Enqueue\SimpleBus\SimpleBusPublisher;
12
use LogicException;
13
use SimpleBus\Serialization\Envelope\Serializer\MessageInEnvelopeSerializer;
14
use SimpleBus\Serialization\ObjectSerializer;
15
use Symfony\Component\Config\FileLocator;
16
use Symfony\Component\DependencyInjection\ContainerBuilder;
17
use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface;
18
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
19
use Symfony\Component\DependencyInjection\Reference;
20
use Symfony\Component\HttpKernel\DependencyInjection\ConfigurableExtension;
21
22
class EnqueueSimpleBusExtension extends ConfigurableExtension implements PrependExtensionInterface
23
{
24
    public function prepend(ContainerBuilder $container): void
25
    {
26
        $this->requireBundle('SimpleBusAsynchronousBundle', $container);
27
28
        $config = $container->getExtensionConfig($this->getAlias());
29
        $merged = $this->processConfiguration($this->getConfiguration($config, $container), $config);
0 ignored issues
show
Bug introduced by
It seems like $this->getConfiguration($config, $container) can also be of type null; however, parameter $configuration of Symfony\Component\Depend...:processConfiguration() does only seem to accept Symfony\Component\Config...\ConfigurationInterface, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

29
        $merged = $this->processConfiguration(/** @scrutinizer ignore-type */ $this->getConfiguration($config, $container), $config);
Loading history...
30
31
        // Common for all messages.
32
        $container->prependExtensionConfig('simple_bus_asynchronous', [
33
            'object_serializer_service_id' => ObjectSerializer::class,
34
        ]);
35
36
        // Enable async commands.
37
        if ($merged['commands']['enabled']) {
38
            $this->requireBundle('SimpleBusCommandBusBundle', $container);
39
40
            $container->prependExtensionConfig('simple_bus_asynchronous', [
41
                'commands' => [
42
                    'publisher_service_id' => 'enqueue.simple_bus.commands_publisher',
43
                ],
44
            ]);
45
        }
46
47
        // Enable async events.
48
        if ($merged['events']['enabled']) {
49
            $this->requireBundle('SimpleBusEventBusBundle', $container);
50
51
            $container->prependExtensionConfig('simple_bus_asynchronous', [
52
                'events' => [
53
                    'publisher_service_id' => 'enqueue.simple_bus.events_publisher',
54
                    'strategy' => 'predefined',
55
                ],
56
            ]);
57
        }
58
    }
59
60
    protected function loadInternal(array $config, ContainerBuilder $container)
61
    {
62
        $loader = new XmlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config'));
63
        $loader->load('services.xml');
64
65
        if ($config['commands']['enabled']) {
66
            $this->configureQueue(Configuration::TYPE_COMMANDS, $config['commands'], $container);
67
        }
68
69
        if ($config['events']['enabled']) {
70
            $this->configureQueue(Configuration::TYPE_EVENTS, $config['events'], $container);
71
        }
72
73
        $bundles = $container->getParameter('kernel.bundles');
74
75
        if (isset($bundles['LongRunningBundle'])) {
76
            $container
77
                ->register(LongRunningExtension::class)
78
                ->addArgument(new Reference('long_running.delegating_cleaner'))
79
                ->addTag('enqueue.consumption.extension', ['priority' => -999])
80
            ;
81
        }
82
    }
83
84
    private function configureQueue(string $type, array $config, ContainerBuilder $container): self
85
    {
86
        $queueResolverId = sprintf('enqueue.simple_bus.%s_queue_resolver', $type);
87
        $publisherId = sprintf('enqueue.simple_bus.%s_publisher', $type);
88
        $transportId = sprintf('enqueue.transport.%s.context', $config['transport_name']);
89
        $consumerId = sprintf('simple_bus.asynchronous.standard_serialized_%s_envelope_consumer', rtrim($type, 's'));
90
91
        // Register publisher.
92
        $container
93
            ->register($publisherId, SimpleBusPublisher::class)
94
            ->addArgument(new Reference(MessageInEnvelopeSerializer::class))
95
            ->addArgument(new Reference($queueResolverId))
96
            ->addArgument(new Reference($transportId))
97
        ;
98
99
        // Register processor.
100
        $container
101
            ->register($config['processor_service_id'], SimpleBusProcessor::class)
102
            ->addArgument(new Reference($consumerId))
103
            ->setPublic(true)
104
        ;
105
106
        if (count($config['queue_map'])) {
107
            $container
108
                ->register($queueResolverId, MappedQueueNameResolver::class)
109
                ->addArgument($config['queue_map'])
110
                ->addArgument($config['default_queue'])
111
            ;
112
        } else {
113
            $container
114
                ->register($queueResolverId, FixedQueueNameResolver::class)
115
                ->addArgument($config['default_queue'])
116
            ;
117
        }
118
119
        return $this;
120
    }
121
122
    /**
123
     * @throws LogicException
124
     */
125
    private function requireBundle(string $bundleName, ContainerBuilder $container)
126
    {
127
        $bundles = $container->getParameter('kernel.bundles');
128
129
        if (!isset($bundles[$bundleName])) {
130
            throw new LogicException(sprintf('You need to enable "%s".', $bundleName));
131
        }
132
    }
133
}
134