Total Complexity | 67 |
Total Lines | 849 |
Duplicated Lines | 0 % |
Changes | 13 | ||
Bugs | 0 | Features | 0 |
Complex classes like RedisChannelManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use RedisChannelManager, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
19 | class RedisChannelManager extends LocalChannelManager |
||
20 | { |
||
21 | /** |
||
22 | * The running loop. |
||
23 | * |
||
24 | * @var LoopInterface |
||
25 | */ |
||
26 | protected $loop; |
||
27 | |||
28 | /** |
||
29 | * The pub client. |
||
30 | * |
||
31 | * @var Client |
||
32 | */ |
||
33 | protected $publishClient; |
||
34 | |||
35 | /** |
||
36 | * The sub client. |
||
37 | * |
||
38 | * @var Client |
||
39 | */ |
||
40 | protected $subscribeClient; |
||
41 | |||
42 | /** |
||
43 | * The Redis manager instance. |
||
44 | * |
||
45 | * @var \Illuminate\Redis\RedisManager |
||
46 | */ |
||
47 | protected $redis; |
||
48 | |||
/**
 * Build the manager and prepare the Redis clients used for replication.
 *
 * Resolves the Redis connection named by the
 * "websockets.replication.modes.redis.connection" config entry (falling
 * back to "default"), creates the publish and subscribe clients through
 * the factory's createLazyClient(), and forwards every "message" event
 * of the subscribe client to onMessage().
 *
 * @param LoopInterface $loop
 * @param string|null $factoryClass Client factory class name; Factory::class is used when empty.
 * @return void
 */
public function __construct(LoopInterface $loop, $factoryClass = null)
{
    parent::__construct($loop, $factoryClass);

    $this->loop = $loop;

    $connectionName = config('websockets.replication.modes.redis.connection', 'default');
    $this->redis = Redis::connection($connectionName);

    $uri = $this->getConnectionUri();

    if (! $factoryClass) {
        $factoryClass = Factory::class;
    }

    $clientFactory = new $factoryClass($this->loop);

    // Two separate clients are created from the same URI: one for regular
    // commands/publishing and one dedicated to subscriptions.
    $this->publishClient = $clientFactory->createLazyClient($uri);
    $this->subscribeClient = $clientFactory->createLazyClient($uri);

    $this->subscribeClient->on('message', function ($channel, $payload) {
        $this->onMessage($channel, $payload);
    });
}
||
78 | |||
/**
 * List the channels recorded in Redis for an app.
 *
 * Issues SMEMBERS against the app's channels set.
 *
 * @param string|int $appId
 * @return \React\Promise\PromiseInterface Resolves with the set members.
 */
public function getGlobalChannels($appId): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->smembers($channelsKey);
}
||
92 | |||
/**
 * Detach a connection from every channel known for its app.
 *
 * The channel list comes from getGlobalChannels(); an unsubscribe is
 * started for each entry and then the parent implementation runs.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @return PromiseInterface Resolves with the parent's result.
 */
public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
{
    // The per-channel promises are intentionally not collected: each
    // unsubscribe is started and the chain moves on without awaiting it.
    $unsubscribeEach = function ($channels) use ($connection) {
        foreach ($channels as $channelName) {
            $this->unsubscribeFromChannel($connection, $channelName, new stdClass);
        }
    };

    $runParent = function () use ($connection) {
        return parent::unsubscribeFromAllChannels($connection);
    };

    return $this->getGlobalChannels($connection->app->id)
        ->then($unsubscribeEach)
        ->then($runParent);
}
||
111 | |||
/**
 * Subscribe a connection to a channel and record it in Redis.
 *
 * Steps run sequentially: subscribe to the PubSub topic, add the
 * connection to the sockets set with the current time, add the channel
 * to the app's channel set, bump the subscription counter by one, and
 * finally delegate to the parent implementation.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param string $channelName
 * @param stdClass $payload
 * @return PromiseInterface Resolves with the parent's result.
 */
