Passed
Push — master ( 7dc569...e9ad93 )
by Mr
08:03
created

MessageBusProvisioner::buildLazySubscription()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 36
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 20
nc 1
nop 5
dl 0
loc 36
ccs 0
cts 14
cp 0
crap 30
rs 9.2888
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/boot project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\Boot\Service\Provisioner;
10
11
use Auryn\Injector;
12
use Daikon\Config\ConfigProviderInterface;
13
use Daikon\Boot\Service\ServiceDefinitionInterface;
14
use Daikon\Boot\Service\ServiceDefinitionMap;
15
use Daikon\Dbal\Connector\ConnectorMap;
16
use Daikon\Interop\Assertion;
17
use Daikon\Interop\RuntimeException;
18
use Daikon\MessageBus\EnvelopeInterface;
19
use Daikon\MessageBus\Channel\Channel;
20
use Daikon\MessageBus\Channel\ChannelMap;
21
use Daikon\MessageBus\Channel\Subscription\LazySubscription;
22
use Daikon\MessageBus\Channel\Subscription\MessageHandler\MessageHandlerList;
23
use Daikon\MessageBus\Channel\Subscription\SubscriptionMap;
24
use Daikon\MessageBus\Channel\Subscription\Transport\TransportInterface;
25
use Daikon\MessageBus\Channel\Subscription\Transport\TransportMap;
26
use Daikon\MessageBus\MessageBusInterface;
27
use Daikon\Metadata\MetadataEnricherList;
28
29
final class MessageBusProvisioner implements ProvisionerInterface
30
{
31
    public const COMMANDS_CHANNEL = 'commands';
32
33
    public const COMMITS_CHANNEL = 'commits';
34
35
    public const EVENTS_CHANNEL = 'events';
36
37
    public function provision(
38
        Injector $injector,
39
        ConfigProviderInterface $configProvider,
40
        ServiceDefinitionInterface $serviceDefinition
41
    ): void {
42
        $serviceClass = $serviceDefinition->getServiceClass();
43
        $settings = $serviceDefinition->getSettings();
44
45
        if (!isset($settings['transports'])) {
46
            throw new RuntimeException('Message bus transports configuration is required.');
47
        }
48
49
        $injector
50
            ->delegate($serviceClass, $this->factory($injector, $serviceDefinition))
51
            ->share($serviceClass)
52
            ->alias(MessageBusInterface::class, $serviceClass);
53
    }
54
55
    private function factory(Injector $injector, ServiceDefinitionInterface $serviceDefinition): callable
56
    {
57
        return function (
58
            ConnectorMap $connectorMap,
59
            ServiceDefinitionMap $serviceDefinitionMap
60
        ) use (
61
            $injector,
62
            $serviceDefinition
63
        ): object {
64
            $transportMap = $this->buildTransportMap($injector, $serviceDefinition, $connectorMap);
65
            $channelSubs = $this->collectChannelSubscriptions($injector, $serviceDefinitionMap, $transportMap);
66
            $channels = [];
67
            foreach ($channelSubs as $channelKey => $subscriptions) {
68
                Assertion::keyNotExists($channels, $channelKey, "Channel '$channelKey' is already defined.");
69
                $channels[$channelKey] = new Channel($channelKey, new SubscriptionMap($subscriptions));
70
            }
71
            $serviceClass = $serviceDefinition->getServiceClass();
72
            return new $serviceClass(new ChannelMap($channels));
73
        };
74
    }
75
76
    private function buildTransportMap(
77
        Injector $injector,
78
        ServiceDefinitionInterface $serviceDefinition,
79
        ConnectorMap $connectorMap
80
    ): TransportMap {
81
        $transports = [];
82
        $settings = $serviceDefinition->getSettings();
83
        foreach ($settings['transports'] as $transportKey => $transportConfig) {
84
            Assertion::keyNotExists($transports, $transportKey, "Transport '$transportKey' is already defined.");
85
            $arguments = [':key' => $transportKey];
86
            if (isset($transportConfig['dependencies']['connector'])) {
87
                $arguments[':connector'] = $connectorMap->get($transportConfig['dependencies']['connector']);
88
            }
89
            $transports[$transportKey] = $injector->make($transportConfig['class'], $arguments);
90
        }
91
        return new TransportMap($transports);
92
    }
93
94
    private function collectChannelSubscriptions(
95
        Injector $injector,
96
        ServiceDefinitionMap $serviceDefinitionMap,
97
        TransportMap $transportMap
98
    ): array {
99
        $channelSubs = [self::COMMANDS_CHANNEL => [], self::COMMITS_CHANNEL => [], self::EVENTS_CHANNEL => []];
100
        foreach ($serviceDefinitionMap as $serviceDefinition) {
101
            $this->registerServiceSubs($injector, $serviceDefinition, $transportMap, $channelSubs);
102
        }
103
        return $channelSubs;
104
    }
105
106
    private function registerServiceSubs(
107
        Injector $injector,
108
        ServiceDefinitionInterface $serviceDefinition,
109
        TransportMap $transportMap,
110
        array &$channelSubs
111
    ): void {
112
        foreach ($serviceDefinition->getSubscriptions() as $subscriptionKey => $subscriptionConfig) {
113
            $channelKey = $subscriptionConfig['channel'];
114
            Assertion::keyNotExists(
115
                $channelSubs[$channelKey],
116
                $subscriptionKey,
117
                "Subscription '$subscriptionKey' is already defined on channel '$channelKey'."
118
            );
119
            $transportKey = $subscriptionConfig['transport'];
120
            if (!$transportMap->has($transportKey)) {
121
                throw new RuntimeException("Message bus transport '$transportKey' has not been configured.");
122
            }
123
            $channelSubs[$channelKey][$subscriptionKey] = $this->buildLazySubscription(
124
                $injector,
125
                $serviceDefinition->getServiceClass(),
126
                $subscriptionKey,
127
                $subscriptionConfig,
128
                $transportMap
129
            );
130
        }
131
    }
132
133
    private function buildLazySubscription(
134
        Injector $injector,
135
        string $serviceFqcn,
136
        string $subscriptionKey,
137
        array $subscriptionConfig,
138
        TransportMap $transportMap
139
    ): LazySubscription {
140
        $transportKey = $subscriptionConfig['transport'];
141
        $guards = (array)($subscriptionConfig['guards'] ?? []);
142
        return new LazySubscription(
143
            $subscriptionKey,
144
            /**
145
             * @psalm-suppress InvalidNullableReturnType
146
             * @psalm-suppress NullableReturnStatement
147
             */
148
            fn(): TransportInterface => $transportMap->get($transportKey),
149
            fn(): MessageHandlerList => new MessageHandlerList([$injector->make($serviceFqcn)]),
150
            function (EnvelopeInterface $envelope) use ($guards): bool {
151
                $message = $envelope->getMessage();
152
                $interfaces = class_implements($message);
153
                foreach ($guards as $guard) {
154
                    if ($message instanceof $guard || isset($interfaces[$guard])) {
155
                        return true;
156
                    }
157
                }
158
                return false;
159
            },
160
            function () use ($injector, $subscriptionConfig): MetadataEnricherList {
161
                $enrichers = [];
162
                foreach ($subscriptionConfig['enrichers'] ?? [] as $enricherConfig) {
163
                    $enrichers[] = $injector->make(
164
                        $enricherConfig['class'],
165
                        [':settings' => $enricherConfig['settings'] ?? []]
166
                    );
167
                }
168
                return new MetadataEnricherList($enrichers);
169
            }
170
        );
171
    }
172
}
173