1 | <?php declare(strict_types=1); |
||
2 | |||
3 | namespace ekinhbayar\GitAmp\Websocket; |
||
4 | |||
5 | use Amp\Delayed; |
||
6 | use Amp\Http\Server\HttpServer; |
||
7 | use Amp\Http\Server\Request; |
||
8 | use Amp\Http\Server\Response; |
||
9 | use Amp\Http\Status; |
||
10 | use Amp\Promise; |
||
11 | use Amp\Success; |
||
12 | use Amp\Websocket\Client; |
||
13 | use Amp\Websocket\Server\ClientHandler; |
||
14 | use Amp\Websocket\Server\Gateway; |
||
15 | use Amp\Websocket\Server\WebsocketServerObserver; |
||
16 | use ekinhbayar\GitAmp\Configuration; |
||
17 | use ekinhbayar\GitAmp\Provider\Listener; |
||
18 | use ekinhbayar\GitAmp\Response\Results; |
||
19 | use Psr\Log\LoggerInterface; |
||
20 | use function Amp\asyncCall; |
||
21 | use function Amp\call; |
||
22 | |||
/**
 * Websocket client handler and server observer.
 *
 * Validates the origin of incoming handshakes, tracks connecting and
 * disconnecting clients (broadcasting the connected-client count on every
 * change) and, once the server has started, repeatedly asks the provider for
 * new events and broadcasts them through the gateway.
 */
class Handler implements ClientHandler, WebsocketServerObserver
{
    /**
     * Value handed to Amp\Delayed between two provider polls.
     *
     * NOTE(review): Amp's Delayed takes milliseconds, which would make this 25 seconds - confirm.
     */
    private const LISTEN_INTERVAL = 25000;

    /** Gateway used for broadcasts; only assigned in onStart(). */
    private Gateway $gateway;

    /** Source of new events, polled from the loop started in onStart(). */
    private Listener $provider;

    /** Consulted during the handshake to validate the request origin. */
    private Configuration $configuration;

    /** Most recent non-empty result set; sent to every newly connected client. */
    private Results $lastEvents;

    private LoggerInterface $logger;

    /**
     * @param Listener        $provider      Event source polled after the server starts.
     * @param Configuration   $configuration Used to validate the handshake origin.
     * @param Results         $results       Initial value for the "last events" sent to new clients.
     * @param LoggerInterface $logger        Receives connect / disconnect messages.
     */
    public function __construct(
        Listener $provider,
        Configuration $configuration,
        Results $results,
        LoggerInterface $logger
    ) {
        $this->provider      = $provider;
        $this->configuration = $configuration;
        $this->lastEvents    = $results;
        $this->logger        = $logger;
    }

    /**
     * Accepts the handshake only when the request's Origin header is known to the configuration.
     *
     * A request without an Origin header is rejected the same way as one with
     * an unknown origin, instead of handing null to websocketAddressExists().
     *
     * @return Promise The gateway's error-handler result (403 "Forbidden Origin") on rejection,
     *                 otherwise a Success wrapping the unmodified $response.
     */
    public function handleHandshake(Gateway $gateway, Request $request, Response $response): Promise
    {
        $origin = $request->getHeader('origin');

        if ($origin === null || !$this->configuration->websocketAddressExists($origin)) {
            return $gateway->getErrorHandler()->handleError(Status::FORBIDDEN, 'Forbidden Origin', $request);
        }

        return new Success($response);
    }

    /**
     * Handles a freshly connected client.
     *
     * Registers a close callback, logs the connection, broadcasts the new
     * client count, sends the last known events to the client and then awaits
     * a message from it.
     *
     * @return Promise Resolves once the coroutine below has finished.
     */
    public function handleClient(Gateway $gateway, Client $client, Request $request, Response $response): Promise
    {
        return call(function () use ($gateway, $client) {
            $client->onClose(function (Client $client, int $code, string $reason) use ($gateway) {
                yield $this->processDisconnectingClient($gateway, $client, $code, $reason);
            });

            $totalClients = \count($gateway->getClients());

            $this->logger->info(
                \sprintf('Client %d connected. Total clients: %d', $client->getId(), $totalClients),
            );

            yield $this->sendConnectedUsersCount(\count($gateway->getClients()));

            // NOTE(review): the promise returned by send() is not awaited here.
            $client->send($this->lastEvents->jsonEncode());

            // Presumably keeps this handler pending until the client sends something
            // or the connection ends - confirm against the websocket library.
            yield $client->receive();
        });
    }

    /**
     * Logs a client disconnect and broadcasts the updated client count.
     *
     * @param int    $code   Close code passed to the onClose callback.
     * @param string $reason Close reason passed to the onClose callback.
     */
    private function processDisconnectingClient(Gateway $gateway, Client $client, int $code, string $reason): Promise
    {
        return call(function () use ($gateway, $client, $code, $reason) {
            $this->logger->info(
                \sprintf(
                    'Client %d disconnected. Code: %d Reason: %s. Total clients: %d',
                    $client->getId(),
                    $code,
                    $reason,
                    \count($gateway->getClients()),
                )
            );

            yield $this->sendConnectedUsersCount(\count($gateway->getClients()));
        });
    }

    /**
     * Broadcasts a result set to all clients and remembers it as the last events.
     *
     * Result sets without events are ignored and leave the remembered events untouched.
     */
    private function emit(Results $events): Promise
    {
        if (!$events->hasEvents()) {
            return new Success();
        }

        $this->lastEvents = $events;

        return $this->gateway->broadcast($events->jsonEncode());
    }

    /**
     * Broadcasts {"connectedUsers": <count>} through the gateway stored by onStart().
     */
    private function sendConnectedUsersCount(int $count): Promise
    {
        return $this->gateway->broadcast(\json_encode(['connectedUsers' => $count]));
    }

    /**
     * Stores the gateway and starts the endless provider polling loop.
     *
     * @return Promise Resolved immediately; the polling loop keeps running on its own.
     */
    public function onStart(HttpServer $server, Gateway $gateway): Promise
    {
        $this->gateway = $gateway;

        asyncCall(function () {
            // NOTE(review): nothing in this class ever ends this loop, onStop() included.
            while (true) {
                // The promise returned by emit() is not awaited before the delay starts.
                $this->emit(yield $this->provider->listen());

                yield new Delayed(self::LISTEN_INTERVAL);
            }
        });

        return new Success();
    }

    /**
     * No shutdown work is performed; returns an already resolved promise.
     */
    public function onStop(HttpServer $server, Gateway $gateway): Promise
    {
        return new Success();
    }
}
||
128 |