public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
    $trackConnection = function () use ($connection) {
        return $this->addConnectionToSet($connection, Carbon::now());
    };

    $trackChannel = function () use ($connection, $channelName) {
        return $this->addChannelToSet($connection->app->id, $channelName);
    };

    $bumpCounter = function () use ($connection, $channelName) {
        return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
    };

    $runParent = function () use ($connection, $channelName, $payload) {
        return parent::subscribeToChannel($connection, $channelName, $payload);
    };

    return $this->subscribeToTopic($connection->app->id, $channelName)
        ->then($trackConnection)
        ->then($trackChannel)
        ->then($bumpCounter)
        ->then($runParent);
}
||
136 | |||
/**
 * Unsubscribe a connection from a channel and update the Redis records.
 *
 * Runs the parent implementation first, then decrements the channel's
 * subscription counter and removes the connection from the sockets set.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param string $channelName
 * @param stdClass $payload
 * @return PromiseInterface The final callback returns nothing.
 */
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
    $lowerCounter = function () use ($connection, $channelName) {
        return $this->decrementSubscriptionsCount($connection->app->id, $channelName);
    };

    $cleanUp = function ($count) use ($connection, $channelName) {
        $this->removeConnectionFromSet($connection);

        // Once the counter drops below one nobody is left on the channel,
        // so forget the channel and leave its PubSub topic.
        if ($count < 1) {
            $appId = $connection->app->id;

            $this->removeChannelFromSet($appId, $channelName);
            $this->unsubscribeFromTopic($appId, $channelName);
        }
    };

    return parent::unsubscribeFromChannel($connection, $channelName, $payload)
        ->then($lowerCounter)
        ->then($cleanUp);
}
||
161 | |||
/**
 * Subscribe to the app-level PubSub topic, then increment the app's
 * subscription counter.
 *
 * @param string|int $appId
 * @return PromiseInterface Resolves with the result of the counter increment.
 */
public function subscribeToApp($appId): PromiseInterface
{
    $bumpCounter = function () use ($appId) {
        return $this->incrementSubscriptionsCount($appId);
    };

    return $this->subscribeToTopic($appId)->then($bumpCounter);
}
||
176 | |||
/**
 * Leave the app-level PubSub topic, then decrement the app's
 * subscription counter.
 *
 * @param string|int $appId
 * @return PromiseInterface Resolves with the result of the counter decrement.
 */
public function unsubscribeFromApp($appId): PromiseInterface
{
    $lowerCounter = function () use ($appId) {
        return $this->decrementSubscriptionsCount($appId);
    };

    return $this->unsubscribeFromTopic($appId)->then($lowerCounter);
}
||
191 | |||
/**
 * Read the "connections" counter from the stats hash of an app
 * (or of one of its channels).
 *
 * @param string|int $appId
 * @param string|null $channelName Channel to scope the counter to; null reads the app-level hash.
 * @return PromiseInterface Resolves with an int; a missing counter is reported as 0.
 */
public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface
{
    $statsKey = $this->getStatsRedisHash($appId, $channelName);

    return $this->publishClient
        ->hget($statsKey, 'connections')
        ->then(function ($count) {
            // HGET answers null when the field does not exist yet.
            return $count === null ? 0 : (int) $count;
        });
}
||
208 | |||
/**
 * Publish a payload on the channel's Redis topic, then run the parent
 * broadcast.
 *
 * The payload object is modified in place: appId, socketId and serverId
 * are written onto it before it is JSON-encoded and published.
 *
 * @param string|int $appId
 * @param string|null $socketId
 * @param string $channel
 * @param stdClass $payload
 * @param string|null $serverId Origin server id; this server's id is used when empty.
 * @return PromiseInterface Resolves with the parent's result.
 */
