Passed
Pull Request — master (#17)
by BENOIT
08:21
created

Hub   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 192
Duplicated Lines 0 %

Test Coverage

Coverage 24.49%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 95
dl 0
loc 192
ccs 12
cts 49
cp 0.2449
rs 10
c 1
b 0
f 0
wmc 17

13 Methods

Rating   Name   Duplication   Size   Complexity  
A stop() 0 5 1
A dispatchSubscriptions() 0 18 2
A subscribe() 0 16 2
A serve() 0 7 1
A __construct() 0 32 1
A handleClosingConnection() 0 7 1
A createSocketConnection() 0 9 1
A __invoke() 0 3 1
A run() 0 20 1
A dispatchUnsubscriptions() 0 17 2
A handle() 0 5 1
A getShutdownSignal() 0 3 1
A fetchMissedMessages() 0 7 2
1
<?php
2
3
namespace BenTools\MercurePHP\Hub;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Controller\HealthController;
7
use BenTools\MercurePHP\Controller\PublishController;
8
use BenTools\MercurePHP\Controller\SubscribeController;
9
use BenTools\MercurePHP\Helpers\LoggerAwareTrait;
10
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface;
11
use BenTools\MercurePHP\Model\Message;
12
use BenTools\MercurePHP\Model\Subscription;
13
use BenTools\MercurePHP\Security\Authenticator;
14
use BenTools\MercurePHP\Security\CORS;
15
use BenTools\MercurePHP\Security\TopicMatcher;
16
use BenTools\MercurePHP\Storage\StorageInterface;
17
use BenTools\MercurePHP\Transport\TransportInterface;
18
use Lcobucci\JWT\Token;
19
use Psr\Http\Message\ResponseInterface;
20
use Psr\Http\Message\ServerRequestInterface;
21
use Psr\Http\Server\RequestHandlerInterface;
22
use Psr\Log\LoggerInterface;
23
use Psr\Log\NullLogger;
24
use Ramsey\Uuid\Uuid;
25
use React\EventLoop\LoopInterface;
26
use React\Http;
27
use React\Promise\PromiseInterface;
28
use React\Socket;
29
use React\Socket\ConnectionInterface;
30
31 1
use function BenTools\MercurePHP\get_client_id;
32
use function React\Promise\all;
33
use function React\Promise\any;
34
use function React\Promise\resolve;
35
use function Symfony\Component\String\u;
36
37 1
/**
 * Mercure hub: wires the HTTP controllers together, serves them on a ReactPHP
 * socket, and relays subscription events through the configured transport
 * and storage.
 */
final class Hub implements RequestHandlerInterface
{
    use LoggerAwareTrait;

    private array $config;
    private LoopInterface $loop;
    private TransportInterface $transport;
    private StorageInterface $storage;
    private RequestHandlerInterface $requestHandler;
    private CORS $cors;
    private MetricsHandlerInterface $metricsHandler;

    /**
     * Signal number passed to stop(), or null when no signal was received.
     *
     * Defaults to null so that getShutdownSignal() can be called safely
     * before run(); an uninitialized typed property would throw on read.
     */
    private ?int $shutdownSignal = null;

    /**
     * @param array                   $config         Hub configuration, indexed by Configuration::* keys.
     * @param LoopInterface           $loop           Event loop driving the server.
     * @param TransportInterface      $transport      Used to publish messages and subscribe to topics.
     * @param StorageInterface        $storage        Used to persist subscriptions and retrieve messages.
     * @param MetricsHandlerInterface $metricsHandler Tracks the number of connected users.
     * @param LoggerInterface|null    $logger         Falls back to a NullLogger when omitted.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        TransportInterface $transport,
        StorageInterface $storage,
        MetricsHandlerInterface $metricsHandler,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->transport = $transport;
        $this->storage = $storage;
        $this->metricsHandler = $metricsHandler;
        // NOTE(review): static analysis reports LoggerAwareTrait::$logger as deprecated in
        // favour of logger(). The property is still written directly here because it has to
        // be set before the logger() calls below; confirm whether the trait offers a setter.
        $this->logger = $logger ?? new NullLogger();
        $this->cors = new CORS($config);

        $subscriberAuthenticator = Authenticator::createSubscriberAuthenticator($this->config);
        $publisherAuthenticator = Authenticator::createPublisherAuthenticator($this->config);

        $controllers = [
            new HealthController(),
            new SubscribeController(
                $this->config,
                $this,
                $subscriberAuthenticator,
                $this->loop,
                $this->logger()
            ),
            new PublishController($storage, $transport, $publisherAuthenticator, $this->logger()),
        ];

        $this->requestHandler = new RequestHandler($controllers);
    }

    /**
     * Starts the hub: resets the user metric for the configured address,
     * registers the SIGINT handler and a 15-second stats timer, then opens
     * the socket and serves requests. serve() runs the event loop, so this
     * call returns only after the loop has stopped.
     */
    public function run(): void
    {
        $localAddress = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($localAddress);

        $this->loop->addSignal(SIGINT, function ($signal) {
            $this->stop($signal, $this->loop);
        });

        // Every 15 seconds, log the number of users and the memory usage in MB.
        $this->loop->addPeriodicTimer(
            15,
            fn() => $this->metricsHandler->getNbUsers()->then(
                function (int $nbUsers) {
                    $memory = \memory_get_usage(true) / 1024 / 1024;
                    $this->logger()->debug("Users: {$nbUsers} - Memory: {$memory}MB");
                }
            )
        );

        $socket = $this->createSocketConnection($localAddress, $this->loop);
        $this->serve($localAddress, $socket, $this->loop);
    }

    /**
     * Delegates the request to the internal request handler and passes the
     * resulting response through the CORS decorator.
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        return $this->cors->decorateResponse(
            $request,
            $this->requestHandler->handle($request)
        );
    }

    /**
     * Callable entry point (the hub itself is handed to the HTTP server in serve()):
     * wraps the result of handle() in a resolved promise.
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        return resolve($this->handle($request));
    }

    /**
     * Subscribes a client to a topic selector when it is allowed to.
     *
     * @param string     $subscriber    Client identifier, used for logging only.
     * @param string     $topicSelector Topic selector to subscribe to.
     * @param Token|null $token         Subscriber token, if any.
     * @param callable   $callback      Forwarded to the transport subscription.
     *
     * @return PromiseInterface Resolved with the topic selector when the subscription is denied,
     *                          otherwise whatever the transport's subscribe() returns.
     */
    public function subscribe(
        string $subscriber,
        string $topicSelector,
        ?Token $token,
        callable $callback
    ): PromiseInterface {
        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];

        if (!TopicMatcher::canSubscribeToTopic($topicSelector, $token, $allowAnonymous)) {
            $this->logger()->debug("Client {$subscriber} cannot subscribe to {$topicSelector}");

            return resolve($topicSelector);
        }

        $this->logger()->debug("Client {$subscriber} subscribed to {$topicSelector}");

        return $this->transport->subscribe($topicSelector, $callback);
    }

    /**
     * Stores the given subscriptions, then publishes one message per
     * subscription (JSON-encoded, on the topic named after the subscription id).
     *
     * @param Subscription[] $subscriptions
     *
     * @return PromiseInterface Settles once storage and every publication have settled (all()).
     */
    public function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        return $this->storage->storeSubscriptions($subscriptions)
            ->then(
                function () use ($subscriptions) {
                    $promises = [];
                    foreach ($subscriptions as $subscription) {
                        $promises[] = $this->transport->publish(
                            $subscription->getId(),
                            new Message(
                                (string) Uuid::uuid4(),
                                \json_encode($subscription, \JSON_THROW_ON_ERROR),
                                true
                            )
                        );
                    }

                    return all($promises);
                }
            );
    }

    /**
     * Retrieves the messages stored after the given event id for the given topics.
     *
     * @return PromiseInterface Resolved with an empty array when no last event id is provided.
     */
    public function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        if (null === $lastEventID) {
            return resolve([]);
        }

        return $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
    }

    /**
     * Creates the listening socket and keeps the user metric in sync:
     * incremented on each new connection, decremented (with subscription
     * cleanup) when the connection closes.
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $socket = new Socket\Server($localAddress, $loop);
        $socket->on('connection', function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);
            $connection->on('close', fn() => $this->handleClosingConnection($connection, $localAddress));
        });

        return $socket;
    }

    /**
     * Decrements the user metric and dispatches unsubscriptions for the
     * subscriber derived from the connection's remote address.
     *
     * @return PromiseInterface Resolved with null when the remote address is unavailable.
     */
    private function handleClosingConnection(ConnectionInterface $connection, string $localAddress): PromiseInterface
    {
        $this->metricsHandler->decrementUsers($localAddress);

        // The remote address may be null (e.g. once the connection is closed);
        // u() only accepts strings, so bail out instead of raising a TypeError.
        $remoteAddress = $connection->getRemoteAddress();
        if (null === $remoteAddress) {
            $this->logger()->debug('Unable to determine the remote address of a closing connection.');

            return resolve(null);
        }

        // Strip the scheme, then split "host:port". Missing parts fall back to
        // an empty string, which yields the same '' / 0 values as casting null.
        // NOTE(review): bracketed IPv6 addresses contain ':' and will not split as host/port.
        $parts = u($remoteAddress)->after('//')->split(':');
        $remoteHost = $parts[0] ?? '';
        $remotePort = $parts[1] ?? '';

        $subscriber = get_client_id((string) $remoteHost, (int) (string) $remotePort);

        return $this->storage->findSubscriptionsBySubscriber($subscriber)
            ->then(fn(iterable $subscriptions) => $this->dispatchUnsubscriptions($subscriptions));
    }

    /**
     * Marks each subscription inactive, publishes its JSON representation on
     * the topic named after its id, and removes the subscriptions from storage.
     *
     * @param iterable<Subscription> $subscriptions
     */
    private function dispatchUnsubscriptions(iterable $subscriptions): PromiseInterface
    {
        // Materialize once: the collection is traversed here and then handed to
        // the storage, which a one-shot iterable (e.g. a generator) would not survive.
        $subscriptions = \is_array($subscriptions)
            ? $subscriptions
            : \iterator_to_array($subscriptions, false);

        $promises = [];
        foreach ($subscriptions as $subscription) {
            $subscription->setActive(false);
            $message = new Message(
                (string) Uuid::uuid4(),
                \json_encode($subscription, \JSON_PRETTY_PRINT | \JSON_THROW_ON_ERROR),
                true
            );
            $topic = $subscription->getId();
            $promises[] = $this->transport->publish($topic, $message);
        }

        $promises[] = $this->storage->removeSubscriptions($subscriptions);

        // NOTE(review): any() settles on the first fulfilled promise, whereas
        // dispatchSubscriptions() waits for all(); confirm this asymmetry is intended.
        return any($promises);
    }

    /**
     * Attaches an HTTP server (with this hub as request handler) to the socket,
     * logs the listening address and runs the event loop.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $server = new Http\Server($loop, $this);
        $server->listen($socket);

        $this->logger()->info("Server running at http://" . $localAddress);
        $loop->run();
    }

    /**
     * @return int|null The signal passed to stop(), or null if none was received.
     */
    public function getShutdownSignal(): ?int
    {
        return $this->shutdownSignal;
    }

    /**
     * Records the received signal and schedules the loop to stop on its next tick.
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;
        $loop->futureTick(static function () use ($loop) {
            $loop->stop();
        });
    }
}
232