Passed
Pull Request — master (#17)
by BENOIT
02:04
created

Hub   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 216
Duplicated Lines 0 %

Test Coverage

Coverage 22.92%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 103
c 3
b 0
f 0
dl 0
loc 216
rs 10
ccs 11
cts 48
cp 0.2292
wmc 21

16 Methods

Rating   Name   Duplication   Size   Complexity  
A stop() 0 5 1
A dispatchSubscriptions() 0 18 2
A hook() 0 3 1
A subscribe() 0 16 2
A serve() 0 7 1
A __construct() 0 35 2
A handleClosingConnection() 0 7 1
A createSocketConnection() 0 9 1
A __invoke() 0 3 1
A run() 0 20 1
A getActiveSubscriptions() 0 3 1
A dispatchUnsubscriptions() 0 17 2
A handle() 0 5 1
A getShutdownSignal() 0 3 1
A fetchMissedMessages() 0 7 2
A publish() 0 7 1
1
<?php
2
3
namespace BenTools\MercurePHP\Hub;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Controller\HealthController;
7
use BenTools\MercurePHP\Controller\PublishController;
8
use BenTools\MercurePHP\Controller\SubscribeController;
9
use BenTools\MercurePHP\Controller\SubscriptionsController;
10
use BenTools\MercurePHP\Helpers\LoggerAwareTrait;
11
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface;
12
use BenTools\MercurePHP\Model\Message;
13
use BenTools\MercurePHP\Model\Subscription;
14
use BenTools\MercurePHP\Security\Authenticator;
15
use BenTools\MercurePHP\Security\CORS;
16
use BenTools\MercurePHP\Security\TopicMatcher;
17
use BenTools\MercurePHP\Storage\StorageInterface;
18
use BenTools\MercurePHP\Transport\TransportInterface;
19
use Lcobucci\JWT\Token;
20
use Psr\Http\Message\ResponseInterface;
21
use Psr\Http\Message\ServerRequestInterface;
22
use Psr\Http\Server\RequestHandlerInterface;
23
use Psr\Log\LoggerInterface;
24
use Psr\Log\NullLogger;
25
use Ramsey\Uuid\Uuid;
26
use React\EventLoop\LoopInterface;
27
use React\Http;
28
use React\Promise\PromiseInterface;
29
use React\Socket;
30
use React\Socket\ConnectionInterface;
31 1
32
use function BenTools\MercurePHP\get_client_id;
33
use function React\Promise\all;
34
use function React\Promise\any;
35
use function React\Promise\resolve;
36
use function Symfony\Component\String\u;
37 1
38 1
final class Hub implements RequestHandlerInterface
39 1
{
40 1
    use LoggerAwareTrait;
41 1
42 1
    private array $config;
43
    private LoopInterface $loop;
44
    private TransportInterface $transport;
45
    private StorageInterface $storage;
46
    private RequestHandlerInterface $requestHandler;
47
    private CORS $cors;
48
    private MetricsHandlerInterface $metricsHandler;
49
    private ?int $shutdownSignal;
50
51
    /**
     * Wires the hub together: stores its collaborators, builds the CORS helper,
     * creates the subscriber/publisher authenticators and registers the HTTP controllers.
     *
     * The subscriptions controller is only registered when
     * `$config[Configuration::SUBSCRIPTIONS]` is strictly `true`.
     *
     * @param array                   $config         Hub configuration, indexed by Configuration::* keys.
     * @param LoopInterface           $loop           Event loop driving timers, signals and the server.
     * @param TransportInterface      $transport      Used to publish and subscribe to topics.
     * @param StorageInterface        $storage        Used to persist messages and subscriptions.
     * @param MetricsHandlerInterface $metricsHandler Keeps track of connected users.
     * @param LoggerInterface|null    $logger         Falls back to a NullLogger when omitted.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        TransportInterface $transport,
        StorageInterface $storage,
        MetricsHandlerInterface $metricsHandler,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->transport = $transport;
        $this->storage = $storage;
        $this->metricsHandler = $metricsHandler;
        // Direct access to the trait's $logger property is flagged as deprecated (reads should
        // go through $this->logger()); the assignment is kept and silenced for the analyser.
        /** @scrutinizer ignore-deprecated */ $this->logger = $logger ?? new NullLogger();
        $this->cors = new CORS($config);

        $subscriberAuth = Authenticator::createSubscriberAuthenticator($this->config);
        $publisherAuth = Authenticator::createPublisherAuthenticator($this->config);

        // Controllers are instantiated in registration order.
        $handlers = [];
        $handlers[] = new HealthController();
        $handlers[] = new SubscribeController($this->config, $this, $subscriberAuth, $this->logger());
        $handlers[] = new PublishController($this, $publisherAuth);

        if ($config[Configuration::SUBSCRIPTIONS] === true) {
            $handlers[] = new SubscriptionsController($this, $subscriberAuth);
        }

        $this->requestHandler = new RequestHandler($handlers);
    }
87
88
    /**
     * Starts the hub on the address configured under Configuration::ADDR.
     *
     * Clears the recorded shutdown signal, resets the user metrics for that address,
     * registers a SIGINT handler that calls stop(), schedules a debug log of the user
     * count and memory usage every 15 seconds, then opens the socket and serves on it.
     */
    public function run(): void
    {
        $address = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($address);

        $this->loop->addSignal(SIGINT, fn($signal) => $this->stop($signal, $this->loop));

        // Logs the current number of users along with the allocated memory, in megabytes.
        $logStats = function (int $nbUsers) {
            $memory = \memory_get_usage(true) / 1024 / 1024;
            $this->logger()->debug("Users: {$nbUsers} - Memory: {$memory}MB");
        };
        $this->loop->addPeriodicTimer(
            15,
            function () use ($logStats) {
                return $this->metricsHandler->getNbUsers()->then($logStats);
            }
        );

        $this->serve($address, $this->createSocketConnection($address, $this->loop), $this->loop);
    }
109
110
    /**
     * Schedules the given callback on the event loop through futureTick().
     *
     * @param callable $callback Callback handed over to the loop.
     */
    public function hook(callable $callback): void
    {
        $loop = $this->loop;
        $loop->futureTick($callback);
    }
114
115
    /**
     * Handles a request with the internal request handler, then lets the CORS helper
     * decorate the resulting response.
     *
     * @param ServerRequestInterface $request Incoming request.
     *
     * @return ResponseInterface The response returned by CORS::decorateResponse().
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        $response = $this->requestHandler->handle($request);

        return $this->cors->decorateResponse($request, $response);
    }
122
123
    /**
     * Callable entry point: handles the request and wraps the response in a resolved promise.
     *
     * @param ServerRequestInterface $request Incoming request.
     *
     * @return PromiseInterface Promise resolved with the result of handle().
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        $response = $this->handle($request);

        return resolve($response);
    }
127
128
    /**
     * Subscribes a client to a topic selector on the transport, when it is allowed to.
     *
     * Authorization is delegated to TopicMatcher::canSubscribeToTopic(), using the
     * Configuration::ALLOW_ANONYMOUS setting. A refused subscription is only logged:
     * the returned promise then resolves with the topic selector itself.
     *
     * @param string     $subscriber    Client identifier (used for logging).
     * @param string     $topicSelector Topic selector to subscribe to.
     * @param Token|null $token         Subscriber token, if any.
     * @param callable   $callback      Callback forwarded to the transport.
     *
     * @return PromiseInterface The transport's subscription promise, or a promise resolved
     *                          with $topicSelector when the subscription is refused.
     */
    public function subscribe(
        string $subscriber,
        string $topicSelector,
        ?Token $token,
        callable $callback
    ): PromiseInterface {
        $isAllowed = TopicMatcher::canSubscribeToTopic(
            $topicSelector,
            $token,
            $this->config[Configuration::ALLOW_ANONYMOUS]
        );

        if ($isAllowed) {
            $this->logger()->debug("Client {$subscriber} subscribed to {$topicSelector}");

            return $this->transport->subscribe($topicSelector, $callback);
        }

        $this->logger()->debug("Client {$subscriber} cannot subscribe to {$topicSelector}");

        return resolve($topicSelector);
    }
145
146
    /**
     * Publishes a message on the transport, then stores it, then logs its creation.
     *
     * @param string  $topic   Topic the message is published on.
     * @param Message $message Message to publish and store.
     *
     * @return PromiseInterface Promise chained after the publish, store and log steps.
     */
    public function publish(string $topic, Message $message): PromiseInterface
    {
        $persist = function () use ($topic, $message) {
            return $this->storage->storeMessage($topic, $message);
        };

        $report = function () use ($topic, $message) {
            $this->logger()->debug(\sprintf('Created message %s on topic %s', $message->getId(), $topic));
        };

        return $this->transport
            ->publish($topic, $message)
            ->then($persist)
            ->then($report);
    }
156
157
    /**
     * Stores the given subscriptions, then publishes one message per subscription.
     *
     * Each message is published on the topic returned by the subscription's getId(),
     * with a fresh UUID v4 as id and the JSON-encoded subscription as payload.
     *
     * @param array $subscriptions Subscriptions to store and announce.
     *
     * @return PromiseInterface Promise aggregating every publish promise (via all()).
     */
    public function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        $announce = function () use ($subscriptions) {
            $publishOne = fn($subscription) => $this->transport->publish(
                $subscription->getId(),
                new Message(
                    (string) Uuid::uuid4(),
                    \json_encode($subscription, \JSON_THROW_ON_ERROR),
                    true
                )
            );

            // array_map() keeps the input keys; re-index so the promise list stays sequential.
            return all(\array_values(\array_map($publishOne, $subscriptions)));
        };

        return $this->storage->storeSubscriptions($subscriptions)->then($announce);
    }
