1 | <?php |
||||
2 | |||||
3 | namespace BeyondCode\LaravelWebSockets\ChannelManagers; |
||||
4 | |||||
5 | use BeyondCode\LaravelWebSockets\DashboardLogger; |
||||
6 | use BeyondCode\LaravelWebSockets\Helpers; |
||||
7 | use BeyondCode\LaravelWebSockets\Server\MockableConnection; |
||||
8 | use Carbon\Carbon; |
||||
9 | use Clue\React\Redis\Client; |
||||
10 | use Clue\React\Redis\Factory; |
||||
11 | use Illuminate\Cache\RedisLock; |
||||
12 | use Illuminate\Support\Facades\Redis; |
||||
13 | use Illuminate\Support\Str; |
||||
14 | use Ratchet\ConnectionInterface; |
||||
15 | use React\EventLoop\LoopInterface; |
||||
16 | use React\Promise\PromiseInterface; |
||||
17 | use stdClass; |
||||
18 | |||||
19 | class RedisChannelManager extends LocalChannelManager |
||||
20 | { |
||||
/**
 * The event loop the Redis clients are created on.
 *
 * @var LoopInterface
 */
protected $loop;

/**
 * The client used for publishing and for regular Redis commands.
 *
 * @var Client
 */
protected $publishClient;

/**
 * The client used for Pub/Sub subscriptions.
 *
 * @var Client
 */
protected $subscribeClient;

/**
 * The Redis connection resolved through the Redis facade.
 *
 * The constructor assigns the result of Redis::connection(), which is a
 * connection object rather than the manager, and lock() hands this value
 * to RedisLock.
 *
 * @var \Illuminate\Redis\Connections\Connection
 */
protected $redis;
||||
48 | |||||
/**
 * Build the manager: resolve the configured Redis connection and create
 * the lazy publish and subscribe clients on the given loop.
 *
 * @param LoopInterface $loop
 * @param string|null $factoryClass Client factory class; Factory::class is used when empty.
 * @return void
 */
public function __construct(LoopInterface $loop, $factoryClass = null)
{
    parent::__construct($loop, $factoryClass);

    $this->loop = $loop;

    $connectionName = config('websockets.replication.modes.redis.connection', 'default');
    $this->redis = Redis::connection($connectionName);

    $uri = $this->getConnectionUri();

    $resolvedFactory = $factoryClass ?: Factory::class;
    $clientFactory = new $resolvedFactory($this->loop);

    $this->publishClient = $clientFactory->createLazyClient($uri);
    $this->subscribeClient = $clientFactory->createLazyClient($uri);

    // Hand every message received by the subscribe client to onMessage().
    $this->subscribeClient->on('message', function ($topic, $message) {
        $this->onMessage($topic, $message);
    });
}
||||
78 | |||||
/**
 * List the members of the app's channel set stored in Redis.
 *
 * @param string|int $appId
 * @return \React\Promise\PromiseInterface Resolves with the SMEMBERS reply.
 */
public function getGlobalChannels($appId): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->smembers($channelsKey);
}
||||
92 | |||||
/**
 * Unsubscribe the connection from every channel listed for its app in
 * Redis, then run the parent implementation.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @return PromiseInterface Resolves with the parent's result.
 */
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{
    $leaveEachChannel = function ($channelNames) use ($connection) {
        foreach ($channelNames as $name) {
            $this->unsubscribeFromChannel($connection, $name, new stdClass);
        }
    };

    $leaveLocally = function () use ($connection) {
        return parent::unsubscribeFromAllChannels($connection);
    };

    return $this->getGlobalChannels($connection->app->id)
        ->then($leaveEachChannel)
        ->then($leaveLocally);
}
||||
111 | |||||
/**
 * Subscribe the connection to a channel.
 *
 * Steps run in sequence: subscribe to the Pub/Sub topic, record the
 * connection in the sockets set, record the channel in the app's channel
 * set, increment the channel's connection counter, then run the parent
 * implementation.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param string $channelName
 * @param stdClass $payload
 * @return PromiseInterface Resolves with the parent's result.
 */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
    $topicSubscribed = $this->subscribeToTopic($connection->app->id, $channelName);

    $connectionTracked = $topicSubscribed->then(function () use ($connection) {
        return $this->addConnectionToSet($connection, Carbon::now());
    });

    $channelTracked = $connectionTracked->then(function () use ($connection, $channelName) {
        return $this->addChannelToSet($connection->app->id, $channelName);
    });

    $counterBumped = $channelTracked->then(function () use ($connection, $channelName) {
        return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
    });

    return $counterBumped->then(function () use ($connection, $channelName, $payload) {
        return parent::subscribeToChannel($connection, $channelName, $payload);
    });
}
||||
136 | |||||
/**
 * Unsubscribe the connection from a channel.
 *
 * Runs the parent implementation first, then decrements the channel's
 * connection counter and removes the connection from the sockets set.
 * When the decremented counter is below 1, the channel is also removed
 * from the app's channel set and its Pub/Sub topic is unsubscribed.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param string $channelName
 * @param stdClass $payload
 * @return PromiseInterface The final callback returns nothing.
 */
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
    $locallyUnsubscribed = parent::unsubscribeFromChannel($connection, $channelName, $payload);

    $counterDropped = $locallyUnsubscribed->then(function () use ($connection, $channelName) {
        return $this->decrementSubscriptionsCount($connection->app->id, $channelName);
    });

    return $counterDropped->then(function ($remaining) use ($connection, $channelName) {
        $this->removeConnectionFromSet($connection);

        if (! ($remaining < 1)) {
            return;
        }

        // The counter dropped below 1: forget the channel and stop
        // listening on its Pub/Sub topic.
        $this->removeChannelFromSet($connection->app->id, $channelName);
        $this->unsubscribeFromTopic($connection->app->id, $channelName);
    });
}
||||
161 | |||||
/**
 * Subscribe to the app-level Pub/Sub topic, then increment the app's
 * connection counter.
 *
 * @param string|int $appId
 * @return PromiseInterface Resolves with the counter increment reply.
 */
