Passed
Pull Request — master (#17)
by BENOIT
02:04
created

Hub::getLastEventID()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
ccs 0
cts 0
cp 0
crap 2
1
<?php
2
3
namespace BenTools\MercurePHP\Hub;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Controller\HealthController;
7
use BenTools\MercurePHP\Controller\PublishController;
8
use BenTools\MercurePHP\Controller\SubscribeController;
9
use BenTools\MercurePHP\Controller\SubscriptionsController;
10
use BenTools\MercurePHP\Helpers\LoggerAwareTrait;
11
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface;
12
use BenTools\MercurePHP\Model\Message;
13
use BenTools\MercurePHP\Model\Subscription;
14
use BenTools\MercurePHP\Security\Authenticator;
15
use BenTools\MercurePHP\Security\CORS;
16
use BenTools\MercurePHP\Security\TopicMatcher;
17
use BenTools\MercurePHP\Storage\StorageInterface;
18
use BenTools\MercurePHP\Transport\TransportInterface;
19
use Lcobucci\JWT\Token;
20
use Psr\Http\Message\ResponseInterface;
21
use Psr\Http\Message\ServerRequestInterface;
22
use Psr\Http\Server\RequestHandlerInterface;
23
use Psr\Log\LoggerInterface;
24
use Psr\Log\NullLogger;
25
use Ramsey\Uuid\Uuid;
26
use React\EventLoop\LoopInterface;
27
use React\Http;
28
use React\Promise\PromiseInterface;
29
use React\Socket;
30
use React\Socket\ConnectionInterface;
31 1
32
use function BenTools\MercurePHP\get_client_id;
33
use function React\Promise\all;
34
use function React\Promise\any;
35
use function React\Promise\resolve;
36
use function Symfony\Component\String\u;
37 1
38 1
final class Hub implements RequestHandlerInterface
39 1
{
40 1
    use LoggerAwareTrait;
41 1
42 1
    private array $config;
43
    private LoopInterface $loop;
44
    private TransportInterface $transport;
45
    private StorageInterface $storage;
46
    private RequestHandlerInterface $requestHandler;
47
    private CORS $cors;
48
    private MetricsHandlerInterface $metricsHandler;
49
    private ?int $shutdownSignal;
50
51
    /**
     * Wires the hub's collaborators and builds its request handler.
     *
     * The health, subscribe and publish controllers are always registered (in
     * that order); the subscriptions controller is appended only when
     * $config[Configuration::SUBSCRIPTIONS] is strictly true.
     *
     * @param array                $config Hub settings, read through Configuration::* keys.
     * @param LoggerInterface|null $logger Replaced by a NullLogger when null.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        TransportInterface $transport,
        StorageInterface $storage,
        MetricsHandlerInterface $metricsHandler,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->transport = $transport;
        $this->storage = $storage;
        $this->metricsHandler = $metricsHandler;
        $this->logger = $logger ?? new NullLogger();
        $this->cors = new CORS($config);

        $subscriberAuth = Authenticator::createSubscriberAuthenticator($this->config);
        $publisherAuth = Authenticator::createPublisherAuthenticator($this->config);

        // Registration order is kept as-is: it is handed over to RequestHandler verbatim.
        $controllers = [];
        $controllers[] = new HealthController();
        $controllers[] = new SubscribeController($this->config, $this, $subscriberAuth, $this->logger());
        $controllers[] = new PublishController($this, $publisherAuth);

        $subscriptionsEnabled = true === $config[Configuration::SUBSCRIPTIONS];
        if ($subscriptionsEnabled) {
            $controllers[] = new SubscriptionsController($this, $subscriberAuth);
        }

        $this->requestHandler = new RequestHandler($controllers);
    }
87
88
    /**
     * Starts the hub on the address found under Configuration::ADDR.
     *
     * Resets the shutdown signal and the user metric for that address, registers
     * a SIGINT handler that delegates to stop(), schedules a 15-second timer that
     * logs the user count and memory usage, then opens the socket and serves.
     * serve() hands control over to $loop->run().
     */
    public function run(): void
    {
        $address = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($address);

        $onSignal = function ($signal) {
            $this->stop($signal, $this->loop);
        };
        $this->loop->addSignal(SIGINT, $onSignal);

        // Debug-level heartbeat: memory is reported in MB (bytes / 1024 / 1024).
        $logStats = function (int $nbUsers) {
            $memory = \memory_get_usage(true) / 1024 / 1024;
            $this->logger()->debug("Users: {$nbUsers} - Memory: {$memory}MB");
        };
        $this->loop->addPeriodicTimer(
            15,
            fn() => $this->metricsHandler->getNbUsers()->then($logStats)
        );

        $this->serve(
            $address,
            $this->createSocketConnection($address, $this->loop),
            $this->loop
        );
    }
109
110
    /**
     * Queues $callback on the event loop through futureTick().
     *
     * @param callable $callback Passed to the loop unchanged.
     */
    public function hook(callable $callback): void
    {
        $loop = $this->loop;
        $loop->futureTick($callback);
    }
114
115
    /**
     * Delegates the request to the inner request handler, then lets the CORS
     * helper decorate the resulting response.
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        $response = $this->requestHandler->handle($request);

        return $this->cors->decorateResponse($request, $response);
    }
122
123
    /**
     * Callable entry point (the hub passes itself to Http\Server in serve()):
     * runs handle() and wraps its response in a resolved promise.
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        $response = $this->handle($request);

        return resolve($response);
    }
127
128
    /**
     * Subscribes $callback to $topicSelector on the transport when the token
     * (or the anonymous policy) allows it.
     *
     * @param string     $subscriber    Client identifier; only used in log messages here.
     * @param string     $topicSelector Topic selector to subscribe to.
     * @param Token|null $token         Subscriber token, if any.
     * @param callable   $callback      Forwarded to the transport.
     *
     * @return PromiseInterface The transport's subscribe() promise when allowed;
     *                          otherwise a promise resolved with $topicSelector.
     */
    public function subscribe(
        string $subscriber,
        string $topicSelector,
        ?Token $token,
        callable $callback
    ): PromiseInterface {
        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];
        $isAllowed = TopicMatcher::canSubscribeToTopic($topicSelector, $token, $allowAnonymous);

        if ($isAllowed) {
            $this->logger()->debug("Client {$subscriber} subscribed to {$topicSelector}");

            return $this->transport->subscribe($topicSelector, $callback);
        }

        // Refusal is not an error for the caller: it is logged and resolved.
        $this->logger()->debug("Client {$subscriber} cannot subscribe to {$topicSelector}");

        return resolve($topicSelector);
    }
145
146
    /**
     * Publishes $message on $topic through the transport, then stores it, then
     * logs its creation at debug level.
     *
     * @return PromiseInterface Chain of the three steps; the last callback returns nothing.
     */
    public function publish(string $topic, Message $message): PromiseInterface
    {
        $persist = function () use ($topic, $message) {
            return $this->storage->storeMessage($topic, $message);
        };

        $logCreation = function () use ($topic, $message) {
            $this->logger()->debug(\sprintf('Created message %s on topic %s', $message->getId(), $topic));
        };

        $published = $this->transport->publish($topic, $message);

        return $published->then($persist)->then($logCreation);
    }
156
157
    /**
     * Stores the given subscriptions, then publishes one message per
     * subscription on the topic returned by its getId(), with the JSON-encoded
     * subscription as payload.
     *
     * @param array $subscriptions Objects exposing getId() and encodable by json_encode().
     *
     * @return PromiseInterface Resolves through all() once every publish promise has settled.
     */
    public function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        $announce = function () use ($subscriptions) {
            $pending = [];

            foreach ($subscriptions as $subscription) {
                $topic = $subscription->getId();
                $message = new Message(
                    (string) Uuid::uuid4(),
                    \json_encode($subscription, \JSON_THROW_ON_ERROR),
                    true
                );

                // Appending (rather than mapping) keeps the promise list re-indexed from 0.
                $pending[] = $this->transport->publish($topic, $message);
            }

            return all($pending);
        };

        return $this->storage->storeSubscriptions($subscriptions)->then($announce);
    }
