1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace BenTools\MercurePHP\Controller; |
4
|
|
|
|
5
|
|
|
use BenTools\MercurePHP\Configuration\Configuration; |
6
|
|
|
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException; |
7
|
|
|
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException; |
8
|
|
|
use BenTools\MercurePHP\Helpers\QueryStringParser; |
9
|
|
|
use BenTools\MercurePHP\Model\Subscription; |
10
|
|
|
use BenTools\MercurePHP\Security\Authenticator; |
11
|
|
|
use BenTools\MercurePHP\Security\TopicMatcher; |
12
|
|
|
use BenTools\MercurePHP\Model\Message; |
13
|
|
|
use BenTools\MercurePHP\Storage\StorageInterface; |
14
|
|
|
use BenTools\MercurePHP\Transport\TransportInterface; |
15
|
|
|
use Lcobucci\JWT\Token; |
16
|
|
|
use Psr\Http\Message\RequestInterface; |
17
|
|
|
use Psr\Http\Message\ResponseInterface; |
18
|
|
|
use Psr\Http\Message\ServerRequestInterface as Request; |
19
|
|
|
use Psr\Log\LoggerInterface; |
20
|
|
|
use Ramsey\Uuid\Uuid; |
21
|
|
|
use React\EventLoop\Factory; |
22
|
|
|
use React\EventLoop\LoopInterface; |
23
|
|
|
use React\Http\Message\Response; |
24
|
|
|
use React\Promise\PromiseInterface; |
25
|
|
|
use React\Stream\ThroughStream; |
26
|
|
|
use React\Stream\WritableStreamInterface as Stream; |
27
|
|
|
|
28
|
|
|
use function BenTools\MercurePHP\nullify; |
29
|
|
|
use function BenTools\QueryString\query_string; |
30
|
|
|
use function React\Promise\all; |
31
|
|
|
use function React\Promise\any; |
32
|
|
|
use function React\Promise\resolve; |
33
|
|
|
|
34
|
9 |
|
final class SubscribeController extends AbstractController |
35
|
|
|
{ |
36
|
9 |
|
private Authenticator $authenticator; |
37
|
9 |
|
private LoopInterface $loop; |
38
|
9 |
|
private QueryStringParser $queryStringParser; |
39
|
9 |
|
private bool $allowAnonymous; |
40
|
9 |
|
|
41
|
9 |
|
public function __construct( |
42
|
|
|
array $config, |
43
|
7 |
|
StorageInterface $storage, |
44
|
|
|
TransportInterface $transport, |
45
|
7 |
|
Authenticator $authenticator, |
46
|
|
|
?LoopInterface $loop = null, |
47
|
2 |
|
?LoggerInterface $logger = null |
48
|
|
|
) { |
49
|
|
|
$this->config = $config; |
50
|
|
|
$this->storage = $storage; |
51
|
2 |
|
$this->transport = $transport; |
52
|
|
|
$this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS]; |
53
|
2 |
|
$this->authenticator = $authenticator; |
54
|
2 |
|
$this->queryStringParser = new QueryStringParser(); |
55
|
2 |
|
$this->loop = $loop ?? Factory::create(); |
56
|
2 |
|
$this->logger = $logger; |
|
|
|
|
57
|
2 |
|
} |
58
|
|
|
|
59
|
|
|
public function __invoke(Request $request): ResponseInterface |
60
|
2 |
|
{ |
61
|
|
|
$request = $this->withAttributes($request); |
62
|
|
|
|
63
|
2 |
|
if ('OPTIONS' === $request->getMethod()) { |
64
|
|
|
return new Response(200); |
65
|
|
|
} |
66
|
|
|
|
67
|
2 |
|
$stream = new ThroughStream(); |
68
|
|
|
|
69
|
|
|
$lastEventID = $request->getAttribute('lastEventId'); |
70
|
5 |
|
$subscribedTopics = $request->getAttribute('subscribedTopics'); |
71
|
|
|
$this->loop |
72
|
5 |
|
->futureTick( |
73
|
5 |
|
fn() => $this->dispatchSubscriptions($request->getAttribute('subscriptions')) |
74
|
|
|
->then(fn() => $this->fetchMissedMessages($lastEventID, $subscribedTopics)) |
75
|
|
|
->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream)) |
76
|
7 |
|
->then(fn() => $this->subscribe($request, $stream)) |
77
|
|
|
); |
78
|
|
|
|
79
|
7 |
|
$headers = [ |
80
|
2 |
|
'Content-Type' => 'text/event-stream', |
81
|
2 |
|
'Cache-Control' => 'no-cache', |
82
|
|
|
]; |
83
|
|
|
|
84
|
5 |
|
return new Response(200, $headers, $stream); |
85
|
2 |
|
} |
86
|
|
|
|
87
|
|
|
public function matchRequest(RequestInterface $request): bool |
88
|
3 |
|
{ |
89
|
3 |
|
return \in_array($request->getMethod(), ['GET', 'OPTIONS'], true) |
90
|
|
|
&& '/.well-known/mercure' === $request->getUri()->getPath(); |
91
|
3 |
|
} |
92
|
1 |
|
|
93
|
|
|
private function withAttributes(Request $request): Request |
94
|
|
|
{ |
95
|
|
|
try { |
96
|
2 |
|
$token = $this->authenticator->authenticate($request); |
97
|
2 |
|
} catch (\RuntimeException $e) { |
98
|
2 |
|
throw new AccessDeniedHttpException($e->getMessage()); |
99
|
2 |
|
} |
100
|
|
|
|
101
|
|
|
if (null === $token && false === $this->allowAnonymous) { |
102
|
2 |
|
throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401); |
103
|
|
|
} |
104
|
|
|
|
105
|
|
|
$qs = query_string($request->getUri(), $this->queryStringParser); |
106
|
|
|
$subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []); |
107
|
|
|
|
108
|
|
|
if ([] === $subscribedTopics) { |
109
|
|
|
throw new BadRequestHttpException('Missing "topic" parameter.'); |
110
|
|
|
} |
111
|
|
|
|
112
|
|
|
$subscriptions = $this->createSubscriptions( |
113
|
|
|
$subscribedTopics, |
114
|
|
|
$request->getAttribute('clientId'), |
115
|
|
|
$token |
116
|
|
|
); |
117
|
|
|
|
118
|
|
|
$request = $request |
119
|
|
|
->withQueryParams($qs->getParams()) |
120
|
|
|
->withAttribute('token', $token) |
121
|
|
|
->withAttribute('subscribedTopics', $subscribedTopics) |
122
|
|
|
->withAttribute('lastEventId', $this->getLastEventID($request, $qs->getParams())) |
123
|
|
|
->withAttribute('subscriptions', $subscriptions) |
124
|
|
|
; |
125
|
|
|
|
126
|
|
|
return $request; |
127
|
|
|
} |
128
|
|
|
|
129
|
|
|
private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array |
130
|
|
|
{ |
131
|
|
|
if (false === $this->config[Configuration::SUBSCRIPTIONS]) { |
132
|
|
|
return []; |
133
|
|
|
} |
134
|
|
|
|
135
|
|
|
$subscriptions = []; |
136
|
|
|
foreach ($subscribedTopics as $subscribedTopic) { |
137
|
|
|
$id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId); |
138
|
|
|
try { |
139
|
|
|
$payload = null !== $token ? ($token->getClaim('mercure')['payload'] ?? null) : null; |
140
|
|
|
} catch (\Exception $e) { |
141
|
|
|
$payload = null; |
142
|
|
|
} |
143
|
|
|
$subscriptions[] = new Subscription( |
144
|
|
|
$id, |
145
|
|
|
$clientId, |
146
|
|
|
$subscribedTopic, |
147
|
|
|
true, |
148
|
|
|
$payload, |
149
|
|
|
); |
150
|
|
|
} |
151
|
|
|
|
152
|
|
|
return $subscriptions; |
153
|
|
|
} |
154
|
|
|
|
155
|
|
|
/** |
156
|
|
|
* @param Subscription[] $subscriptions |
157
|
|
|
*/ |
158
|
|
|
private function dispatchSubscriptions(array $subscriptions): PromiseInterface |
159
|
|
|
{ |
160
|
|
|
$promises = [$this->storage->storeSubscriptions($subscriptions)]; |
|
|
|
|
161
|
|
|
foreach ($subscriptions as $subscription) { |
162
|
|
|
$message = new Message( |
163
|
|
|
(string) Uuid::uuid4(), |
164
|
|
|
\json_encode($subscription, \JSON_THROW_ON_ERROR), |
165
|
|
|
true, |
166
|
|
|
); |
167
|
|
|
$topic = $subscription->getId(); |
168
|
|
|
$promises[] = $this->transport->publish($topic, $message); |
|
|
|
|
169
|
|
|
} |
170
|
|
|
|
171
|
|
|
return any($promises); |
172
|
|
|
} |
173
|
|
|
|
174
|
|
|
private function subscribe(Request $request, Stream $stream): PromiseInterface |
175
|
|
|
{ |
176
|
|
|
$subscribedTopics = $request->getAttribute('subscribedTopics'); |
177
|
|
|
$token = $request->getAttribute('token'); |
178
|
2 |
|
$promises = []; |
179
|
|
|
foreach ($subscribedTopics as $topicSelector) { |
180
|
2 |
|
if (!TopicMatcher::canSubscribeToTopic($topicSelector, $token, $this->allowAnonymous)) { |
181
|
2 |
|
$clientId = $request->getAttribute('clientId'); |
182
|
2 |
|
$this->logger()->debug("Client {$clientId} cannot subscribe to {$topicSelector}"); |
183
|
2 |
|
continue; |
184
|
|
|
} |
185
|
|
|
$promises[] = $this->transport |
186
|
|
|
->subscribe( |
187
|
|
|
$topicSelector, |
188
|
|
|
fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream) |
189
|
|
|
) |
190
|
|
|
->then(function (string $topic) use ($request) { |
191
|
|
|
$clientId = $request->getAttribute('clientId'); |
192
|
|
|
$this->logger()->debug("Client {$clientId} subscribed to {$topic}"); |
193
|
|
|
}); |
194
|
|
|
} |
195
|
|
|
|
196
|
|
|
if ([] === $promises) { |
197
|
|
|
return resolve(true); |
198
|
|
|
} |
199
|
|
|
|
200
|
|
|
return all($promises); |
201
|
|
|
} |
202
|
|
|
|
203
|
|
|
private function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface |
204
|
|
|
{ |
205
|
|
|
if (null === $lastEventID) { |
206
|
|
|
return resolve([]); |
207
|
|
|
} |
208
|
|
|
|
209
|
|
|
return $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics); |
210
|
|
|
} |
211
|
|
|
|
212
|
|
|
private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface |
213
|
|
|
{ |
214
|
|
|
$promises = []; |
215
|
|
|
foreach ($messages as $topic => $message) { |
216
|
|
|
$promises[] = $this->sendIfAllowed($topic, $message, $request, $stream); |
217
|
|
|
} |
218
|
|
|
|
219
|
|
|
if ([] === $promises) { |
220
|
|
|
return resolve(true); |
221
|
|
|
} |
222
|
|
|
|
223
|
|
|
return all($promises); |
224
|
|
|
} |
225
|
|
|
|
226
|
|
|
private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface |
227
|
|
|
{ |
228
|
|
|
$subscribedTopics = $request->getAttribute('subscribedTopics'); |
229
|
|
|
$token = $request->getAttribute('token'); |
230
|
|
|
if (!TopicMatcher::canReceiveUpdate($topic, $message, $subscribedTopics, $token, $this->allowAnonymous)) { |
231
|
|
|
return resolve(false); |
232
|
|
|
} |
233
|
|
|
|
234
|
|
|
return resolve($this->send($topic, $message, $request, $stream)); |
235
|
|
|
} |
236
|
|
|
|
237
|
|
|
private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface |
238
|
|
|
{ |
239
|
|
|
$stream->write((string) $message); |
240
|
|
|
$clientId = $request->getAttribute('clientId'); |
241
|
|
|
$id = $message->getId(); |
242
|
|
|
$this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}"); |
243
|
|
|
|
244
|
|
|
return resolve(true); |
245
|
|
|
} |
246
|
|
|
|
247
|
|
|
private function getLastEventID(Request $request, array $queryParams): ?string |
248
|
|
|
{ |
249
|
|
|
return nullify($request->getHeaderLine('Last-Event-ID')) |
250
|
|
|
?? nullify($queryParams['Last-Event-ID'] ?? null) |
251
|
|
|
?? nullify($queryParams['Last-Event-Id'] ?? null) |
252
|
|
|
?? nullify($queryParams['last-event-id'] ?? null); |
253
|
|
|
} |
254
|
|
|
} |
255
|
|
|
|
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.