1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace BenTools\MercurePHP\Hub; |
4
|
|
|
|
5
|
|
|
use BenTools\MercurePHP\Configuration\Configuration; |
6
|
|
|
use BenTools\MercurePHP\Helpers\LoggerAwareTrait; |
7
|
|
|
use BenTools\MercurePHP\Metrics\MetricsHandlerInterface; |
8
|
|
|
use BenTools\MercurePHP\Model\Message; |
9
|
|
|
use BenTools\MercurePHP\Model\Subscription; |
10
|
|
|
use BenTools\MercurePHP\Security\CORS; |
11
|
|
|
use BenTools\MercurePHP\Storage\StorageInterface; |
12
|
|
|
use BenTools\MercurePHP\Transport\TransportInterface; |
13
|
|
|
use Psr\Http\Message\ResponseInterface; |
14
|
|
|
use Psr\Http\Message\ServerRequestInterface; |
15
|
|
|
use Psr\Http\Server\RequestHandlerInterface; |
16
|
|
|
use Psr\Log\LoggerInterface; |
17
|
|
|
use Psr\Log\NullLogger; |
18
|
|
|
use Ramsey\Uuid\Uuid; |
19
|
|
|
use React\EventLoop\LoopInterface; |
20
|
|
|
use React\Http; |
21
|
|
|
use React\Promise\PromiseInterface; |
22
|
|
|
use React\Socket; |
23
|
|
|
use React\Socket\ConnectionInterface; |
24
|
|
|
|
25
|
|
|
use function React\Promise\all; |
26
|
|
|
use function React\Promise\resolve; |
27
|
|
|
|
28
|
|
|
/**
 * HTTP hub: binds a socket on the configured address, serves incoming requests
 * through the injected request handler (decorating every response with CORS
 * headers) and keeps the connected-users metric up to date.
 */
final class Hub implements RequestHandlerInterface
{
    use LoggerAwareTrait;

    private array $config;
    private LoopInterface $loop;
    private TransportInterface $transport;
    private StorageInterface $storage;
    private RequestHandlerInterface $requestHandler;
    private CORS $cors;
    private MetricsHandlerInterface $metricsHandler;

    /**
     * Signal number received by stop(), or null when no signal was received.
     *
     * Defaults to null so that getShutdownSignal() can safely be called before
     * run(); an uninitialized typed property would throw an Error on read.
     */
    private ?int $shutdownSignal = null;

    /**
     * @param array                   $config         Hub configuration; Configuration::ADDR is read by run().
     * @param LoopInterface           $loop           Event loop driving the server.
     * @param TransportInterface      $transport      Used to publish subscription messages.
     * @param StorageInterface        $storage        Used to persist subscriptions.
     * @param RequestHandlerInterface $requestHandler Handler the hub delegates requests to.
     * @param MetricsHandlerInterface $metricsHandler Connected-users counter.
     * @param LoggerInterface|null    $logger         Falls back to a NullLogger when omitted.
     */
    public function __construct(
        array $config,
        LoopInterface $loop,
        TransportInterface $transport,
        StorageInterface $storage,
        RequestHandlerInterface $requestHandler,
        MetricsHandlerInterface $metricsHandler,
        ?LoggerInterface $logger = null
    ) {
        $this->config = $config;
        $this->loop = $loop;
        $this->transport = $transport;
        $this->storage = $storage;
        $this->requestHandler = $requestHandler;
        $this->metricsHandler = $metricsHandler;
        $this->logger = $logger ?? new NullLogger();
        $this->cors = new CORS($config);
    }