public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface
{
    $payload->appId = $appId;
    $payload->socketId = $socketId;
    $payload->serverId = $serverId ?: $this->getServerId();

    $topic = $this->getRedisTopicName($appId, $channel);

    return $this->publishClient
        ->publish($topic, json_encode($payload))
        ->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
            // The parent receives the caller's original $serverId (possibly null),
            // not the resolved value stored on the payload.
            return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
        });
}
||
231 | |||
/**
 * Record a user joining a presence channel.
 *
 * Stores the JSON-encoded user under the connection's socket id, adds
 * the socket id to the user's socket set, then delegates to the parent.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param stdClass $user
 * @param string $channel
 * @param stdClass $payload
 * @return PromiseInterface Resolves with the parent's result.
 */
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
{
    $rememberSocket = function () use ($connection, $channel, $user) {
        return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
    };

    $runParent = function () use ($connection, $user, $channel, $payload) {
        return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
    };

    $encodedUser = json_encode($user);

    return $this->storeUserData($connection->app->id, $channel, $connection->socketId, $encodedUser)
        ->then($rememberSocket)
        ->then($runParent);
}
||
251 | |||
/**
 * Record a user leaving a presence channel.
 *
 * Deletes the user data stored under the connection's socket id,
 * removes the socket id from the user's socket set, then delegates
 * to the parent.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param stdClass $user
 * @param string $channel
 * @return PromiseInterface Resolves with the parent's result.
 */
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
{
    $forgetSocket = function () use ($connection, $channel, $user) {
        return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
    };

    $runParent = function () use ($connection, $user, $channel) {
        return parent::userLeftPresenceChannel($connection, $user, $channel);
    };

    return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
        ->then($forgetSocket)
        ->then($runParent);
}
||
271 | |||
/**
 * Load the members stored for a presence channel.
 *
 * Reads the whole users hash, converts it with Helpers::redisListToArray(),
 * JSON-decodes each entry and keeps one entry per "user_id".
 *
 * @param string|int $appId
 * @param string $channel
 * @return \React\Promise\PromiseInterface Resolves with an array of decoded users.
 */
public function getChannelMembers($appId, string $channel): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($appId, $channel);

    return $this->publishClient
        ->hgetall($usersKey)
        ->then(function ($list) {
            $members = collect(Helpers::redisListToArray($list))
                ->map(function ($user) {
                    return json_decode($user);
                });

            return $members->unique('user_id')->toArray();
        });
}
||
289 | |||
/**
 * Look up the presence-channel entry stored for a connection.
 *
 * Issues HGET on the channel's users hash with the connection's socket
 * id as the field. The value is passed through as Redis returns it;
 * no decoding happens here.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param string $channel
 * @return \React\Promise\PromiseInterface
 */
public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($connection->app->id, $channel);

    return $this->publishClient->hget($usersKey, $connection->socketId);
}
||
303 | |||
/**
 * Count the stored members of several presence channels at once.
 *
 * Queues one HLEN per channel between MULTI and EXEC, then pairs the
 * results with the channel names in the order they were given.
 *
 * @param string|int $appId
 * @param array $channelNames
 * @return \React\Promise\PromiseInterface Resolves with an array keyed by channel name.
 */
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
{
    $client = $this->publishClient;

    $client->multi();

    foreach ($channelNames as $name) {
        $client->hlen($this->getUsersRedisHash($appId, $name));
    }

    $pairWithNames = function ($data) use ($channelNames) {
        return array_combine($channelNames, $data);
    };

    return $client->exec()->then($pairWithNames);
}
||
326 | |||
/**
 * List the socket ids recorded for a presence-channel user.
 *
 * Issues SMEMBERS against the user's sockets set.
 *
 * @param string|int $userId
 * @param string|int $appId
 * @param string $channelName
 * @return \React\Promise\PromiseInterface Resolves with the set members.
 */
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
    $socketsKey = $this->getUserSocketsRedisHash($appId, $channelName, $userId);

    return $this->publishClient->smembers($socketsKey);
}
||
341 | |||
/**
 * Handle a pong from a connection.
 *
 * Re-adds the connection to the sockets sorted set with the current
 * time as its score, publishes a JSON notice (socketId, appId, serverId)
 * on the app's pong topic, then runs the parent implementation.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @return PromiseInterface Resolves with the parent's result.
 */
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
    $announcePong = function () use ($connection) {
        $appId = $connection->app->id;

        $notice = json_encode([
            'socketId' => $connection->socketId,
            'appId' => $appId,
            'serverId' => $this->getServerId(),
        ]);

        return $this->publishClient->publish($this->getPongRedisHash($appId), $notice);
    };

    $runParent = function () use ($connection) {
        return parent::connectionPonged($connection);
    };

    // Re-adding with "now" refreshes the member's score in the sorted set.
    return $this->addConnectionToSet($connection, Carbon::now())
        ->then($announcePong)
        ->then($runParent);
}
||
366 | |||
/**
 * Drop connections whose score in the sockets set is older than two minutes.
 *
 * Inside the lock callback, every matching entry is turned into a fake
 * connection and unsubscribed from all channels. The parent
 * implementation is then run and its promise returned. The lock is
 * force-released in all cases.
 *
 * @return PromiseInterface The parent's promise.
 */