public function subscribeToApp($appId): PromiseInterface
{
    $topicSubscribed = $this->subscribeToTopic($appId);

    return $topicSubscribed->then(function () use ($appId) {
        return $this->incrementSubscriptionsCount($appId);
    });
}
||||
176 | |||||
/**
 * Unsubscribe from the app-level Pub/Sub topic, then decrement the app's
 * connection counter.
 *
 * @param string|int $appId
 * @return PromiseInterface Resolves with the counter decrement reply.
 */
public function unsubscribeFromApp($appId): PromiseInterface
{
    $topicUnsubscribed = $this->unsubscribeFromTopic($appId);

    return $topicUnsubscribed->then(function () use ($appId) {
        return $this->decrementSubscriptionsCount($appId);
    });
}
||||
191 | |||||
/**
 * Read the "connections" counter from the stats hash of the app, or of
 * one of its channels.
 *
 * @param string|int $appId
 * @param string|null $channelName
 * @return PromiseInterface Resolves with an int; 0 when the field is missing.
 */
public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface
{
    $statsKey = $this->getStatsRedisHash($appId, $channelName);

    return $this->publishClient
        ->hget($statsKey, 'connections')
        ->then(function ($raw) {
            if (is_null($raw)) {
                return 0;
            }

            return (int) $raw;
        });
}
||||
208 | |||||
/**
 * Publish the payload on the channel's Redis topic, then run the parent
 * implementation.
 *
 * The payload object is stamped in place with the app ID, the socket ID
 * and the server ID (this server's ID when none is given).
 *
 * @param string|int $appId
 * @param string|null $socketId
 * @param string $channel
 * @param stdClass $payload
 * @param string|null $serverId
 * @return PromiseInterface Resolves with the parent's result.
 */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface
{
    $payload->appId = $appId;
    $payload->socketId = $socketId;
    $payload->serverId = $serverId ?: $this->getServerId();

    $topic = $this->getRedisTopicName($appId, $channel);
    $message = json_encode($payload);

    $published = $this->publishClient->publish($topic, $message);

    return $published->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
        return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
    });
}
||||
231 | |||||
/**
 * Record a presence-channel join in Redis, then run the parent
 * implementation.
 *
 * The JSON-encoded user is stored under the connection's socket ID, and
 * the socket ID is added to the user's socket set for the channel.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param stdClass $user
 * @param string $channel
 * @param stdClass $payload
 * @return PromiseInterface Resolves with the parent's result.
 */
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{
    $userStored = $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user));

    $socketLinked = $userStored->then(function () use ($connection, $channel, $user) {
        return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
    });

    return $socketLinked->then(function () use ($connection, $user, $channel, $payload) {
        return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
    });
}
||||
251 | |||||
/**
 * Remove a presence-channel member's data from Redis, then run the
 * parent implementation.
 *
 * The entry stored under the connection's socket ID is deleted, and the
 * socket ID is removed from the user's socket set for the channel.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param stdClass $user
 * @param string $channel
 * @return PromiseInterface Resolves with the parent's result.
 */
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{
    $userRemoved = $this->removeUserData($connection->app->id, $channel, $connection->socketId);

    $socketUnlinked = $userRemoved->then(function () use ($connection, $channel, $user) {
        return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
    });

    return $socketUnlinked->then(function () use ($connection, $user, $channel) {
        return parent::userLeftPresenceChannel($connection, $user, $channel);
    });
}
||||
271 | |||||
/**
 * Load the users hash of a channel and decode its members.
 *
 * The HGETALL reply is converted with Helpers::redisListToArray(), each
 * value is JSON-decoded, and entries are de-duplicated by "user_id".
 *
 * @param string|int $appId
 * @param string $channel
 * @return \React\Promise\PromiseInterface Resolves with an array of decoded users.
 */
