1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace BenTools\MercurePHP\Hub; |
4
|
|
|
|
5
|
|
|
use BenTools\MercurePHP\Configuration\Configuration; |
6
|
|
|
use BenTools\MercurePHP\Controller\HealthController; |
7
|
|
|
use BenTools\MercurePHP\Controller\PublishController; |
8
|
|
|
use BenTools\MercurePHP\Controller\SubscribeController; |
9
|
|
|
use BenTools\MercurePHP\Controller\SubscriptionsController; |
10
|
|
|
use BenTools\MercurePHP\Helpers\LoggerAwareTrait; |
11
|
|
|
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface; |
12
|
|
|
use BenTools\MercurePHP\Model\Message; |
13
|
|
|
use BenTools\MercurePHP\Model\Subscription; |
14
|
|
|
use BenTools\MercurePHP\Security\Authenticator; |
15
|
|
|
use BenTools\MercurePHP\Security\CORS; |
16
|
|
|
use BenTools\MercurePHP\Security\TopicMatcher; |
17
|
|
|
use BenTools\MercurePHP\Storage\StorageInterface; |
18
|
|
|
use BenTools\MercurePHP\Transport\TransportInterface; |
19
|
|
|
use Lcobucci\JWT\Token; |
20
|
|
|
use Psr\Http\Message\ResponseInterface; |
21
|
|
|
use Psr\Http\Message\ServerRequestInterface; |
22
|
|
|
use Psr\Http\Server\RequestHandlerInterface; |
23
|
|
|
use Psr\Log\LoggerInterface; |
24
|
|
|
use Psr\Log\NullLogger; |
25
|
|
|
use Ramsey\Uuid\Uuid; |
26
|
|
|
use React\EventLoop\LoopInterface; |
27
|
|
|
use React\Http; |
28
|
|
|
use React\Promise\PromiseInterface; |
29
|
|
|
use React\Socket; |
30
|
|
|
use React\Socket\ConnectionInterface; |
31
|
1 |
|
|
32
|
|
|
use function BenTools\MercurePHP\get_client_id; |
33
|
|
|
use function React\Promise\all; |
34
|
|
|
use function React\Promise\any; |
35
|
|
|
use function React\Promise\resolve; |
36
|
|
|
use function Symfony\Component\String\u; |
37
|
1 |
|
|
38
|
1 |
|
/**
 * Central hub object: wires the controllers, transport, storage and metrics
 * handler together and serves HTTP requests on a ReactPHP event loop.
 *
 * It is both a PSR-15 request handler (see handle()) and an invokable
 * (see __invoke()), the latter being what is handed to the React HTTP server.
 */
final class Hub implements RequestHandlerInterface
{
    use LoggerAwareTrait;

    private array $config;
    private LoopInterface $loop;
    private TransportInterface $transport;
    private StorageInterface $storage;
    private RequestHandlerInterface $requestHandler;
    private CORS $cors;
    private MetricsHandlerInterface $metricsHandler;

    /**
     * Signal number received by stop(), or null when no signal was received.
     *
     * Explicitly defaulted to null: a typed property without a default is
     * "uninitialized", and reading it through getShutdownSignal() before
     * run() was called would throw an Error.
     */
    private ?int $shutdownSignal = null;