public function removeObsoleteConnections(): PromiseInterface
{
    $lock = $this->lock();

    $dropStale = function ($connections) {
        foreach ($connections as $socketId => $appId) {
            $this->unsubscribeFromAllChannels(
                $this->fakeConnectionForApp($appId, $socketId)
            );
        }
    };

    try {
        $lock->get(function () use ($dropStale) {
            $threshold = now()->subMinutes(2)->format('U');

            // Not returned or awaited: the lookup continues in the background.
            $this->getConnectionsFromSet(0, $threshold)->then($dropStale);
        });

        return parent::removeObsoleteConnections();
    } finally {
        optional($lock)->forceRelease();
    }
}
||
392 | |||
/**
 * Handle a message received from Redis on a specific channel.
 *
 * Messages that are not a JSON object with an "appId", or that carry
 * this server's own id, are ignored. Messages on the app's pong topic
 * are routed to the parent connectionPonged(). Everything else is
 * logged and broadcast locally on the matching channel, excluding the
 * originating socket.
 *
 * @param string $redisChannel Redis topic the message arrived on.
 * @param string $payload JSON-encoded message body.
 * @return mixed Nothing, except on the pong path where the parent's result is returned.
 */
public function onMessage(string $redisChannel, string $payload)
{
    $payload = json_decode($payload);

    // json_decode() gives null for malformed JSON and a non-object for
    // scalar or list documents; without an app id the message cannot be
    // routed, so drop it instead of dereferencing a missing property.
    if (! is_object($payload) || ! isset($payload->appId)) {
        return;
    }

    // Skip messages this server published itself.
    if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
        return;
    }

    if ($redisChannel === $this->getPongRedisHash($payload->appId)) {
        $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);

        return parent::connectionPonged($connection);
    }

    // The channel name is whatever follows "<appId>:" in the topic.
    $payload->channel = Str::after($redisChannel, "{$payload->appId}:");

    if (! $channel = $this->find($payload->appId, $payload->channel)) {
        return;
    }

    $appId = $payload->appId;
    $socketId = $payload->socketId ?? null;
    $serverId = $payload->serverId ?? null;

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
        'fromServerId' => $serverId,
        'fromSocketId' => $socketId,
        'receiverServerId' => $this->getServerId(),
        'channel' => $channel,
        'payload' => $payload,
    ]);

    // Remove the routing metadata before the payload reaches local clients.
    unset($payload->socketId);
    unset($payload->serverId);
    unset($payload->appId);

    $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
}
||
438 | |||
/**
 * Find a channel for an app, creating a local instance when the
 * parent lookup comes back empty.
 *
 * The new instance is built from the class returned by
 * getChannelClassName() and stored in $this->channels before the
 * parent lookup is repeated.
 *
 * @param string|int $appId
 * @param string $channel
 * @return mixed Whatever the parent's find() returns.
 */
public function find($appId, string $channel)
{
    if (! parent::find($appId, $channel)) {
        $class = $this->getChannelClassName($channel);

        $this->channels[$appId][$channel] = new $class($channel);
    }

    return parent::find($appId, $channel);
}
||
448 | |||
/**
 * Build the Redis connection URL from Laravel database config.
 *
 * Reads "database.redis.<name>", where <name> comes from
 * "websockets.replication.modes.redis.connection" (default "default").
 * The port falls back to 6379; "password" and "database" are added as
 * the "password" and "db" query parameters only when set and non-empty.
 *
 * @return string
 */
protected function getConnectionUri()
{
    $name = config('websockets.replication.modes.redis.connection', 'default');
    $config = config("database.redis.{$name}");

    $host = $config['host'];

    // empty() keeps the original truthiness rules while tolerating
    // connections that omit the optional keys altogether.
    $port = ! empty($config['port']) ? $config['port'] : 6379;

    $query = [];

    if (! empty($config['password'])) {
        $query['password'] = $config['password'];
    }

    if (! empty($config['database'])) {
        $query['db'] = $config['database'];
    }

    $query = http_build_query($query);

    return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
}
||
476 | |||
/**
 * Expose the client this manager uses for subscriptions.
 *
 * @return Client
 */
