Hub   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 83
Duplicated Lines 0 %

Test Coverage

Coverage 30.95%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 42
c 1
b 0
f 0
dl 0
loc 83
ccs 13
cts 42
cp 0.3095
rs 10
wmc 7

7 Methods

Rating   Name   Duplication   Size   Complexity  
A stop() 0 5 1
A serve() 0 7 1
A createSocketConnection() 0 9 1
A getShutdownSignal() 0 3 1
A __construct() 0 13 1
A __invoke() 0 4 1
A run() 0 20 1
1
<?php
2
3
namespace BenTools\MercurePHP\Hub;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface;
7
use BenTools\MercurePHP\Security\CORS;
8
use Psr\Http\Message\ResponseInterface;
9
use Psr\Http\Message\ServerRequestInterface;
10
use Psr\Log\LoggerInterface;
11
use React\EventLoop\LoopInterface;
12
use React\Http;
13
use React\Promise\PromiseInterface;
14
use React\Socket;
15
use React\Socket\ConnectionInterface;
16
17
/**
 * HTTP hub entry point: binds a socket on the configured address, serves
 * incoming requests through the injected request handler, and keeps the
 * metrics handler informed about opened/closed connections.
 *
 * The instance itself is passed to the HTTP server as its request callback
 * (see {@see Hub::__invoke()}).
 */
final class Hub implements HubInterface
{
    /**
     * Interval, in seconds, between two debug log entries reporting the
     * number of users and the memory usage.
     */
    private const STATS_LOG_INTERVAL = 15;

    private array $config;
    private LoopInterface $loop;
    private LoggerInterface $logger;
    private RequestHandler $requestHandler;
    private CORS $cors;
    private MetricsHandlerInterface $metricsHandler;

    /**
     * Signal number recorded by {@see Hub::stop()}, or null when no signal
     * has been recorded.
     *
     * The explicit default matters: a typed property without a default is
     * "uninitialized", and reading it (e.g. calling getShutdownSignal()
     * before run()) would throw an Error instead of returning null.
     */
    private ?int $shutdownSignal = null;

    /**
     * @param array                   $config         Hub configuration; the Configuration::ADDR entry is read by run(),
     *                                                and the whole array is handed to the CORS helper.
     * @param LoopInterface           $loop           Event loop used for signals, timers, the socket and the HTTP server.
     * @param RequestHandler          $requestHandler Handler every incoming request is delegated to.
     * @param MetricsHandlerInterface $metricsHandler Receives user counter updates for the local address.
     * @param LoggerInterface         $logger         Receives the startup message and the periodic debug statistics.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        RequestHandler $requestHandler,
        MetricsHandlerInterface $metricsHandler,
        LoggerInterface $logger
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->requestHandler = $requestHandler;
        $this->metricsHandler = $metricsHandler;
        $this->logger = $logger;
        $this->cors = new CORS($config);
    }

    /**
     * Starts the hub.
     *
     * Resets the shutdown signal and the user counter for the local address,
     * registers the SIGINT handler and the periodic statistics logger, then
     * opens the socket and serves requests. Because serve() runs the event
     * loop, this method returns only once the loop has returned.
     */
    public function run(): void
    {
        $localAddress = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($localAddress);

        // Remember which signal was received and stop the loop.
        $this->loop->addSignal(SIGINT, function ($signal) {
            $this->stop($signal, $this->loop);
        });

        // Periodically log the current number of users and the allocated memory (in MB).
        $this->loop->addPeriodicTimer(
            self::STATS_LOG_INTERVAL,
            fn() => $this->metricsHandler->getNbUsers()->then(
                function (int $nbUsers) {
                    $memory = \memory_get_usage(true) / 1024 / 1024;
                    $this->logger->debug("Users: {$nbUsers} - Memory: {$memory}MB");
                }
            )
        );

        $socket = $this->createSocketConnection($localAddress, $this->loop);
        $this->serve($localAddress, $socket, $this->loop);
    }

    /**
     * Returns the signal number recorded by stop(), or null if none has been
     * recorded (including before run() was ever called).
     */
    public function getShutdownSignal(): ?int
    {
        return $this->shutdownSignal;
    }

    /**
     * Request callback given to the HTTP server: delegates the request to the
     * request handler, then passes the resulting response through the CORS
     * helper together with the originating request.
     *
     * @param ServerRequestInterface $request Incoming HTTP request.
     *
     * @return PromiseInterface Promise chained on the request handler's result.
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        return $this->requestHandler->handle($request)
            ->then(fn(ResponseInterface $response) => $this->cors->decorateResponse($request, $response));
    }

    /**
     * Creates the listening socket and wires the user counter to it: the
     * counter is incremented on every new connection and decremented when
     * that connection closes.
     *
     * @param string        $localAddress Address the socket server is created with.
     * @param LoopInterface $loop         Event loop the socket server is attached to.
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $socket = new Socket\Server($localAddress, $loop);
        $socket->on('connection', function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);
            $connection->on('close', fn() => $this->metricsHandler->decrementUsers($localAddress));
        });

        return $socket;
    }

    /**
     * Attaches an HTTP server (using this object as request callback) to the
     * given socket, logs the startup message and runs the event loop.
     *
     * @param string        $localAddress Address reported in the startup log message.
     * @param Socket\Server $socket       Socket the HTTP server listens on.
     * @param LoopInterface $loop         Event loop to run.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $server = new Http\Server($loop, $this);
        $server->listen($socket);

        $this->logger->info("Server running at http://" . $localAddress);
        $loop->run();
    }

    /**
     * Records the received signal and schedules the loop to be stopped on a
     * future tick.
     *
     * @param int           $signal Signal number to expose through getShutdownSignal().
     * @param LoopInterface $loop   Event loop to stop.
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;
        $loop->futureTick(static function () use ($loop): void {
            $loop->stop();
        });
    }
}
103