    /**
     * @param array                   $config         Hub configuration, indexed by Configuration::* keys.
     * @param LoopInterface           $loop           Event loop the hub runs on.
     * @param TransportInterface      $transport      Used to publish messages and register subscriptions.
     * @param StorageInterface        $storage        Used to persist messages and subscriptions.
     * @param MetricsHandlerInterface $metricsHandler Keeps track of the number of connected users.
     * @param LoggerInterface|null    $logger         Falls back to a NullLogger when omitted.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        TransportInterface $transport,
        StorageInterface $storage,
        MetricsHandlerInterface $metricsHandler,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->transport = $transport;
        $this->storage = $storage;
        $this->metricsHandler = $metricsHandler;
        $this->logger = $logger ?? new NullLogger();
        $this->cors = new CORS($config);

        $subscriberAuthenticator = Authenticator::createSubscriberAuthenticator($this->config);
        $publisherAuthenticator = Authenticator::createPublisherAuthenticator($this->config);

        $controllers = [
            new HealthController(),
            new SubscribeController(
                $this->config,
                $this,
                $subscriberAuthenticator,
                $this->logger()
            ),
            new PublishController($this, $publisherAuthenticator),
        ];

        // The subscriptions controller is only registered when explicitly enabled.
        if (true === $config[Configuration::SUBSCRIPTIONS]) {
            $controllers[] = new SubscriptionsController($this, $subscriberAuthenticator);
        }

        $this->requestHandler = new RequestHandler($controllers);
    }

    /**
     * Starts the hub: resets the user counter, installs the SIGINT handler and
     * a 15-second periodic debug log, opens the listening socket and runs the
     * loop. Returns once the loop has stopped.
     */
    public function run(): void
    {
        $localAddress = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($localAddress);

        $this->loop->addSignal(SIGINT, function ($signal) {
            $this->stop($signal, $this->loop);
        });

        // Every 15 seconds, log the user count and the allocated memory (in MB).
        $this->loop->addPeriodicTimer(
            15,
            fn() => $this->metricsHandler->getNbUsers()->then(
                function (int $nbUsers) {
                    $memory = \memory_get_usage(true) / 1024 / 1024;
                    $this->logger()->debug("Users: {$nbUsers} - Memory: {$memory}MB");
                }
            )
        );

        $socket = $this->createSocketConnection($localAddress, $this->loop);
        $this->serve($localAddress, $socket, $this->loop);
    }

    /**
     * Schedules the given callback on a future tick of the event loop.
     */
    public function hook(callable $callback): void
    {
        $this->loop->futureTick($callback);
    }

    /**
     * Delegates the request to the internal request handler, then lets the
     * CORS helper decorate the resulting response.
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        return $this->cors->decorateResponse(
            $request,
            $this->requestHandler->handle($request)
        );
    }

    /**
     * Invokable entry point: same as handle(), wrapped in a resolved promise.
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        return resolve($this->handle($request));
    }

    /**
     * Registers $callback on the transport for the given topic selector,
     * provided the token (or anonymous access) allows it.
     *
     * When the subscription is not allowed, nothing is registered and the
     * returned promise is simply resolved with the topic selector.
     */
    public function subscribe(
        string $subscriber,
        string $topicSelector,
        ?Token $token,
        callable $callback
    ): PromiseInterface {
        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];

        if (!TopicMatcher::canSubscribeToTopic($topicSelector, $token, $allowAnonymous)) {
            $this->logger()->debug("Client {$subscriber} cannot subscribe to {$topicSelector}");

            return resolve($topicSelector);
        }

        $this->logger()->debug("Client {$subscriber} subscribed to {$topicSelector}");