public function getSubscribeClient()
{
    return $this->subscribeClient;
}
||
486 | |||
/**
 * Expose the client this manager uses for publishing and regular commands.
 *
 * @return Client
 */
public function getPublishClient()
{
    return $this->publishClient;
}
||
496 | |||
/**
 * Get the Redis client offered to other classes.
 *
 * This is the publish client, obtained through getPublishClient().
 *
 * @return Client
 */
public function getRedisClient()
{
    $client = $this->getPublishClient();

    return $client;
}
||
506 | |||
/**
 * Add to the "connections" counter in the stats hash of an app
 * (or of one of its channels).
 *
 * @param string|int $appId
 * @param string|null $channel Channel to scope the counter to; null targets the app-level hash.
 * @param int $increment Amount passed to HINCRBY (may be negative).
 * @return PromiseInterface Resolves with the HINCRBY reply.
 */
public function incrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1): PromiseInterface
{
    $statsKey = $this->getStatsRedisHash($appId, $channel);

    return $this->publishClient->hincrby($statsKey, 'connections', $increment);
}
||
521 | |||
/**
 * Subtract from the "connections" counter in the stats hash.
 *
 * Delegates to incrementSubscriptionsCount() with the amount negated.
 *
 * @param string|int $appId
 * @param string|null $channel Channel to scope the counter to; null targets the app-level hash.
 * @param int $increment Amount to subtract; despite its name, it is negated before being applied.
 * @return PromiseInterface Resolves with the result of the increment call.
 */
public function decrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1): PromiseInterface
{
    return $this->incrementSubscriptionsCount($appId, $channel, -$increment);
}
||
534 | |||
/**
 * Add (or re-score) a connection in the sockets sorted set.
 *
 * The member is "<appId>:<socketId>" and the score is the Unix
 * timestamp of $moment, or of the current time when $moment is empty.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @param \DateTime|string|null $moment
 * @return PromiseInterface Resolves with the ZADD reply.
 */
public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface
{
    if ($moment) {
        $moment = Carbon::parse($moment);
    } else {
        $moment = Carbon::now();
    }

    $score = $moment->format('U');
    $member = "{$connection->app->id}:{$connection->socketId}";

    return $this->publishClient->zadd($this->getSocketsRedisHash(), $score, $member);
}
||
551 | |||
/**
 * Remove a connection's "<appId>:<socketId>" member from the sockets
 * sorted set.
 *
 * @param \Ratchet\ConnectionInterface $connection
 * @return PromiseInterface Resolves with the ZREM reply.
 */
public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
{
    $member = "{$connection->app->id}:{$connection->socketId}";

    return $this->publishClient->zrem($this->getSocketsRedisHash(), $member);
}
||
565 | |||
/**
 * Fetch connections from the sockets sorted set whose score lies
 * between two timestamps.
 *
 * With $strict enabled both bounds are prefixed with "(" before being
 * sent to ZRANGEBYSCORE. Each "<appId>:<socketId>" member is split on
 * ":" and returned as socketId => appId.
 *
 * @param int $start
 * @param int $stop
 * @param bool $strict
 * @return PromiseInterface Resolves with an array keyed by socket id.
 */
public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface
{
    if ($strict) {
        $start = "({$start}";
        $stop = "({$stop}";
    }

    $toSocketMap = function ($list) {
        return collect($list)
            ->mapWithKeys(function ($appWithSocket) {
                [$appId, $socketId] = explode(':', $appWithSocket);

                return [$socketId => $appId];
            })
            ->toArray();
    };

    return $this->publishClient
        ->zrangebyscore($this->getSocketsRedisHash(), $start, $stop)
        ->then($toSocketMap);
}
||
592 | |||
/**
 * Record a channel name in the app's channels set (SADD).
 *
 * @param string|int $appId
 * @param string $channel
 * @return PromiseInterface Resolves with the SADD reply.
 */