178
179
    /**
     * Looks up subscriptions in the storage, forwarding both filters unchanged.
     *
     * @param string|null $subscriber Subscriber filter passed to the storage.
     * @param string|null $topic      Topic filter passed to the storage.
     *
     * @return PromiseInterface The promise returned by StorageInterface::findSubscriptions().
     */
    public function getActiveSubscriptions(?string $subscriber, ?string $topic): PromiseInterface
    {
        $storage = $this->storage;

        return $storage->findSubscriptions($subscriber, $topic);
    }
183
184
    /**
     * Retrieves the messages stored after the given event id for the subscribed topics.
     *
     * @param string|null $lastEventID      Last event id known to the client, or null.
     * @param array       $subscribedTopics Topics forwarded to the storage.
     *
     * @return PromiseInterface Promise resolved with an empty array when no event id is given,
     *                          otherwise the promise returned by the storage.
     */
    public function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        return null === $lastEventID
            ? resolve([])
            : $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
    }
192
193
    /**
     * Creates the socket server and hooks user accounting onto its connections.
     *
     * Each new connection increments the user counter for the local address, and gets
     * a 'close' listener that calls handleClosingConnection().
     *
     * @param string        $localAddress Address the socket server is created with.
     * @param LoopInterface $loop         Event loop the socket server is attached to.
     *
     * @return Socket\Server The configured socket server.
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $server = new Socket\Server($localAddress, $loop);

        $onConnection = function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);

            $onClose = function () use ($connection, $localAddress) {
                return $this->handleClosingConnection($connection, $localAddress);
            };
            $connection->on('close', $onClose);
        };
        $server->on('connection', $onConnection);

        return $server;
    }
203
204
    /**
     * Reacts to a closed connection: decrements the user counter, then looks up the
     * subscriptions of the disconnected client and dispatches their unsubscriptions.
     *
     * The client id is derived from the host and port found in the connection's remote
     * address (the part after `//`, split on `:`).
     *
     * @param ConnectionInterface $connection   Connection that has just been closed.
     * @param string              $localAddress Local address the user counter is tracked under.
     *
     * @return PromiseInterface Promise resolved with an empty array when the remote address is
     *                          unknown, otherwise the promise chained after findSubscriptions().
     */
    private function handleClosingConnection(ConnectionInterface $connection, string $localAddress): PromiseInterface
    {
        $this->metricsHandler->decrementUsers($localAddress);

        // getRemoteAddress() may return null; u() only accepts a string, so passing null
        // would raise a TypeError from inside the 'close' listener. Without an address
        // there is no client to look up.
        $remoteAddress = $connection->getRemoteAddress();
        if (null === $remoteAddress) {
            return resolve([]);
        }

        // Pad to two parts so an address without a ":port" suffix yields an empty port
        // (cast to 0 below) instead of an undefined-offset notice.
        [$remoteHost, $remotePort] = \array_pad(u($remoteAddress)->after('//')->split(':'), 2, '');
        $subscriber = get_client_id((string) $remoteHost, (int) (string) $remotePort);

        return $this->storage->findSubscriptions($subscriber)
            ->then(fn(iterable $subscriptions) => $this->dispatchUnsubscriptions($subscriptions));
    }