public function getChannelMembers($appId, string $channel): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($appId, $channel);

    return $this->publishClient
        ->hgetall($usersKey)
        ->then(function ($rawList) {
            $decodedUsers = collect(Helpers::redisListToArray($rawList))->map(function ($encodedUser) {
                return json_decode($encodedUser);
            });

            return $decodedUsers->unique('user_id')->toArray();
        });
}
||||
289 | |||||
/**
 * Fetch the entry stored for the connection's socket ID in the channel's
 * users hash.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param string $channel
 * @return \React\Promise\PromiseInterface Resolves with the raw HGET reply (not decoded here).
 */
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($connection->app->id, $channel);

    return $this->publishClient->hget($usersKey, $connection->socketId);
}
||||
303 | |||||
/**
 * Count the entries of the users hash for each given channel.
 *
 * One HLEN per channel is queued between MULTI and EXEC; the EXEC reply
 * is combined with the channel names as keys.
 *
 * @param string|int $appId
 * @param array $channelNames
 * @return \React\Promise\PromiseInterface Resolves with an array keyed by channel name.
 */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{
    $client = $this->publishClient;

    $client->multi();

    foreach ($channelNames as $name) {
        $usersKey = $this->getUsersRedisHash($appId, $name);

        $client->hlen($usersKey);
    }

    return $client->exec()->then(function ($counts) use ($channelNames) {
        return array_combine($channelNames, $counts);
    });
}
||||
326 | |||||
/**
 * List the socket IDs stored for a user on a presence channel.
 *
 * @param string|int $userId
 * @param string|int $appId
 * @param string $channelName
 * @return \React\Promise\PromiseInterface Resolves with the SMEMBERS reply.
 */
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
    $socketsKey = $this->getUserSocketsRedisHash($appId, $channelName, $userId);

    return $this->publishClient->smembers($socketsKey);
}
||||
341 | |||||
/**
 * Handle a pong from a connection.
 *
 * Re-adds the connection to the sockets set with the current timestamp
 * as score, publishes the socket, app and server IDs on the app's pong
 * topic, then runs the parent implementation.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @return PromiseInterface Resolves with the parent's result.
 */
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
    // Writing the member again overwrites its score with "now".
    $scoreRefreshed = $this->addConnectionToSet($connection, Carbon::now());

    $pongPublished = $scoreRefreshed->then(function () use ($connection) {
        $body = [
            'socketId' => $connection->socketId,
            'appId' => $connection->app->id,
            'serverId' => $this->getServerId(),
        ];

        $topic = $this->getPongRedisHash($connection->app->id);

        return $this->publishClient->publish($topic, json_encode($body));
    });

    return $pongPublished->then(function () use ($connection) {
        return parent::connectionPonged($connection);
    });
}
||||
366 | |||||
/**
 * Remove the connections whose last recorded score in the sockets set is
 * more than two minutes old.
 *
 * Under the lock, every stale "socket ID => app ID" pair is turned into
 * a fake connection and unsubscribed from all channels. The parent
 * implementation then runs, and the lock is force-released afterwards.
 *
 * @return PromiseInterface The parent's result.
 */