public function addChannelToSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->sadd($channelsKey, $channel);
}
||
606 | |||
/**
 * Delete a channel name from the app's channels set (SREM).
 *
 * @param string|int $appId
 * @param string $channel
 * @return PromiseInterface Resolves with the SREM reply.
 */
public function removeChannelFromSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->srem($channelsKey, $channel);
}
||
620 | |||
/**
 * Ask Redis whether a channel name is in the app's channels set (SISMEMBER).
 *
 * @param string|int $appId
 * @param string $channel
 * @return PromiseInterface Resolves with the SISMEMBER reply.
 */
public function isChannelInSet($appId, string $channel): PromiseInterface
{
    $channelsKey = $this->getChannelsRedisHash($appId);

    return $this->publishClient->sismember($channelsKey, $channel);
}
||
634 | |||
/**
 * Store a value in the users hash of an app/channel (HSET).
 * Might be used for the presence channels.
 *
 * $channel has no default: it is followed by required parameters, so
 * it could never be omitted by callers anyway.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param string $key Hash field to write.
 * @param string $data Value to store.
 * @return PromiseInterface Resolves with the HSET reply.
 */
public function storeUserData($appId, ?string $channel, string $key, $data): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($appId, $channel);

    return $this->publishClient->hset($usersKey, $key, $data);
}
||
650 | |||
/**
 * Delete a field from the users hash of an app/channel (HDEL).
 * Might be used for the presence channels.
 *
 * $channel has no default: it is followed by a required parameter, so
 * it could never be omitted by callers anyway.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @param string $key Hash field to delete.
 * @return PromiseInterface Resolves with the HDEL reply.
 */
public function removeUserData($appId, ?string $channel, string $key): PromiseInterface
{
    $usersKey = $this->getUsersRedisHash($appId, $channel);

    return $this->publishClient->hdel($usersKey, $key);
}
||
665 | |||
/**
 * Subscribe to the PubSub topic of an app, or of an app and channel.
 *
 * The subscription is logged to the dashboard before the SUBSCRIBE
 * command is sent on the subscribe client.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return PromiseInterface Resolves with the SUBSCRIBE reply.
 */
public function subscribeToTopic($appId, ?string $channel = null): PromiseInterface
{
    $topic = $this->getRedisTopicName($appId, $channel);

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
        'serverId' => $this->getServerId(),
        'pubsubTopic' => $topic,
    ]);

    return $this->subscribeClient->subscribe($topic);
}
||
684 | |||
/**
 * Unsubscribe from the PubSub topic of an app, or of an app and channel.
 *
 * The action is logged to the dashboard before the UNSUBSCRIBE
 * command is sent on the subscribe client.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return PromiseInterface Resolves with the UNSUBSCRIBE reply.
 */
public function unsubscribeFromTopic($appId, ?string $channel = null): PromiseInterface
{
    $topic = $this->getRedisTopicName($appId, $channel);

    DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
        'serverId' => $this->getServerId(),
        'pubsubTopic' => $topic,
    ]);

    return $this->subscribeClient->unsubscribe($topic);
}
||
703 | |||
704 | /** |
||
705 | * Add the Presence Channel's User's Socket ID to a list. |
||
706 | * |
||
707 | * @param string|int $appId |
||
708 | * @param string $channel |
||
709 | * @param stdClass $user |
||
710 | * @param string $socketId |
||
711 | * @return PromiseInterface |
||
712 | */ |
||
713 | protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface |
||
717 | ); |
||
718 | } |
||
719 | |||
/**
 * Remove a socket id from the set kept for a presence-channel user (SREM).
 *
 * The set is identified by the app, the channel and $user->user_id.
 *
 * @param string|int $appId
 * @param string $channel
 * @param stdClass $user
 * @param string $socketId
 * @return PromiseInterface Resolves with the SREM reply.
 */
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
{
    $socketsKey = $this->getUserSocketsRedisHash($appId, $channel, $user->user_id);

    return $this->publishClient->srem($socketsKey, $socketId);
}
||
735 | |||
/**
 * Get the Redis Keyspace name to handle subscriptions
 * and other key-value sets.
 *
 * The key is "<prefix><appId>" followed by ":" and the colon-joined
 * parts (channel first, then $suffixes) when there are any. The prefix
 * comes from the "database.redis.options.prefix" config entry.
 *
 * @param string|int|null $appId
 * @param string|null $channel Prepended to the suffixes when non-empty.
 * @param array $suffixes Extra key segments appended after the channel.
 * @return string
 */