212
213
    /**
     * Marks each subscription as inactive, publishes it on the topic returned by its
     * getId(), and asks the storage to remove all of them.
     *
     * @param Subscription[] $subscriptions
     *
     * @return PromiseInterface Promise aggregating the publish promises and the removal promise.
     */
    private function dispatchUnsubscriptions(iterable $subscriptions): PromiseInterface
    {
        // The subscriptions are walked here AND handed to the storage afterwards. A one-shot
        // iterator (e.g. a generator) cannot be traversed twice, so materialize it once.
        if ($subscriptions instanceof \Traversable) {
            $subscriptions = \iterator_to_array($subscriptions, false);
        }

        $promises = [];
        foreach ($subscriptions as $subscription) {
            $subscription->setActive(false);
            $message = new Message(
                (string) Uuid::uuid4(),
                \json_encode($subscription, \JSON_PRETTY_PRINT | \JSON_THROW_ON_ERROR),
                true
            );
            $topic = $subscription->getId();
            $promises[] = $this->transport->publish($topic, $message);
        }

        $promises[] = $this->storage->removeSubscriptions($subscriptions);

        return all($promises);
    }
234
235
    /**
     * Attaches an HTTP server (with this hub as request handler) to the socket,
     * logs the listening address and runs the event loop.
     *
     * @param string        $localAddress Address reported in the startup log line.
     * @param Socket\Server $socket       Socket the HTTP server listens on.
     * @param LoopInterface $loop         Event loop to run.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $httpServer = new Http\Server($loop, $this);
        $httpServer->listen($socket);

        $startupMessage = "Server running at http://" . $localAddress;
        $this->logger()->info($startupMessage);

        $loop->run();
    }
243
244
    /**
     * Returns the signal recorded by stop(), or null when none has been recorded.
     *
     * @return int|null The recorded signal number, if any.
     */
    public function getShutdownSignal(): ?int
    {
        // The typed property has no default and is first assigned in run(); reading it
        // directly before that would throw an "uninitialized property" Error. The null
        // coalescing operator turns that state into a plain null.
        return $this->shutdownSignal ?? null;
    }
248
249
    /**
     * Records the received signal and schedules the loop to stop on a future tick.
     *
     * @param int           $signal Signal number that triggered the shutdown.
     * @param LoopInterface $loop   Event loop to stop.
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;

        $loop->futureTick(fn() => $loop->stop());
    }
256
}
257