public function removeObsoleteConnections(): PromiseInterface
{
    $lock = $this->lock();

    try {
        $lock->get(function () {
            // getConnectionsFromSet() declares integer bounds, so pass an
            // integer Unix timestamp instead of the string from format('U').
            $staleBefore = now()->subMinutes(2)->getTimestamp();

            $this->getConnectionsFromSet(0, $staleBefore)
                ->then(function ($connections) {
                    foreach ($connections as $socketId => $appId) {
                        // PHP turns integer-like array keys into ints, while
                        // fakeConnectionForApp() declares a string socket ID.
                        $connection = $this->fakeConnectionForApp($appId, (string) $socketId);

                        $this->unsubscribeFromAllChannels($connection);
                    }
                });
        });

        return parent::removeObsoleteConnections();
    } finally {
        // NOTE(review): the lock is force-released whatever get() returned,
        // so a lock held by someone else would be released too - confirm
        // this is intended.
        $lock->forceRelease();
    }
}
||||
392 | |||||
/**
 * Handle a message received from Redis on a specific topic.
 *
 * Payloads that are not a JSON object, or that carry no app ID, are
 * dropped because they cannot be routed. Messages stamped with this
 * server's own ID are ignored. A message on the app's pong topic is
 * passed to the parent pong handler. Anything else is logged to the
 * dashboard and broadcast locally on the channel, excluding the
 * originating socket ID.
 *
 * @param string $redisChannel
 * @param string $payload JSON-encoded message.
 * @return mixed The parent pong handler's result for pong messages, otherwise nothing.
 */
public function onMessage(string $redisChannel, string $payload)
{
    $payload = json_decode($payload);

    // json_decode() yields null for malformed JSON and non-objects for
    // scalar/array documents; the code below needs an object with appId.
    if (! is_object($payload) || ! isset($payload->appId)) {
        return;
    }

    // Skip messages this server published itself.
    if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
        return;
    }

    if ($redisChannel === $this->getPongRedisHash($payload->appId)) {
        $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);

        return parent::connectionPonged($connection);
    }

    // The channel name is whatever follows "<appId>:" in the topic name.
    $payload->channel = Str::after($redisChannel, "{$payload->appId}:");

    if (! $channel = $this->find($payload->appId, $payload->channel)) {
        return;
    }

    $appId = $payload->appId;
    $socketId = $payload->socketId ?? null;
    $serverId = $payload->serverId ?? null;

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
        'fromServerId' => $serverId,
        'fromSocketId' => $socketId,
        'receiverServerId' => $this->getServerId(),
        'channel' => $channel,
        'payload' => $payload,
    ]);

    // Strip the routing fields before the payload is sent on.
    unset($payload->socketId);
    unset($payload->serverId);
    unset($payload->appId);

    $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
}
||||
438 | |||||
/**
 * Find a channel by app ID and name, creating it when it is missing.
 *
 * When the parent lookup returns nothing, a new instance of the class
 * given by getChannelClassName() is stored under the app ID and channel
 * name before the parent lookup is repeated.
 *
 * @param string|int $appId
 * @param string $channel
 * @return mixed The parent lookup's result.
 */
