1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace BenTools\MercurePHP\Controller; |
4
|
|
|
|
5
|
|
|
use BenTools\MercurePHP\Configuration\Configuration; |
6
|
|
|
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException; |
7
|
|
|
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException; |
8
|
|
|
use BenTools\MercurePHP\Helpers\QueryStringParser; |
9
|
|
|
use BenTools\MercurePHP\Hub\Hub; |
10
|
|
|
use BenTools\MercurePHP\Model\Subscription; |
11
|
|
|
use BenTools\MercurePHP\Security\Authenticator; |
12
|
|
|
use BenTools\MercurePHP\Security\TopicMatcher; |
13
|
|
|
use BenTools\MercurePHP\Model\Message; |
14
|
|
|
use Lcobucci\JWT\Token; |
15
|
|
|
use Psr\Http\Message\RequestInterface; |
16
|
|
|
use Psr\Http\Message\ResponseInterface; |
17
|
|
|
use Psr\Http\Message\ServerRequestInterface as Request; |
18
|
|
|
use Psr\Log\LoggerInterface; |
19
|
|
|
use React\EventLoop\Factory; |
20
|
|
|
use React\EventLoop\LoopInterface; |
21
|
|
|
use React\Http\Message\Response; |
22
|
|
|
use React\Promise\PromiseInterface; |
23
|
|
|
use React\Stream\ThroughStream; |
24
|
|
|
use React\Stream\WritableStreamInterface as Stream; |
25
|
|
|
|
26
|
|
|
use function BenTools\MercurePHP\nullify; |
27
|
|
|
use function BenTools\QueryString\query_string; |
28
|
|
|
use function React\Promise\all; |
29
|
|
|
use function React\Promise\resolve; |
30
|
|
|
|
31
|
|
|
final class SubscribeController extends AbstractController |
32
|
|
|
{ |
33
|
|
|
private Authenticator $authenticator; |
34
|
9 |
|
private LoopInterface $loop; |
35
|
|
|
private QueryStringParser $queryStringParser; |
36
|
9 |
|
private bool $allowAnonymous; |
37
|
9 |
|
/** |
38
|
9 |
|
* @var Hub |
39
|
9 |
|
*/ |
40
|
9 |
|
private Hub $hub; |
41
|
9 |
|
|
42
|
|
|
public function __construct( |
43
|
7 |
|
array $config, |
44
|
|
|
Hub $hub, |
45
|
7 |
|
Authenticator $authenticator, |
46
|
|
|
?LoopInterface $loop = null, |
47
|
2 |
|
?LoggerInterface $logger = null |
48
|
|
|
) { |
49
|
|
|
$this->config = $config; |
50
|
|
|
$this->hub = $hub; |
51
|
2 |
|
$this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS]; |
52
|
|
|
$this->authenticator = $authenticator; |
53
|
2 |
|
$this->queryStringParser = new QueryStringParser(); |
54
|
2 |
|
$this->loop = $loop ?? Factory::create(); |
55
|
2 |
|
$this->logger = $logger; |
|
|
|
|
56
|
2 |
|
} |
57
|
2 |
|
|
58
|
|
|
public function __invoke(Request $request): ResponseInterface |
59
|
|
|
{ |
60
|
2 |
|
$request = $this->withAttributes($request); |
61
|
|
|
|
62
|
|
|
if ('OPTIONS' === $request->getMethod()) { |
63
|
2 |
|
return new Response(200); |
64
|
|
|
} |
65
|
|
|
|
66
|
|
|
$stream = new ThroughStream(); |
67
|
2 |
|
|
68
|
|
|
$lastEventID = $request->getAttribute('lastEventId'); |
69
|
|
|
$subscribedTopics = $request->getAttribute('subscribedTopics'); |
70
|
5 |
|
$this->loop |
71
|
|
|
->futureTick( |
72
|
5 |
|
fn() => $this->hub->dispatchSubscriptions($request->getAttribute('subscriptions')) |
73
|
5 |
|
->then(fn() => $this->hub->fetchMissedMessages($lastEventID, $subscribedTopics)) |
74
|
|
|
->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream)) |
75
|
|
|
->then(fn() => $this->subscribe($request, $stream)) |
76
|
7 |
|
); |
77
|
|
|
|
78
|
|
|
$headers = [ |
79
|
7 |
|
'Content-Type' => 'text/event-stream', |
80
|
2 |
|
'Cache-Control' => 'no-cache', |
81
|
2 |
|
]; |
82
|
|
|
|
83
|
|
|
return new Response(200, $headers, $stream); |
84
|
5 |
|
} |
85
|
2 |
|
|
86
|
|
|
public function matchRequest(RequestInterface $request): bool |
87
|
|
|
{ |
88
|
3 |
|
return \in_array($request->getMethod(), ['GET', 'OPTIONS'], true) |
89
|
3 |
|
&& '/.well-known/mercure' === $request->getUri()->getPath(); |
90
|
|
|
} |
91
|
3 |
|
|
92
|
1 |
|
private function withAttributes(Request $request): Request |
93
|
|
|
{ |
94
|
|
|
try { |
95
|
|
|
$token = $this->authenticator->authenticate($request); |
96
|
2 |
|
} catch (\RuntimeException $e) { |
97
|
2 |
|
throw new AccessDeniedHttpException($e->getMessage()); |
98
|
2 |
|
} |
99
|
2 |
|
|
100
|
|
|
if (null === $token && false === $this->allowAnonymous) { |
101
|
|
|
throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401); |
102
|
2 |
|
} |
103
|
|
|
|
104
|
|
|
$qs = query_string($request->getUri(), $this->queryStringParser); |
105
|
|
|
$subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []); |
106
|
|
|
|
107
|
|
|
if ([] === $subscribedTopics) { |
108
|
|
|
throw new BadRequestHttpException('Missing "topic" parameter.'); |
109
|
|
|
} |
110
|
|
|
|
111
|
|
|
$subscriptions = $this->createSubscriptions( |
112
|
|
|
$subscribedTopics, |
113
|
|
|
$request->getAttribute('clientId'), |
114
|
|
|
$token |
115
|
|
|
); |
116
|
|
|
|
117
|
|
|
$request = $request |
118
|
|
|
->withQueryParams($qs->getParams()) |
119
|
|
|
->withAttribute('token', $token) |
120
|
|
|
->withAttribute('subscribedTopics', $subscribedTopics) |
121
|
|
|
->withAttribute('lastEventId', $this->getLastEventID($request, $qs->getParams())) |
122
|
|
|
->withAttribute('subscriptions', $subscriptions ?? []) |
123
|
|
|
; |
124
|
|
|
|
125
|
|
|
return $request; |
126
|
|
|
} |
127
|
|
|
|
128
|
|
|
private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array |
129
|
|
|
{ |
130
|
|
|
if (false === $this->config[Configuration::SUBSCRIPTIONS]) { |
131
|
|
|
return []; |
132
|
|
|
} |
133
|
|
|
|
134
|
|
|
if (null !== $token) { |
135
|
|
|
$payload = $token->getClaim('mercure')->payload ?? null; |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
$subscriptions = []; |
139
|
|
|
foreach ($subscribedTopics as $subscribedTopic) { |
140
|
|
|
$id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId); |
141
|
|
|
$subscriptions[] = new Subscription( |
142
|
|
|
$id, |
143
|
|
|
$clientId, |
144
|
|
|
$subscribedTopic, |
145
|
|
|
true, |
146
|
|
|
$payload ?? null, |
147
|
|
|
); |
148
|
|
|
} |
149
|
|
|
|
150
|
|
|
return $subscriptions; |
151
|
|
|
} |
152
|
|
|
|
153
|
|
|
private function subscribe(Request $request, Stream $stream): PromiseInterface |
154
|
|
|
{ |
155
|
|
|
$subscribedTopics = $request->getAttribute('subscribedTopics'); |
156
|
|
|
$token = $request->getAttribute('token'); |
157
|
|
|
$subscriber = $request->getAttribute('clientId'); |
158
|
|
|
$promises = []; |
159
|
|
|
foreach ($subscribedTopics as $topicSelector) { |
160
|
|
|
$promises[] = $this->hub->subscribe( |
161
|
|
|
$subscriber, |
162
|
|
|
$topicSelector, |
163
|
|
|
$token, |
164
|
|
|
fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream) |
165
|
|
|
); |
166
|
|
|
} |
167
|
|
|
|
168
|
|
|
if ([] === $promises) { |
169
|
|
|
return resolve(true); |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
return all($promises); |
173
|
|
|
} |
174
|
|
|
|
175
|
|
|
private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface |
176
|
|
|
{ |
177
|
|
|
$promises = []; |
178
|
2 |
|
foreach ($messages as $topic => $message) { |
179
|
|
|
$promises[] = $this->sendIfAllowed($topic, $message, $request, $stream); |
180
|
2 |
|
} |
181
|
2 |
|
|
182
|
2 |
|
if ([] === $promises) { |
183
|
2 |
|
return resolve(true); |
184
|
|
|
} |
185
|
|
|
|
186
|
|
|
return all($promises); |
187
|
|
|
} |
188
|
|
|
|
189
|
|
|
private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface |
190
|
|
|
{ |
191
|
|
|
$subscribedTopics = $request->getAttribute('subscribedTopics'); |
192
|
|
|
$token = $request->getAttribute('token'); |
193
|
|
|
if (!TopicMatcher::canReceiveUpdate($topic, $message, $subscribedTopics, $token, $this->allowAnonymous)) { |
194
|
|
|
return resolve(false); |
195
|
|
|
} |
196
|
|
|
|
197
|
|
|
return resolve($this->send($topic, $message, $request, $stream)); |
198
|
|
|
} |
199
|
|
|
|
200
|
|
|
private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface |
201
|
|
|
{ |
202
|
|
|
$stream->write((string) $message); |
203
|
|
|
$clientId = $request->getAttribute('clientId'); |
204
|
|
|
$id = $message->getId(); |
205
|
|
|
$this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}"); |
206
|
|
|
|
207
|
|
|
return resolve(true); |
208
|
|
|
} |
209
|
|
|
|
210
|
|
|
private function getLastEventID(Request $request, array $queryParams): ?string |
211
|
|
|
{ |
212
|
|
|
return nullify($request->getHeaderLine('Last-Event-ID')) |
213
|
|
|
?? nullify($queryParams['Last-Event-ID'] ?? null) |
214
|
|
|
?? nullify($queryParams['Last-Event-Id'] ?? null) |
215
|
|
|
?? nullify($queryParams['last-event-id'] ?? null); |
216
|
|
|
} |
217
|
|
|
} |
218
|
|
|
|
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.