178
179
    /**
     * Looks up subscriptions in storage, forwarding both filters unchanged.
     *
     * @param string|null $topic      Passed as first argument to findSubscriptions().
     * @param string|null $subscriber Passed as second argument to findSubscriptions().
     */
    public function getActiveSubscriptions(?string $topic, ?string $subscriber): PromiseInterface
    {
        $storage = $this->storage;

        return $storage->findSubscriptions($topic, $subscriber);
    }
183
184
    /**
     * Returns the storage's getLastEventID() promise as-is.
     */
    public function getLastEventID(): PromiseInterface
    {
        $storage = $this->storage;

        return $storage->getLastEventID();
    }
188
189
    /**
     * Retrieves the messages stored after $lastEventID for the given topics.
     *
     * @param string|null $lastEventID      When null, storage is not queried.
     * @param array       $subscribedTopics Forwarded to the storage.
     *
     * @return PromiseInterface Resolved with an empty array when $lastEventID is null,
     *                          otherwise the storage's retrieveMessagesAfterID() promise.
     */
    public function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        return null === $lastEventID
            ? resolve([])
            : $this->storage->retrieveMessagesAfterID($lastEventID, $subscribedTopics);
    }
197
198
    /**
     * Creates the socket server bound to $localAddress and hooks user metrics
     * onto it: each new connection increments the counter, and its 'close'
     * event is routed to handleClosingConnection().
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $server = new Socket\Server($localAddress, $loop);

        $onConnection = function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);

            $onClose = function () use ($connection, $localAddress) {
                return $this->handleClosingConnection($connection, $localAddress);
            };
            $connection->on('close', $onClose);
        };
        $server->on('connection', $onConnection);

        return $server;
    }
208
209
    /**
     * Handles a closed client connection: decrements the user metric, derives
     * the client id from the connection's remote address, and dispatches
     * unsubscription messages for every subscription found for that client.
     *
     * The remote address is parsed with parse_url() rather than split on ':',
     * so bracketed IPv6 addresses (e.g. "tcp://[::1]:8080") keep their host and
     * port intact. When the address is unavailable (getRemoteAddress() may
     * return null) or has no host/port, nothing is looked up.
     *
     * NOTE(review): for IPv6, parse_url() keeps the square brackets in the host;
     * confirm this matches the host the subscribe side passes to get_client_id().
     *
     * NOTE(review): findSubscriptions() is called here with the subscriber as
     * its only argument, while getActiveSubscriptions() calls it with
     * ($topic, $subscriber) — confirm the parameter order against StorageInterface.
     *
     * @return PromiseInterface Resolved with null when the client cannot be
     *                          identified, otherwise the unsubscription chain.
     */
    private function handleClosingConnection(ConnectionInterface $connection, string $localAddress): PromiseInterface
    {
        // Always account for the disconnect, even if the peer cannot be identified.
        $this->metricsHandler->decrementUsers($localAddress);

        $remoteAddress = $connection->getRemoteAddress();
        if (null === $remoteAddress) {
            return resolve(null);
        }

        $remoteHost = \parse_url($remoteAddress, \PHP_URL_HOST);
        $remotePort = \parse_url($remoteAddress, \PHP_URL_PORT);
        if (!\is_string($remoteHost) || !\is_int($remotePort)) {
            // Malformed address or one without host/port: no client id to derive.
            return resolve(null);
        }

        $subscriber = get_client_id($remoteHost, $remotePort);

        return $this->storage->findSubscriptions($subscriber)
            ->then(fn(iterable $subscriptions) => $this->dispatchUnsubscriptions($subscriptions));
    }