public function find($appId, string $channel)
{
    if (! parent::find($appId, $channel)) {
        $class = $this->getChannelClassName($channel);

        $this->channels[$appId][$channel] = new $class($channel);
    }

    return parent::find($appId, $channel);
}
||||
448 | |||||
/**
 * Build the Redis connection URI from the Laravel database config.
 *
 * Reads "database.redis.<name>", where <name> is the configured
 * replication connection ("default" when unset). Missing keys no longer
 * raise warnings: the host falls back to 127.0.0.1 and the port to 6379,
 * and the password and database are added to the query string only when
 * they are non-empty.
 *
 * @return string For example "redis://127.0.0.1:6379?password=secret&db=1".
 */
protected function getConnectionUri()
{
    $name = config('websockets.replication.modes.redis.connection', 'default');
    $config = config("database.redis.{$name}") ?: [];

    // "?:" keeps the original fallback for empty values as well as missing keys.
    $host = ($config['host'] ?? null) ?: '127.0.0.1';
    $port = ($config['port'] ?? null) ?: 6379;

    $query = [];

    if (! empty($config['password'])) {
        $query['password'] = $config['password'];
    }

    if (! empty($config['database'])) {
        $query['db'] = $config['database'];
    }

    $query = http_build_query($query);

    return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
}
||||
476 | |||||
/**
 * Expose the client used for Pub/Sub subscriptions.
 *
 * @return Client
 */
public function getSubscribeClient()
{
    $client = $this->subscribeClient;

    return $client;
}
||||
486 | |||||
/**
 * Expose the client used for publishing and regular commands.
 *
 * @return Client
 */
public function getPublishClient()
{
    $client = $this->publishClient;

    return $client;
}
||||
496 | |||||
/**
 * Expose the Redis client for use by other classes; this is the publish
 * client.
 *
 * @return Client
 */
public function getRedisClient()
{
    $client = $this->getPublishClient();

    return $client;
}
||||
506 | |||||
/**
 * Add to the "connections" field of the stats hash of the app, or of one
 * of its channels.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param int $increment Amount to add.
 * @return PromiseInterface Resolves with the HINCRBY reply.
 */
public function incrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{
    $statsKey = $this->getStatsRedisHash($appId, $channel);

    return $this->publishClient->hincrby($statsKey, 'connections', $increment);
}
||||
521 | |||||
/**
 * Subtract from the "connections" field of the stats hash by incrementing
 * it with the negated amount.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param int $increment Amount to subtract.
 * @return PromiseInterface Resolves with the HINCRBY reply.
 */
public function decrementSubscriptionsCount($appId, string $channel = null, int $increment = 1): PromiseInterface
{
    $negated = $increment * -1;

    return $this->incrementSubscriptionsCount($appId, $channel, $negated);
}
||||
534 | |||||
/**
 * Add the connection to the sockets sorted set.
 *
 * The member is "<appId>:<socketId>" and the score is the Unix timestamp
 * of the given moment, or of the current time when none is given.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param \DateTime|string|null $moment
 * @return PromiseInterface Resolves with the ZADD reply.
 */
public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface
{
    if ($moment) {
        $when = Carbon::parse($moment);
    } else {
        $when = Carbon::now();
    }

    $socketsKey = $this->getSocketsRedisHash();
    $score = $when->format('U');
    $member = "{$connection->app->id}:{$connection->socketId}";

    return $this->publishClient->zadd($socketsKey, $score, $member);
}
||||
551 | |||||
/**
 * Remove the connection's "<appId>:<socketId>" member from the sockets
 * sorted set.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @return PromiseInterface Resolves with the ZREM reply.
 */
public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
{
    $socketsKey = $this->getSocketsRedisHash();
    $member = "{$connection->app->id}:{$connection->socketId}";

    return $this->publishClient->zrem($socketsKey, $member);
}
||||
565 | |||||
/**
 * Fetch the members of the sockets sorted set whose score lies between
 * two timestamps.
 *
 * In strict mode both bounds are prefixed with "(" before being passed to
 * ZRANGEBYSCORE. Each "<appId>:<socketId>" member is split on ":" and
 * returned as a "socket ID => app ID" pair.
 *
 * @param int $start
 * @param int $stop
 * @param bool $strict
 * @return PromiseInterface Resolves with an array of socket ID => app ID.
 */