public function getRedisKey($appId = null, ?string $channel = null, array $suffixes = []): string
{
    $prefix = config('database.redis.options.prefix', null);

    $hash = "{$prefix}{$appId}";

    if ($channel) {
        $suffixes = array_merge([$channel], $suffixes);
    }

    $suffixes = implode(':', $suffixes);

    if ($suffixes) {
        $hash .= ":{$suffixes}";
    }

    return $hash;
}
||
762 | |||
/**
 * Get the Redis key of an app's pong topic: the app key with the
 * "pong" suffix and no channel.
 *
 * @param string|int $appId
 * @return string
 */
public function getPongRedisHash($appId): string
{
    $suffixes = ['pong'];

    return $this->getRedisKey($appId, null, $suffixes);
}
||
772 | |||
/**
 * Get the Redis key of the statistics hash: the app (and optional
 * channel) key with the "stats" suffix.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return string
 */
public function getStatsRedisHash($appId, ?string $channel = null): string
{
    return $this->getRedisKey($appId, $channel, ['stats']);
}
||
784 | |||
/**
 * Get the Redis key under which all socket ids are stored.
 *
 * Built without an app id or channel, using only the "sockets" suffix.
 *
 * @return string
 */
public function getSocketsRedisHash(): string
{
    $suffixes = ['sockets'];

    return $this->getRedisKey(null, null, $suffixes);
}
||
794 | |||
795 | /** |
||
796 | * Get the channels Redis hash for a specific app id, used |
||
797 | * to store existing channels. |
||
798 | * |
||
799 | * @param string|int $appId |
||
800 | * @return string |
||
801 | */ |
||
802 | public function getChannelsRedisHash($appId): string |
||
805 | } |
||
806 | |||
/**
 * Get the Redis key of the hash storing presence-channel users: the
 * app (and optional channel) key with the "users" suffix.
 *
 * @param string|int $appId
 * @param string|null $channel
 * @return string
 */
public function getUsersRedisHash($appId, ?string $channel = null): string
{
    return $this->getRedisKey($appId, $channel, ['users']);
}
||
818 | |||
819 | /** |
||
820 | * Get the Redis hash for storing socket ids |
||
821 | * for a specific presence channels user. |
||
822 | * |
||
823 | * @param string|int $appId |
||
824 | * @param string|null $channel |
||
825 | * @param string|int|null $userId |
||
826 | * @return string |
||
827 | */ |
||
828 | public function getUserSocketsRedisHash($appId, string $channel = null, $userId = null): string |
||
831 | } |
||
832 | |||
833 | /** |
||
834 | * Get the Redis topic name for PubSub |
||
835 | * used to transfer info between servers. |
||
836 | * |
||
837 | * @param string|int $appId |
||
838 | * @param string|null $channel |
||
839 | * @return string |
||
840 | */ |
||
841 | public function getRedisTopicName($appId, string $channel = null): string |
||
844 | } |
||
845 | |||
/**
 * Create a new RedisLock on this manager's Redis connection, named
 * after static::$lockName, to avoid race conditions.
 *
 * @return RedisLock
 */
protected function lock()
{
    // NOTE(review): the third argument (0) is presumably the lock
    // lifetime in seconds; confirm against RedisLock's constructor.
    $lock = new RedisLock($this->redis, static::$lockName, 0);

    return $lock;
}
||
855 | |||
856 | /** |
||
857 | * Create a fake connection for app that will mimick a connection |
||
858 | * by app ID and Socket ID to be able to be passed to the methods |
||
859 | * that accepts a connection class. |
||
860 | * |
||
861 | * @param string|int $appId |
||
862 | * @param string $socketId |
||
863 | * @return ConnectionInterface |
||
864 | */ |
||
865 | public function fakeConnectionForApp($appId, string $socketId) |
||
868 | } |
||
869 | } |
||
870 |
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property.