217
218
    /**
     * Marks every given subscription inactive, publishes its JSON form on the
     * topic returned by its getId(), and finally asks the storage to remove
     * all of them.
     *
     * @param Subscription[] $subscriptions
     *
     * @return PromiseInterface all() over the publish promises plus the removal promise.
     */
    private function dispatchUnsubscriptions(iterable $subscriptions): PromiseInterface
    {
        // Materialized once: the list is iterated here and handed to the storage below.
        $subscriptions = \iterable_to_array($subscriptions);
        $pending = [];

        foreach ($subscriptions as $subscription) {
            $subscription->setActive(false);

            $messageId = (string) Uuid::uuid4();
            $payload = \json_encode($subscription, \JSON_PRETTY_PRINT | \JSON_THROW_ON_ERROR);

            $pending[] = $this->transport->publish(
                $subscription->getId(),
                new Message($messageId, $payload, true)
            );
        }

        $pending[] = $this->storage->removeSubscriptions($subscriptions);

        return all($pending);
    }
240
241
    /**
     * Attaches an HTTP server (with this hub as its request callback) to the
     * socket, logs the listening address and runs the event loop.
     *
     * @param string $localAddress Only used for the log line.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $httpServer = new Http\Server($loop, $this);
        $httpServer->listen($socket);

        $banner = "Server running at http://" . $localAddress;
        $this->logger()->info($banner);

        $loop->run();
    }
249
250
    /**
     * Returns the signal number recorded by stop(), or null when none was received.
     *
     * The property is typed without a default and is first assigned in run(),
     * so a plain read before run() would throw an "uninitialized typed property"
     * Error. The null-coalescing read is safe in that state and yields null.
     */
    public function getShutdownSignal(): ?int
    {
        return $this->shutdownSignal ?? null;
    }
254
255
    /**
     * Records the received signal and schedules the loop to stop on its next tick.
     *
     * @param int $signal Signal number, later exposed by getShutdownSignal().
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;

        $stopLoop = function () use ($loop) {
            $loop->stop();
        };
        $loop->futureTick($stopLoop);
    }
262
}
263