public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface
{
    $min = $strict ? "({$start}" : $start;
    $max = $strict ? "({$stop}" : $stop;

    $rangeReply = $this->publishClient->zrangebyscore($this->getSocketsRedisHash(), $min, $max);

    return $rangeReply->then(function ($members) {
        $pairs = collect($members)->mapWithKeys(function ($member) {
            [$app, $socket] = explode(':', $member);

            return [$socket => $app];
        });

        return $pairs->toArray();
    });
}
||||
592 | |||||
/**
 * Add a channel name to the app's channel set.
 *
 * @param string|int $appId
 * @param string $channel
 * @return PromiseInterface Resolves with the SADD reply.
 */
public function addChannelToSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->sadd($channelsKey, $channel);
}
||||
606 | |||||
/**
 * Remove a channel name from the app's channel set.
 *
 * @param string|int $appId
 * @param string $channel
 * @return PromiseInterface Resolves with the SREM reply.
 */
public function removeChannelFromSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->srem($channelsKey, $channel);
}
||||
620 | |||||
/**
 * Ask Redis whether a channel name is in the app's channel set.
 *
 * @param string|int $appId
 * @param string $channel
 * @return PromiseInterface Resolves with the SISMEMBER reply.
 */
public function isChannelInSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->sismember($channelsKey, $channel);
}
||||
634 | |||||
/**
 * Store a value under a key in the users hash of the app, or of one of
 * its channels.
 *
 * The channel is declared as an explicitly nullable required parameter.
 * This accepts the same calls as the former "string $channel = null"
 * while avoiding the deprecated optional-before-required parameter form.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param string $key Hash field to write.
 * @param string $data Value to store.
 * @return PromiseInterface Resolves with the HSET reply.
 */
public function storeUserData($appId, ?string $channel, string $key, $data): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($appId, $channel);

    return $this->publishClient->hset($usersKey, $key, $data);
}
||||
650 | |||||
/**
 * Delete a key from the users hash of the app, or of one of its channels.
 *
 * The channel is declared as an explicitly nullable required parameter.
 * This accepts the same calls as the former "string $channel = null"
 * while avoiding the deprecated optional-before-required parameter form.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param string $key Hash field to delete.
 * @return PromiseInterface Resolves with the HDEL reply.
 */
public function removeUserData($appId, ?string $channel, string $key): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($appId, $channel);

    return $this->publishClient->hdel($usersKey, $key);
}
||||
665 | |||||
/**
 * Log the subscription to the dashboard, then subscribe the subscribe
 * client to the topic of the app, or of the app and channel.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return PromiseInterface Resolves with the SUBSCRIBE reply.
 */
public function subscribeToTopic($appId, string $channel = null): PromiseInterface
{
    $topicName = $this->getRedisTopicName($appId, $channel);

    $logContext = [
        'serverId' => $this->getServerId(),
        'pubsubTopic' => $topicName,
    ];

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, $logContext);

    return $this->subscribeClient->subscribe($topicName);
}
||||
684 | |||||
/**
 * Log the unsubscription to the dashboard, then unsubscribe the subscribe
 * client from the topic of the app, or of the app and channel.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return PromiseInterface Resolves with the UNSUBSCRIBE reply.
 */
public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
{
    $topicName = $this->getRedisTopicName($appId, $channel);

    $logContext = [
        'serverId' => $this->getServerId(),
        'pubsubTopic' => $topicName,
    ];

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, $logContext);

    return $this->subscribeClient->unsubscribe($topicName);
}
||||
703 | |||||
/**
 * Add a socket ID to the socket set kept for a presence-channel user,
 * identified by the user's "user_id".
 *
 * @param string|int $appId
 * @param string $channel
 * @param stdClass $user
 * @param string $socketId
 * @return PromiseInterface Resolves with the SADD reply.
 */
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
    $socketsKey = $this->getUserSocketsRedisHash($appId, $channel, $user->user_id);

    return $this->publishClient->sadd($socketsKey, $socketId);
}
||||
719 | |||||
/**
 * Remove a socket ID from the socket set kept for a presence-channel
 * user, identified by the user's "user_id".
 *
 * @param string|int $appId
 * @param string $channel
 * @param stdClass $user
 * @param string $socketId
 * @return PromiseInterface Resolves with the SREM reply.
 */
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
    $socketsKey = $this->getUserSocketsRedisHash($appId, $channel, $user->user_id);

    return $this->publishClient->srem($socketsKey, $socketId);
}
||||
735 | |||||
/**
 * Compose a Redis key name.
 *
 * The key starts with the configured "database.redis.options.prefix"
 * followed by the app ID. The channel (when truthy) and the suffixes are
 * then joined with ":" and appended after a ":" separator when the joined
 * string is non-empty.
 *
 * @param string|int|null $appId
 * @param string|null $channel
 * @param array $suffixes
 * @return string
 */