    /**
     * Starts the hub: resets the users metric, registers the SIGINT handler and
     * a 15-second stats timer, then opens the socket and serves requests.
     *
     * This call blocks until the event loop is stopped, because serve() runs the loop.
     */
    public function run(): void
    {
        $localAddress = $this->config[Configuration::ADDR];
        $this->shutdownSignal = null;
        $this->metricsHandler->resetUsers($localAddress);

        $this->loop->addSignal(SIGINT, function ($signal) {
            $this->stop($signal, $this->loop);
        });

        // Every 15 seconds, log the number of connected users and the memory usage.
        $this->loop->addPeriodicTimer(
            15,
            fn() => $this->metricsHandler->getNbUsers()->then(
                function (int $nbUsers) {
                    $memory = \memory_get_usage(true) / 1024 / 1024;
                    $this->logger()->debug("Users: {$nbUsers} - Memory: {$memory}MB");
                }
            )
        );

        $socket = $this->createSocketConnection($localAddress, $this->loop);
        $this->serve($localAddress, $socket, $this->loop);
    }

    /**
     * Delegates the request to the injected handler and passes the resulting
     * response through CORS::decorateResponse().
     */
    public function handle(ServerRequestInterface $request): ResponseInterface
    {
        return $this->cors->decorateResponse(
            $request,
            $this->requestHandler->handle($request)
        );
    }

    /**
     * Stores the given subscriptions, then publishes one message per
     * subscription through the transport.
     *
     * Each message carries a fresh UUIDv4 as id and the JSON-encoded
     * subscription as payload, and is published under the subscription's id.
     *
     * @return PromiseInterface Promise aggregating all publish operations.
     */
    public function dispatchSubscriptions(Subscription ...$subscriptions): PromiseInterface
    {
        return $this->storage->storeSubscriptions(...$subscriptions)
            ->then(
                function () use ($subscriptions) {
                    $promises = [];
                    foreach ($subscriptions as $subscription) {
                        $promises[] = $this->transport->publish(
                            $subscription->getId(),
                            new Message(
                                (string) Uuid::uuid4(),
                                \json_encode($subscription, \JSON_THROW_ON_ERROR),
                                // NOTE(review): third Message argument is presumably the
                                // "private" flag - confirm against Model\Message.
                                true
                            )
                        );
                    }

                    return all($promises);
                }
            );
    }

    /**
     * Callable entry point given to the HTTP server: wraps handle() in a
     * resolved promise.
     */
    public function __invoke(ServerRequestInterface $request): PromiseInterface
    {
        return resolve($this->handle($request));
    }

    /**
     * Creates the listening socket and wires the users metric to it: the
     * counter is incremented on each new connection and decremented when that
     * connection closes.
     */
    private function createSocketConnection(string $localAddress, LoopInterface $loop): Socket\Server
    {
        $socket = new Socket\Server($localAddress, $loop);
        $socket->on('connection', function (ConnectionInterface $connection) use ($localAddress) {
            $this->metricsHandler->incrementUsers($localAddress);
            $connection->on('close', fn() => $this->metricsHandler->decrementUsers($localAddress));
        });

        return $socket;
    }

    /**
     * Intentionally empty and not called from within this class: the users
     * counter is decremented inline by the 'close' listener registered in
     * createSocketConnection().
     */
    private function handleClosingConnection(ConnectionInterface $connection, string $localAddress): void
    {
    }

    /**
     * Attaches an HTTP server (using this hub as its request handler) to the
     * socket, logs the listening address and runs the event loop.
     */
    private function serve(string $localAddress, Socket\Server $socket, LoopInterface $loop): void
    {
        $server = new Http\Server($loop, $this);
        $server->listen($socket);

        $this->logger()->info("Server running at http://" . $localAddress);
        $loop->run();
    }

    /**
     * Returns the signal recorded by stop(), or null if none has been received
     * (including before run() was ever called).
     */
    public function getShutdownSignal(): ?int
    {
        return $this->shutdownSignal;
    }

    /**
     * Records the received signal and stops the loop on its next tick.
     */
    private function stop(int $signal, LoopInterface $loop): void
    {
        $this->shutdownSignal = $signal;
        $loop->futureTick(function () use ($loop) {
            $loop->stop();
        });
    }
}
155
|
|
|
|
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.