        return $this->transport->subscribe($topicSelector, $callback);
    }

    /**
     * Publishes the message through the transport, then stores it, then logs it.
     */
    public function publish(string $topic, Message $message): PromiseInterface
    {
        return $this->transport->publish($topic, $message)
            ->then(fn() => $this->storage->storeMessage($topic, $message))
            ->then(
                function () use ($topic, $message) {
                    $this->logger()->debug(\sprintf('Created message %s on topic %s', $message->getId(), $topic));
                }
            );
    }

    /**
     * Stores the given subscriptions, then publishes one message per
     * subscription on the topic named after the subscription's ID, carrying
     * the JSON-encoded subscription as payload.
     *
     * @param Subscription[] $subscriptions
     */
    public function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        return $this->storage->storeSubscriptions($subscriptions)
            ->then(
                function () use ($subscriptions) {
                    $promises = [];
                    foreach ($subscriptions as $subscription) {
                        $promises[] = $this->transport->publish(
                            $subscription->getId(),
                            // NOTE(review): the third argument presumably flags the message as private - confirm against Message.
                            new Message(
                                (string) Uuid::uuid4(),
                                \json_encode($subscription, \JSON_THROW_ON_ERROR),
                                true
                            )
                        );
                    }

                    return all($promises);
                }
            );
    }

    /**
     * Looks up subscriptions in the storage, filtered by topic and/or subscriber.
     */
    public function getActiveSubscriptions(?string $topic, ?string $subscriber): PromiseInterface
    {
        return $this->storage->findSubscriptions($topic, $subscriber);
    }

    /**
     * Returns the storage's last event ID (as a promise).
     */
    public function getLastEventID(): PromiseInterface
    {
        return $this->storage->getLastEventID();
    }

    /**
     * Retrieves the messages stored after $lastEventID for the given topics.
     * Resolves with an empty array when no last event ID is provided.
     */
    public function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        if (null === $lastEventID) {
            return resolve([]);
        }

        return $this->storage->retrieveMessagesAfterID($lastEventID, $subscribedTopics);
    }

    /**
     * Opens the listening socket and keeps the user counter in sync with
     * connections being opened and closed.
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $socket = new Socket\Server($localAddress, $loop);
        $socket->on('connection', function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);
            $connection->on('close', fn() => $this->handleClosingConnection($connection, $localAddress));
        });

        return $socket;
    }

    /**
     * Called when a client connection closes: decrements the user counter,
     * derives the client ID from the connection's remote address and
     * dispatches unsubscriptions for whatever the storage returns for it.
     */
    private function handleClosingConnection(ConnectionInterface $connection, string $localAddress): PromiseInterface
    {
        $this->metricsHandler->decrementUsers($localAddress);

        // getRemoteAddress() may return null when the address is unknown; without it
        // no client ID can be derived, hence there is nothing to unsubscribe.
        $remoteAddress = $connection->getRemoteAddress();
        if (null === $remoteAddress) {
            return resolve([]);
        }

        // NOTE(review): splitting on ':' assumes a "scheme://host:port" address; an IPv6
        // remote address contains extra colons and would not be parsed correctly here.
        [$remoteHost, $remotePort] = u($remoteAddress)->after('//')->split(':');
        $subscriber = get_client_id((string) $remoteHost, (int) (string) $remotePort);

        // NOTE(review): getActiveSubscriptions() calls findSubscriptions($topic, $subscriber),
        // whereas the subscriber is passed as first argument here - confirm against StorageInterface.
        return $this->storage->findSubscriptions($subscriber)
            ->then(fn(iterable $subscriptions) => $this->dispatchUnsubscriptions($subscriptions));
    }

    /**
     * Marks every subscription as inactive, publishes its JSON representation
     * on the topic named after its ID, and removes all of them from the storage.
     *
     * @param Subscription[] $subscriptions
     */
    private function dispatchUnsubscriptions(iterable $subscriptions): PromiseInterface
    {
        $promises = [];
        // Materialized once, since the collection is both iterated and handed to the storage.
        $subscriptions = \iterable_to_array($subscriptions);
        foreach ($subscriptions as $subscription) {
            $subscription->setActive(false);
            $message = new Message(
                (string) Uuid::uuid4(),
                \json_encode($subscription, \JSON_PRETTY_PRINT | \JSON_THROW_ON_ERROR),
                true,
            );
            $topic = $subscription->getId();
            $promises[] = $this->transport->publish($topic, $message);
        }

        $promises[] = $this->storage->removeSubscriptions($subscriptions);

        return all($promises);
    }

    /**
     * Attaches an HTTP server (using this hub as request handler) to the
     * socket, then runs the event loop; blocks until the loop stops.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $server = new Http\Server($loop, $this);
        $server->listen($socket);

        $this->logger()->info("Server running at http://" . $localAddress);
        $loop->run();
    }

    /**
     * Returns the signal that stopped the hub, or null if none was received
     * (including when the hub was never run).
     */
    public function getShutdownSignal(): ?int
    {
        return $this->shutdownSignal;
    }

    /**
     * Records the received signal and stops the loop on its next tick.
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;
        $loop->futureTick(function () use ($loop) {
            $loop->stop();
        });
    }
}
263
|
|
|
|
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.