1 | <?php declare(strict_types=1); |
||
15 | use Amp\Websocket\Server\WebsocketServerObserver; |
||
16 | use ekinhbayar\GitAmp\Provider\Listener; |
||
17 | use ekinhbayar\GitAmp\Response\Results; |
||
18 | use Psr\Log\LoggerInterface; |
||
19 | use function Amp\asyncCall; |
||
20 | use function Amp\call; |
||
21 | |||
22 | class Handler implements ClientHandler, WebsocketServerObserver |
||
23 | { |
||
24 | private Gateway $gateway; |
||
|
|||
25 | |||
26 | private string $origin; |
||
27 | |||
28 | private Listener $provider; |
||
29 | |||
30 | private ?Results $lastEvents = null; |
||
31 | |||
32 | private LoggerInterface $logger; |
||
33 | |||
34 | 7 | public function __construct(string $origin, Listener $provider, LoggerInterface $logger) |
|
35 | { |
||
36 | 7 | $this->origin = $origin; |
|
37 | 7 | $this->provider = $provider; |
|
38 | 7 | $this->logger = $logger; |
|
39 | } |
||
40 | |||
41 | 2 | public function handleHandshake(Gateway $gateway, Request $request, Response $response): Promise |
|
42 | { |
||
43 | 2 | if ($request->getHeader('origin') !== $this->origin) { |
|
44 | 1 | return $gateway->getErrorHandler()->handleError(Status::FORBIDDEN, 'Forbidden Origin', $request); |
|
45 | } |
||
46 | |||
47 | 1 | return new Success($response); |
|
48 | } |
||
49 | |||
50 | 2 | public function handleClient(Gateway $gateway, Client $client, Request $request, Response $response): Promise |
|
51 | { |
||
52 | 2 | $client->onClose(function (Client $client, int $code, string $reason) use ($gateway) { |
|
53 | $this->logger->info( |
||
54 | \sprintf( |
||
55 | 'Client %d disconnected. Code: %d Reason: %s. Total clients: %d', |
||
56 | $client->getId(), |
||
57 | $code, |
||
58 | $reason, |
||
59 | count($gateway->getClients()), |
||
60 | ) |
||
61 | ); |
||
62 | |||
63 | $this->sendConnectedUsersCount(count($gateway->getClients())); |
||
64 | 2 | }); |
|
65 | |||
66 | 2 | $this->logger->info( |
|
67 | 2 | \sprintf('Client %d connected. Total clients: %d', $client->getId(), count($gateway->getClients())), |
|
68 | ); |
||
69 | |||
70 | 2 | $this->sendConnectedUsersCount(\count($gateway->getClients())); |
|
71 | |||
72 | 2 | if ($this->lastEvents) { |
|
73 | 1 | $client->send($this->lastEvents->jsonEncode()); |
|
74 | } |
||
75 | |||
76 | 2 | return call(function () use ($gateway, $client): \Generator { |
|
77 | 2 | while ($message = yield $client->receive()) { |
|
78 | // intentionally keep receiving, otherwise the connection closes instantly for some reason |
||
79 | } |
||
80 | 2 | }); |
|
81 | } |
||
82 | |||
83 | 4 | private function emit(Results $events): void |
|
84 | { |
||
85 | 4 | if (!$events->hasEvents()) { |
|
86 | 2 | return; |
|
87 | } |
||
88 | |||
89 | 2 | $this->lastEvents = $events; |
|
90 | |||
91 | 2 | $this->gateway->broadcast($events->jsonEncode()); |
|
92 | } |
||
93 | |||
94 | 2 | private function sendConnectedUsersCount(int $count): void |
|
95 | { |
||
96 | 2 | $this->gateway->broadcast(\json_encode(['connectedUsers' => $count])); |
|
97 | } |
||
98 | |||
99 | 4 | public function onStart(HttpServer $server, Gateway $gateway): Promise |
|
100 | { |
||
101 | 4 | $this->gateway = $gateway; |
|
102 | |||
103 | 4 | asyncCall(function () { |
|
104 | 4 | while (true) { |
|
105 | 4 | $this->emit(yield $this->provider->listen()); |
|
106 | |||
107 | 4 | yield new Delayed(25000); |
|
119 |