public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string
{
    $prefix = config('database.redis.options.prefix', null);

    $segments = $channel ? array_merge([$channel], $suffixes) : $suffixes;
    $tail = implode(':', $segments);

    $key = "{$prefix}{$appId}";

    return $tail ? "{$key}:{$tail}" : $key;
}
||||
762 | |||||
/**
 * Name of the app's pong key: the app key with the "pong" suffix.
 *
 * @param string|int $appId
 * @return string
 */
public function getPongRedisHash($appId): string
{
    $suffixes = ['pong'];

    return $this->getRedisKey($appId, null, $suffixes);
}
||||
772 | |||||
/**
 * Name of the statistics hash of the app, or of one of its channels:
 * the key with the "stats" suffix.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return string
 */
public function getStatsRedisHash($appId, string $channel = null): string
{
    $suffixes = ['stats'];

    return $this->getRedisKey($appId, $channel, $suffixes);
}
||||
784 | |||||
/**
 * Name of the key holding all socket IDs: no app, no channel, only the
 * "sockets" suffix.
 *
 * @return string
 */
public function getSocketsRedisHash(): string
{
    $suffixes = ['sockets'];

    return $this->getRedisKey(null, null, $suffixes);
}
||||
794 | |||||
/**
 * Name of the key holding the app's channel names: the app key with the
 * "channels" suffix.
 *
 * @param string|int $appId
 * @return string
 */
public function getChannelsRedisHash($appId): string
{
    $suffixes = ['channels'];

    return $this->getRedisKey($appId, null, $suffixes);
}
||||
806 | |||||
/**
 * Name of the hash holding presence-channel users: the key with the
 * "users" suffix.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return string
 */
public function getUsersRedisHash($appId, string $channel = null): string
{
    $suffixes = ['users'];

    return $this->getRedisKey($appId, $channel, $suffixes);
}
||||
818 | |||||
/**
 * Name of the key holding the socket IDs of one presence-channel user:
 * the key with the user ID and "userSockets" as suffixes.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param string|int|null $userId
 * @return string
 */
public function getUserSocketsRedisHash($appId, string $channel = null, $userId = null): string
{
    $suffixes = [$userId, 'userSockets'];

    return $this->getRedisKey($appId, $channel, $suffixes);
}
||||
832 | |||||
/**
 * Name of the Pub/Sub topic of the app, or of the app and channel: the
 * plain key without suffixes.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return string
 */
public function getRedisTopicName($appId, string $channel = null): string
{
    $topicName = $this->getRedisKey($appId, $channel);

    return $topicName;
}
||||
845 | |||||
/**
 * Create a new RedisLock on the manager's Redis connection, named by
 * static::$lockName and constructed with 0 seconds.
 *
 * @return \Illuminate\Cache\RedisLock
 */
protected function lock()
{
    $seconds = 0;

    $redisLock = new RedisLock($this->redis, static::$lockName, $seconds);

    return $redisLock;
}
||||
855 | |||||
/**
 * Build a stand-in connection from an app ID and a socket ID, so that
 * methods expecting a connection object can be called without a real
 * socket.
 *
 * @param string|int $appId
 * @param string $socketId
 * @return ConnectionInterface
 */
public function fakeConnectionForApp($appId, string $socketId)
{
    $standIn = new MockableConnection($appId, $socketId);

    return $standIn;
}
||||
869 | } |
||||
870 |
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error, or the assigned type should be added to the documentation/type hint for that property.