Passed
Pull Request — master (#17)
by BENOIT
03:48
created

Hub::dispatchUnsubscriptions()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 11
nc 2
nop 1
dl 0
loc 17
ccs 0
cts 0
cp 0
crap 6
rs 9.9
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Hub;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Controller\HealthController;
7
use BenTools\MercurePHP\Controller\PublishController;
8
use BenTools\MercurePHP\Controller\SubscribeController;
9
use BenTools\MercurePHP\Helpers\LoggerAwareTrait;
10
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface;
11
use BenTools\MercurePHP\Model\Message;
12
use BenTools\MercurePHP\Model\Subscription;
13
use BenTools\MercurePHP\Security\Authenticator;
14
use BenTools\MercurePHP\Security\CORS;
15
use BenTools\MercurePHP\Security\TopicMatcher;
16
use BenTools\MercurePHP\Storage\StorageInterface;
17
use BenTools\MercurePHP\Transport\TransportInterface;
18
use Lcobucci\JWT\Token;
19
use Psr\Http\Message\ResponseInterface;
20
use Psr\Http\Message\ServerRequestInterface;
21
use Psr\Http\Server\RequestHandlerInterface;
22
use Psr\Log\LoggerInterface;
23
use Psr\Log\NullLogger;
24
use Ramsey\Uuid\Uuid;
25
use React\EventLoop\LoopInterface;
26
use React\Http;
27
use React\Promise\PromiseInterface;
28
use React\Socket;
29
use React\Socket\ConnectionInterface;
30
31 1
use function BenTools\MercurePHP\get_client_id;
32
use function React\Promise\all;
33
use function React\Promise\any;
34
use function React\Promise\resolve;
35
use function Symfony\Component\String\u;
36
37 1
/**
 * Central hub: wires the HTTP controllers together, runs the ReactPHP server
 * and relays publications / subscriptions between transport and storage.
 */
final class Hub implements RequestHandlerInterface
{
    use LoggerAwareTrait;

    private array $config;
    private LoopInterface $loop;
    private TransportInterface $transport;
    private StorageInterface $storage;
    private RequestHandlerInterface $requestHandler;
    private CORS $cors;
    private MetricsHandlerInterface $metricsHandler;

    /**
     * Signal received by stop(), or null when none has been received.
     * Defaults to null so getShutdownSignal() is safe to call before run().
     */
    private ?int $shutdownSignal = null;

    /**
     * @param array                   $config         Hub configuration, indexed by Configuration::* keys.
     * @param LoopInterface           $loop           Event loop driving the server, timers and signals.
     * @param TransportInterface      $transport      Used to publish messages and register subscribers.
     * @param StorageInterface        $storage        Used to persist messages and subscriptions.
     * @param MetricsHandlerInterface $metricsHandler Used to keep track of connected users.
     * @param LoggerInterface|null    $logger         Falls back to a NullLogger when omitted.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        TransportInterface $transport,
        StorageInterface $storage,
        MetricsHandlerInterface $metricsHandler,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->transport = $transport;
        $this->storage = $storage;
        $this->metricsHandler = $metricsHandler;
        // Direct write to the trait property; reads go through $this->logger().
        $this->logger = $logger ?? new NullLogger();
        $this->cors = new CORS($config);

        $subscriberAuthenticator = Authenticator::createSubscriberAuthenticator($this->config);
        $publisherAuthenticator = Authenticator::createPublisherAuthenticator($this->config);

        $controllers = [
            new HealthController(),
            new SubscribeController(
                $this->config,
                $this,
                $subscriberAuthenticator,
                $this->logger()
            ),
            new PublishController($this, $publisherAuthenticator),
        ];

        $this->requestHandler = new RequestHandler($controllers);
    }

    /**
     * Boots the hub: resets the user counter for the configured address,
     * registers the SIGINT handler and a 15-second stats timer, then opens
     * the socket and starts serving.
     */
    public function run(): void
    {
        $localAddress = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($localAddress);
        $this->loop->addSignal(SIGINT, function ($signal) {
            $this->stop($signal, $this->loop);
        });
        $this->loop->addPeriodicTimer(
            15,
            fn() => $this->metricsHandler->getNbUsers()->then(
                function (int $nbUsers) {
                    // Real allocated memory, converted from bytes to megabytes.
                    $memory = \memory_get_usage(true) / 1024 / 1024;
                    $this->logger()->debug("Users: {$nbUsers} - Memory: {$memory}MB");
                }
            )
        );

        $socket = $this->createSocketConnection($localAddress, $this->loop);
        $this->serve($localAddress, $socket, $this->loop);
    }

    /**
     * Schedules $callback on the next tick of the event loop.
     */
    public function hook(callable $callback): void
    {
        $this->loop->futureTick($callback);
    }

    /**
     * Delegates the request to the internal request handler and lets the
     * CORS helper decorate the resulting response.
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        return $this->cors->decorateResponse(
            $request,
            $this->requestHandler->handle($request)
        );
    }

    /**
     * Callable entry point (the hub passes itself to Http\Server in serve()):
     * wraps the result of handle() in a resolved promise.
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        return resolve($this->handle($request));
    }

    /**
     * Registers $callback on the transport for $topicSelector, provided the
     * topic matcher allows it for the given token.
     *
     * When the subscription is not allowed, nothing is registered and the
     * returned promise resolves with the topic selector.
     */
    public function subscribe(
        string $subscriber,
        string $topicSelector,
        ?Token $token,
        callable $callback
    ): PromiseInterface {
        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];

        if (!TopicMatcher::canSubscribeToTopic($topicSelector, $token, $allowAnonymous)) {
            $this->logger()->debug("Client {$subscriber} cannot subscribe to {$topicSelector}");

            return resolve($topicSelector);
        }

        $this->logger()->debug("Client {$subscriber} subscribed to {$topicSelector}");
        return $this->transport->subscribe($topicSelector, $callback);
    }

    /**
     * Publishes $message on the transport, then stores it, then logs it.
     */
    public function publish(string $topic, Message $message): PromiseInterface
    {
        return $this->transport->publish($topic, $message)
            ->then(fn() => $this->storage->storeMessage($topic, $message))
            ->then(
                function () use ($topic, $message) {
                    $this->logger()->debug(\sprintf('Created message %s on topic %s', $message->getId(), $topic));
                }
            );
    }

    /**
     * Stores the given subscriptions, then publishes one message per
     * subscription (JSON-encoded subscription as payload) on the topic named
     * after the subscription id.
     *
     * @param Subscription[] $subscriptions
     *
     * @return PromiseInterface Settles once every publication has settled.
     */
    public function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        return $this->storage->storeSubscriptions($subscriptions)
            ->then(
                function () use ($subscriptions) {
                    $promises = [];
                    foreach ($subscriptions as $subscription) {
                        $promises[] = $this->transport->publish(
                            $subscription->getId(),
                            new Message(
                                (string) Uuid::uuid4(),
                                \json_encode($subscription, \JSON_THROW_ON_ERROR),
                                true
                            )
                        );
                    }

                    return all($promises);
                }
            );
    }

    /**
     * Retrieves from storage the messages following $lastEventID on the given
     * topics; resolves with an empty array when no last event id is provided.
     */
    public function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        if (null === $lastEventID) {
            return resolve([]);
        }

        return $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
    }

    /**
     * Opens the listening socket and keeps the user counter in sync: it is
     * incremented on each connection, and the close handler is attached.
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $socket = new Socket\Server($localAddress, $loop);
        $socket->on('connection', function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);
            $connection->on('close', fn() => $this->handleClosingConnection($connection, $localAddress));
        });

        return $socket;
    }

    /**
     * Decrements the user counter, then looks up the subscriptions belonging
     * to the closed connection and dispatches their unsubscription.
     */
    private function handleClosingConnection(ConnectionInterface $connection, string $localAddress): PromiseInterface
    {
        $this->metricsHandler->decrementUsers($localAddress);

        // getRemoteAddress() may return null; without an address the
        // subscriber id cannot be derived, so there is nothing to clean up.
        $remoteAddress = $connection->getRemoteAddress();
        if (null === $remoteAddress) {
            return resolve(null);
        }

        // NOTE(review): splitting on ':' yields more than two parts for IPv6
        // addresses - confirm how the subscriber id is built at subscribe time.
        [$remoteHost, $remotePort] = u($remoteAddress)->after('//')->split(':');
        $subscriber = get_client_id((string) $remoteHost, (int) (string) $remotePort);

        return $this->storage->findSubscriptionsBySubscriber($subscriber)
            ->then(fn(iterable $subscriptions) => $this->dispatchUnsubscriptions($subscriptions));
    }

    /**
     * Marks each subscription inactive, publishes its JSON representation on
     * the topic named after the subscription id, and removes the
     * subscriptions from storage.
     *
     * @param Subscription[] $subscriptions
     *
     * @return PromiseInterface Settles once every publication and the removal have settled.
     */
    private function dispatchUnsubscriptions(iterable $subscriptions): PromiseInterface
    {
        // The collection is traversed here and handed to the storage below:
        // materialize it so a one-shot Traversable is not consumed twice.
        $subscriptions = \is_array($subscriptions)
            ? $subscriptions
            : \iterator_to_array($subscriptions, false);

        $promises = [];
        foreach ($subscriptions as $subscription) {
            $subscription->setActive(false);
            $message = new Message(
                (string) Uuid::uuid4(),
                \json_encode($subscription, \JSON_PRETTY_PRINT | \JSON_THROW_ON_ERROR),
                true,
            );
            $topic = $subscription->getId();
            $promises[] = $this->transport->publish($topic, $message);
        }

        $promises[] = $this->storage->removeSubscriptions($subscriptions);

        // all() rather than any(): wait for every operation, as dispatchSubscriptions() does.
        return all($promises);
    }

    /**
     * Attaches an HTTP server (using this hub as request handler) to the
     * socket, logs the listening address and starts the event loop.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $server = new Http\Server($loop, $this);
        $server->listen($socket);

        $this->logger()->info("Server running at http://" . $localAddress);
        $loop->run();
    }

    /**
     * Returns the signal recorded by stop(), or null if none was received.
     */
    public function getShutdownSignal(): ?int
    {
        return $this->shutdownSignal;
    }

    /**
     * Records the received signal and stops the loop on its next tick.
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;
        $loop->futureTick(function () use ($loop) {
            $loop->stop();
        });
    }
}
247