1
|
|
|
<?php declare(strict_types=1); |
2
|
|
|
/** |
3
|
|
|
* This file is part of the daikon-cqrs/boot project. |
4
|
|
|
* |
5
|
|
|
* For the full copyright and license information, please view the LICENSE |
6
|
|
|
* file that was distributed with this source code. |
7
|
|
|
*/ |
8
|
|
|
|
9
|
|
|
namespace Daikon\Boot\Service\Provisioner; |
10
|
|
|
|
11
|
|
|
use Auryn\Injector; |
12
|
|
|
use Daikon\Config\ConfigProviderInterface; |
13
|
|
|
use Daikon\Boot\Service\ServiceDefinitionInterface; |
14
|
|
|
use Daikon\Boot\Service\ServiceDefinitionMap; |
15
|
|
|
use Daikon\Dbal\Connector\ConnectorMap; |
16
|
|
|
use Daikon\Interop\Assertion; |
17
|
|
|
use Daikon\Interop\RuntimeException; |
18
|
|
|
use Daikon\MessageBus\EnvelopeInterface; |
19
|
|
|
use Daikon\MessageBus\Channel\Channel; |
20
|
|
|
use Daikon\MessageBus\Channel\ChannelMap; |
21
|
|
|
use Daikon\MessageBus\Channel\Subscription\LazySubscription; |
22
|
|
|
use Daikon\MessageBus\Channel\Subscription\MessageHandler\MessageHandlerList; |
23
|
|
|
use Daikon\MessageBus\Channel\Subscription\SubscriptionMap; |
24
|
|
|
use Daikon\MessageBus\Channel\Subscription\Transport\TransportInterface; |
25
|
|
|
use Daikon\MessageBus\Channel\Subscription\Transport\TransportMap; |
26
|
|
|
use Daikon\MessageBus\MessageBusInterface; |
27
|
|
|
use Daikon\Metadata\MetadataEnricherList; |
28
|
|
|
|
29
|
|
|
final class MessageBusProvisioner implements ProvisionerInterface |
30
|
|
|
{ |
31
|
|
|
public const COMMANDS_CHANNEL = 'commands'; |
32
|
|
|
|
33
|
|
|
public const COMMITS_CHANNEL = 'commits'; |
34
|
|
|
|
35
|
|
|
public const EVENTS_CHANNEL = 'events'; |
36
|
|
|
|
37
|
|
|
public function provision( |
38
|
|
|
Injector $injector, |
39
|
|
|
ConfigProviderInterface $configProvider, |
40
|
|
|
ServiceDefinitionInterface $serviceDefinition |
41
|
|
|
): void { |
42
|
|
|
$serviceClass = $serviceDefinition->getServiceClass(); |
43
|
|
|
$settings = $serviceDefinition->getSettings(); |
44
|
|
|
|
45
|
|
|
if (!isset($settings['transports'])) { |
46
|
|
|
throw new RuntimeException('Message bus transports configuration is required.'); |
47
|
|
|
} |
48
|
|
|
|
49
|
|
|
$injector |
50
|
|
|
->delegate($serviceClass, $this->factory($injector, $serviceDefinition)) |
51
|
|
|
->share($serviceClass) |
52
|
|
|
->alias(MessageBusInterface::class, $serviceClass); |
53
|
|
|
} |
54
|
|
|
|
55
|
|
|
private function factory(Injector $injector, ServiceDefinitionInterface $serviceDefinition): callable |
56
|
|
|
{ |
57
|
|
|
return function ( |
58
|
|
|
ConnectorMap $connectorMap, |
59
|
|
|
ServiceDefinitionMap $serviceDefinitionMap |
60
|
|
|
) use ( |
61
|
|
|
$injector, |
62
|
|
|
$serviceDefinition |
63
|
|
|
): object { |
64
|
|
|
$transportMap = $this->buildTransportMap($injector, $serviceDefinition, $connectorMap); |
65
|
|
|
$channelSubs = $this->collectChannelSubscriptions($injector, $serviceDefinitionMap, $transportMap); |
66
|
|
|
$channels = []; |
67
|
|
|
foreach ($channelSubs as $channelKey => $subscriptions) { |
68
|
|
|
Assertion::keyNotExists($channels, $channelKey, "Channel '$channelKey' is already defined."); |
69
|
|
|
$channels[$channelKey] = new Channel($channelKey, new SubscriptionMap($subscriptions)); |
70
|
|
|
} |
71
|
|
|
$serviceClass = $serviceDefinition->getServiceClass(); |
72
|
|
|
return new $serviceClass(new ChannelMap($channels)); |
73
|
|
|
}; |
74
|
|
|
} |
75
|
|
|
|
76
|
|
|
private function buildTransportMap( |
77
|
|
|
Injector $injector, |
78
|
|
|
ServiceDefinitionInterface $serviceDefinition, |
79
|
|
|
ConnectorMap $connectorMap |
80
|
|
|
): TransportMap { |
81
|
|
|
$transports = []; |
82
|
|
|
$settings = $serviceDefinition->getSettings(); |
83
|
|
|
foreach ($settings['transports'] as $transportKey => $transportConfig) { |
84
|
|
|
Assertion::keyNotExists($transports, $transportKey, "Transport '$transportKey' is already defined."); |
85
|
|
|
$arguments = [':key' => $transportKey]; |
86
|
|
|
if (isset($transportConfig['dependencies']['connector'])) { |
87
|
|
|
$arguments[':connector'] = $connectorMap->get($transportConfig['dependencies']['connector']); |
88
|
|
|
} |
89
|
|
|
$transports[$transportKey] = $injector->make($transportConfig['class'], $arguments); |
90
|
|
|
} |
91
|
|
|
return new TransportMap($transports); |
92
|
|
|
} |
93
|
|
|
|
94
|
|
|
private function collectChannelSubscriptions( |
95
|
|
|
Injector $injector, |
96
|
|
|
ServiceDefinitionMap $serviceDefinitionMap, |
97
|
|
|
TransportMap $transportMap |
98
|
|
|
): array { |
99
|
|
|
$channelSubs = [self::COMMANDS_CHANNEL => [], self::COMMITS_CHANNEL => [], self::EVENTS_CHANNEL => []]; |
100
|
|
|
foreach ($serviceDefinitionMap as $serviceDefinition) { |
101
|
|
|
$this->registerServiceSubs($injector, $serviceDefinition, $transportMap, $channelSubs); |
102
|
|
|
} |
103
|
|
|
return $channelSubs; |
104
|
|
|
} |
105
|
|
|
|
106
|
|
|
private function registerServiceSubs( |
107
|
|
|
Injector $injector, |
108
|
|
|
ServiceDefinitionInterface $serviceDefinition, |
109
|
|
|
TransportMap $transportMap, |
110
|
|
|
array &$channelSubs |
111
|
|
|
): void { |
112
|
|
|
foreach ($serviceDefinition->getSubscriptions() as $subscriptionKey => $subscriptionConfig) { |
113
|
|
|
$channelKey = $subscriptionConfig['channel']; |
114
|
|
|
Assertion::keyNotExists( |
115
|
|
|
$channelSubs[$channelKey], |
116
|
|
|
$subscriptionKey, |
117
|
|
|
"Subscription '$subscriptionKey' is already defined on channel '$channelKey'." |
118
|
|
|
); |
119
|
|
|
$transportKey = $subscriptionConfig['transport']; |
120
|
|
|
if (!$transportMap->has($transportKey)) { |
121
|
|
|
throw new RuntimeException("Message bus transport '$transportKey' has not been configured."); |
122
|
|
|
} |
123
|
|
|
$channelSubs[$channelKey][$subscriptionKey] = $this->buildLazySubscription( |
124
|
|
|
$injector, |
125
|
|
|
$serviceDefinition->getServiceClass(), |
126
|
|
|
$subscriptionKey, |
127
|
|
|
$subscriptionConfig, |
128
|
|
|
$transportMap |
129
|
|
|
); |
130
|
|
|
} |
131
|
|
|
} |
132
|
|
|
|
133
|
|
|
private function buildLazySubscription( |
134
|
|
|
Injector $injector, |
135
|
|
|
string $serviceFqcn, |
136
|
|
|
string $subscriptionKey, |
137
|
|
|
array $subscriptionConfig, |
138
|
|
|
TransportMap $transportMap |
139
|
|
|
): LazySubscription { |
140
|
|
|
$transportKey = $subscriptionConfig['transport']; |
141
|
|
|
$guards = (array)($subscriptionConfig['guards'] ?? []); |
142
|
|
|
return new LazySubscription( |
143
|
|
|
$subscriptionKey, |
144
|
|
|
/** |
145
|
|
|
* @psalm-suppress InvalidNullableReturnType |
146
|
|
|
* @psalm-suppress NullableReturnStatement |
147
|
|
|
*/ |
148
|
|
|
fn(): TransportInterface => $transportMap->get($transportKey), |
149
|
|
|
fn(): MessageHandlerList => new MessageHandlerList([$injector->make($serviceFqcn)]), |
150
|
|
|
function (EnvelopeInterface $envelope) use ($guards): bool { |
151
|
|
|
$message = $envelope->getMessage(); |
152
|
|
|
$interfaces = class_implements($message); |
153
|
|
|
foreach ($guards as $guard) { |
154
|
|
|
if ($message instanceof $guard || isset($interfaces[$guard])) { |
155
|
|
|
return true; |
156
|
|
|
} |
157
|
|
|
} |
158
|
|
|
return false; |
159
|
|
|
}, |
160
|
|
|
function () use ($injector, $subscriptionConfig): MetadataEnricherList { |
161
|
|
|
$enrichers = []; |
162
|
|
|
foreach ($subscriptionConfig['enrichers'] ?? [] as $enricherConfig) { |
163
|
|
|
$enrichers[] = $injector->make( |
164
|
|
|
$enricherConfig['class'], |
165
|
|
|
[':settings' => $enricherConfig['settings'] ?? []] |
166
|
|
|
); |
167
|
|
|
} |
168
|
|
|
return new MetadataEnricherList($enrichers); |
169
|
|
|
} |
170
|
|
|
); |
171
|
|
|
} |
172
|
|
|
} |
173
|
|
|
|