1 | <?php |
||||
2 | |||||
3 | namespace BeyondCode\LaravelWebSockets\ChannelManagers; |
||||
4 | |||||
5 | use BeyondCode\LaravelWebSockets\Channels\Channel; |
||||
6 | use BeyondCode\LaravelWebSockets\Channels\PresenceChannel; |
||||
7 | use BeyondCode\LaravelWebSockets\Channels\PrivateChannel; |
||||
8 | use BeyondCode\LaravelWebSockets\Contracts\ChannelManager; |
||||
9 | use BeyondCode\LaravelWebSockets\Helpers; |
||||
10 | use Carbon\Carbon; |
||||
11 | use Illuminate\Cache\ArrayLock; |
||||
12 | use Illuminate\Cache\ArrayStore; |
||||
13 | use Illuminate\Support\Str; |
||||
14 | use Ratchet\ConnectionInterface; |
||||
15 | use React\EventLoop\LoopInterface; |
||||
16 | use React\Promise\PromiseInterface; |
||||
17 | use stdClass; |
||||
18 | |||||
19 | class LocalChannelManager implements ChannelManager |
||||
20 | { |
||||
21 | /** |
||||
22 | * The list of stored channels. |
||||
23 | * |
||||
24 | * @var array |
||||
25 | */ |
||||
26 | protected $channels = []; |
||||
27 | |||||
28 | /** |
||||
29 | * The list of users that joined the presence channel. |
||||
30 | * |
||||
31 | * @var array |
||||
32 | */ |
||||
33 | protected $users = []; |
||||
34 | |||||
35 | /** |
||||
36 | * The list of users by socket and their attached id. |
||||
37 | * |
||||
38 | * @var array |
||||
39 | */ |
||||
40 | protected $userSockets = []; |
||||
41 | |||||
42 | /** |
||||
43 | * Wether the current instance accepts new connections. |
||||
44 | * |
||||
45 | * @var bool |
||||
46 | */ |
||||
47 | protected $acceptsNewConnections = true; |
||||
48 | |||||
49 | /** |
||||
50 | * The ArrayStore instance of locks. |
||||
51 | * |
||||
52 | * @var \Illuminate\Cache\ArrayStore |
||||
53 | */ |
||||
54 | protected $store; |
||||
55 | |||||
56 | /** |
||||
57 | * The unique server identifier. |
||||
58 | * |
||||
59 | * @var string |
||||
60 | */ |
||||
61 | protected $serverId; |
||||
62 | |||||
63 | /** |
||||
64 | * The lock name to use on Array to avoid multiple |
||||
65 | * actions that might lead to multiple processings. |
||||
66 | * |
||||
67 | * @var string |
||||
68 | */ |
||||
69 | protected static $lockName = 'laravel-websockets:channel-manager:lock'; |
||||
70 | |||||
71 | /** |
||||
72 | * Create a new channel manager instance. |
||||
73 | * |
||||
74 | * @param LoopInterface $loop |
||||
75 | * @param string|null $factoryClass |
||||
76 | * @return void |
||||
77 | */ |
||||
78 | public function __construct(LoopInterface $loop, $factoryClass = null) |
||||
79 | { |
||||
80 | $this->store = new ArrayStore; |
||||
81 | $this->serverId = Str::uuid()->toString(); |
||||
82 | } |
||||
83 | |||||
84 | /** |
||||
85 | * Find the channel by app & name. |
||||
86 | * |
||||
87 | * @param string|int $appId |
||||
88 | * @param string $channel |
||||
89 | * @return null|BeyondCode\LaravelWebSockets\Channels\Channel |
||||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||||
90 | */ |
||||
91 | public function find($appId, string $channel) |
||||
92 | { |
||||
93 | return $this->channels[$appId][$channel] ?? null; |
||||
94 | } |
||||
95 | |||||
96 | /** |
||||
97 | * Find a channel by app & name or create one. |
||||
98 | * |
||||
99 | * @param string|int $appId |
||||
100 | * @param string $channel |
||||
101 | * @return BeyondCode\LaravelWebSockets\Channels\Channel |
||||
102 | */ |
||||
103 | public function findOrCreate($appId, string $channel) |
||||
104 | { |
||||
105 | if (! $channelInstance = $this->find($appId, $channel)) { |
||||
0 ignored issues
–
show
|
|||||
106 | $class = $this->getChannelClassName($channel); |
||||
107 | |||||
108 | $this->channels[$appId][$channel] = new $class($channel); |
||||
109 | } |
||||
110 | |||||
111 | return $this->channels[$appId][$channel]; |
||||
112 | } |
||||
113 | |||||
114 | /** |
||||
115 | * Get the local connections, regardless of the channel |
||||
116 | * they are connected to. |
||||
117 | * |
||||
118 | * @return \React\Promise\PromiseInterface |
||||
119 | */ |
||||
120 | public function getLocalConnections(): PromiseInterface |
||||
121 | { |
||||
122 | $connections = collect($this->channels) |
||||
123 | ->map(function ($channelsWithConnections, $appId) { |
||||
0 ignored issues
–
show
The parameter
$appId is not used and could be removed.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for parameters that have been defined for a function or method, but which are not used in the method body. ![]() |
|||||
124 | return collect($channelsWithConnections)->values(); |
||||
125 | }) |
||||
126 | ->values()->collapse() |
||||
127 | ->map(function ($channel) { |
||||
128 | return collect($channel->getConnections()); |
||||
129 | }) |
||||
130 | ->values()->collapse() |
||||
131 | ->toArray(); |
||||
132 | |||||
133 | return Helpers::createFulfilledPromise($connections); |
||||
134 | } |
||||
135 | |||||
136 | /** |
||||
137 | * Get all channels for a specific app |
||||
138 | * for the current instance. |
||||
139 | * |
||||
140 | * @param string|int $appId |
||||
141 | * @return \React\Promise\PromiseInterface[array] |
||||
0 ignored issues
–
show
|
|||||
142 | */ |
||||
143 | public function getLocalChannels($appId): PromiseInterface |
||||
144 | { |
||||
145 | return Helpers::createFulfilledPromise( |
||||
146 | $this->channels[$appId] ?? [] |
||||
147 | ); |
||||
148 | } |
||||
149 | |||||
150 | /** |
||||
151 | * Get all channels for a specific app |
||||
152 | * across multiple servers. |
||||
153 | * |
||||
154 | * @param string|int $appId |
||||
155 | * @return \React\Promise\PromiseInterface[array] |
||||
0 ignored issues
–
show
|
|||||
156 | */ |
||||
157 | public function getGlobalChannels($appId): PromiseInterface |
||||
158 | { |
||||
159 | return $this->getLocalChannels($appId); |
||||
160 | } |
||||
161 | |||||
162 | /** |
||||
163 | * Remove connection from all channels. |
||||
164 | * |
||||
165 | * @param \Ratchet\ConnectionInterface $connection |
||||
166 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
167 | */ |
||||
168 | public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface |
||||
169 | { |
||||
170 | if (! isset($connection->app)) { |
||||
0 ignored issues
–
show
|
|||||
171 | return Helpers::createFulfilledPromise(false); |
||||
172 | } |
||||
173 | |||||
174 | $this->getLocalChannels($connection->app->id) |
||||
175 | ->then(function ($channels) use ($connection) { |
||||
176 | collect($channels) |
||||
177 | ->each(function (Channel $channel) use ($connection) { |
||||
178 | $channel->unsubscribe($connection); |
||||
179 | }); |
||||
180 | |||||
181 | collect($channels) |
||||
182 | ->reject(function ($channel) { |
||||
183 | return $channel->hasConnections(); |
||||
184 | }) |
||||
185 | ->each(function (Channel $channel, string $channelName) use ($connection) { |
||||
186 | unset($this->channels[$connection->app->id][$channelName]); |
||||
187 | }); |
||||
188 | }); |
||||
189 | |||||
190 | $this->getLocalChannels($connection->app->id) |
||||
191 | ->then(function ($channels) use ($connection) { |
||||
192 | if (count($channels) === 0) { |
||||
193 | unset($this->channels[$connection->app->id]); |
||||
194 | } |
||||
195 | }); |
||||
196 | |||||
197 | return Helpers::createFulfilledPromise(true); |
||||
198 | } |
||||
199 | |||||
200 | /** |
||||
201 | * Subscribe the connection to a specific channel. |
||||
202 | * |
||||
203 | * @param \Ratchet\ConnectionInterface $connection |
||||
204 | * @param string $channelName |
||||
205 | * @param stdClass $payload |
||||
206 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
207 | */ |
||||
208 | public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface |
||||
209 | { |
||||
210 | $channel = $this->findOrCreate($connection->app->id, $channelName); |
||||
0 ignored issues
–
show
|
|||||
211 | |||||
212 | return Helpers::createFulfilledPromise( |
||||
213 | $channel->subscribe($connection, $payload) |
||||
214 | ); |
||||
215 | } |
||||
216 | |||||
217 | /** |
||||
218 | * Unsubscribe the connection from the channel. |
||||
219 | * |
||||
220 | * @param \Ratchet\ConnectionInterface $connection |
||||
221 | * @param string $channelName |
||||
222 | * @param stdClass $payload |
||||
223 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
224 | */ |
||||
225 | public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface |
||||
226 | { |
||||
227 | $channel = $this->findOrCreate($connection->app->id, $channelName); |
||||
0 ignored issues
–
show
|
|||||
228 | |||||
229 | return Helpers::createFulfilledPromise( |
||||
230 | $channel->unsubscribe($connection, $payload) |
||||
231 | ); |
||||
232 | } |
||||
233 | |||||
234 | /** |
||||
235 | * Subscribe the connection to a specific channel, returning |
||||
236 | * a promise containing the amount of connections. |
||||
237 | * |
||||
238 | * @param string|int $appId |
||||
239 | * @return PromiseInterface[int] |
||||
0 ignored issues
–
show
|
|||||
240 | */ |
||||
241 | public function subscribeToApp($appId): PromiseInterface |
||||
242 | { |
||||
243 | return Helpers::createFulfilledPromise(0); |
||||
244 | } |
||||
245 | |||||
246 | /** |
||||
247 | * Unsubscribe the connection from the channel, returning |
||||
248 | * a promise containing the amount of connections after decrement. |
||||
249 | * |
||||
250 | * @param string|int $appId |
||||
251 | * @return PromiseInterface[int] |
||||
0 ignored issues
–
show
|
|||||
252 | */ |
||||
253 | public function unsubscribeFromApp($appId): PromiseInterface |
||||
254 | { |
||||
255 | return Helpers::createFulfilledPromise(0); |
||||
256 | } |
||||
257 | |||||
258 | /** |
||||
259 | * Get the connections count on the app |
||||
260 | * for the current server instance. |
||||
261 | * |
||||
262 | * @param string|int $appId |
||||
263 | * @param string|null $channelName |
||||
264 | * @return PromiseInterface[int] |
||||
0 ignored issues
–
show
|
|||||
265 | */ |
||||
266 | public function getLocalConnectionsCount($appId, string $channelName = null): PromiseInterface |
||||
267 | { |
||||
268 | return $this->getLocalChannels($appId) |
||||
269 | ->then(function ($channels) use ($channelName) { |
||||
270 | return collect($channels)->when(! is_null($channelName), function ($collection) use ($channelName) { |
||||
271 | return $collection->filter(function (Channel $channel) use ($channelName) { |
||||
272 | return $channel->getName() === $channelName; |
||||
273 | }); |
||||
274 | }) |
||||
275 | ->flatMap(function (Channel $channel) { |
||||
276 | return collect($channel->getConnections())->pluck('socketId'); |
||||
277 | }) |
||||
278 | ->unique()->count(); |
||||
279 | }); |
||||
280 | } |
||||
281 | |||||
282 | /** |
||||
283 | * Get the connections count |
||||
284 | * across multiple servers. |
||||
285 | * |
||||
286 | * @param string|int $appId |
||||
287 | * @param string|null $channelName |
||||
288 | * @return PromiseInterface[int] |
||||
0 ignored issues
–
show
|
|||||
289 | */ |
||||
290 | public function getGlobalConnectionsCount($appId, string $channelName = null): PromiseInterface |
||||
291 | { |
||||
292 | return $this->getLocalConnectionsCount($appId, $channelName); |
||||
293 | } |
||||
294 | |||||
295 | /** |
||||
296 | * Broadcast the message across multiple servers. |
||||
297 | * |
||||
298 | * @param string|int $appId |
||||
299 | * @param string|null $socketId |
||||
300 | * @param string $channel |
||||
301 | * @param stdClass $payload |
||||
302 | * @param string|null $serverId |
||||
303 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
304 | */ |
||||
305 | public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, string $serverId = null): PromiseInterface |
||||
306 | { |
||||
307 | return Helpers::createFulfilledPromise(true); |
||||
308 | } |
||||
309 | |||||
310 | /** |
||||
311 | * Handle the user when it joined a presence channel. |
||||
312 | * |
||||
313 | * @param \Ratchet\ConnectionInterface $connection |
||||
314 | * @param stdClass $user |
||||
315 | * @param string $channel |
||||
316 | * @param stdClass $payload |
||||
317 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
318 | */ |
||||
319 | public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface |
||||
320 | { |
||||
321 | $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user); |
||||
0 ignored issues
–
show
|
|||||
322 | $this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId; |
||||
323 | |||||
324 | return Helpers::createFulfilledPromise(true); |
||||
325 | } |
||||
326 | |||||
327 | /** |
||||
328 | * Handle the user when it left a presence channel. |
||||
329 | * |
||||
330 | * @param \Ratchet\ConnectionInterface $connection |
||||
331 | * @param stdClass $user |
||||
332 | * @param string $channel |
||||
333 | * @param stdClass $payload |
||||
334 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
335 | */ |
||||
336 | public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface |
||||
337 | { |
||||
338 | unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]); |
||||
0 ignored issues
–
show
|
|||||
339 | |||||
340 | $deletableSocketKey = array_search( |
||||
341 | $connection->socketId, |
||||
342 | $this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"] |
||||
343 | ); |
||||
344 | |||||
345 | if ($deletableSocketKey !== false) { |
||||
346 | unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]); |
||||
347 | |||||
348 | if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) { |
||||
349 | unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]); |
||||
350 | } |
||||
351 | } |
||||
352 | |||||
353 | return Helpers::createFulfilledPromise(true); |
||||
354 | } |
||||
355 | |||||
356 | /** |
||||
357 | * Get the presence channel members. |
||||
358 | * |
||||
359 | * @param string|int $appId |
||||
360 | * @param string $channel |
||||
361 | * @return \React\Promise\PromiseInterface |
||||
362 | */ |
||||
363 | public function getChannelMembers($appId, string $channel): PromiseInterface |
||||
364 | { |
||||
365 | $members = $this->users["{$appId}:{$channel}"] ?? []; |
||||
366 | |||||
367 | $members = collect($members)->map(function ($user) { |
||||
368 | return json_decode($user); |
||||
369 | })->unique('user_id')->toArray(); |
||||
370 | |||||
371 | return Helpers::createFulfilledPromise($members); |
||||
372 | } |
||||
373 | |||||
374 | /** |
||||
375 | * Get a member from a presence channel based on connection. |
||||
376 | * |
||||
377 | * @param \Ratchet\ConnectionInterface $connection |
||||
378 | * @param string $channel |
||||
379 | * @return \React\Promise\PromiseInterface |
||||
380 | */ |
||||
381 | public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface |
||||
382 | { |
||||
383 | $member = $this->users["{$connection->app->id}:{$channel}"][$connection->socketId] ?? null; |
||||
0 ignored issues
–
show
|
|||||
384 | |||||
385 | return Helpers::createFulfilledPromise($member); |
||||
386 | } |
||||
387 | |||||
388 | /** |
||||
389 | * Get the presence channels total members count. |
||||
390 | * |
||||
391 | * @param string|int $appId |
||||
392 | * @param array $channelNames |
||||
393 | * @return \React\Promise\PromiseInterface |
||||
394 | */ |
||||
395 | public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface |
||||
396 | { |
||||
397 | $results = collect($channelNames) |
||||
398 | ->reduce(function ($results, $channel) use ($appId) { |
||||
399 | $results[$channel] = isset($this->users["{$appId}:{$channel}"]) |
||||
400 | ? count($this->users["{$appId}:{$channel}"]) |
||||
401 | : 0; |
||||
402 | |||||
403 | return $results; |
||||
404 | }, []); |
||||
405 | |||||
406 | return Helpers::createFulfilledPromise($results); |
||||
407 | } |
||||
408 | |||||
409 | /** |
||||
410 | * Get the socket IDs for a presence channel member. |
||||
411 | * |
||||
412 | * @param string|int $userId |
||||
413 | * @param string|int $appId |
||||
414 | * @param string $channelName |
||||
415 | * @return \React\Promise\PromiseInterface |
||||
416 | */ |
||||
417 | public function getMemberSockets($userId, $appId, $channelName): PromiseInterface |
||||
418 | { |
||||
419 | return Helpers::createFulfilledPromise( |
||||
420 | $this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? [] |
||||
421 | ); |
||||
422 | } |
||||
423 | |||||
424 | /** |
||||
425 | * Keep tracking the connections availability when they pong. |
||||
426 | * |
||||
427 | * @param \Ratchet\ConnectionInterface $connection |
||||
428 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
429 | */ |
||||
430 | public function connectionPonged(ConnectionInterface $connection): PromiseInterface |
||||
431 | { |
||||
432 | return $this->pongConnectionInChannels($connection); |
||||
433 | } |
||||
434 | |||||
435 | /** |
||||
436 | * Remove the obsolete connections that didn't ponged in a while. |
||||
437 | * |
||||
438 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
439 | */ |
||||
440 | public function removeObsoleteConnections(): PromiseInterface |
||||
441 | { |
||||
442 | $lock = $this->lock(); |
||||
443 | try { |
||||
444 | if (! $lock->acquire()) { |
||||
445 | return Helpers::createFulfilledPromise(false); |
||||
446 | } |
||||
447 | |||||
448 | $this->getLocalConnections()->then(function ($connections) { |
||||
449 | foreach ($connections as $connection) { |
||||
450 | $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); |
||||
451 | |||||
452 | if ($differenceInSeconds > 120) { |
||||
453 | $this->unsubscribeFromAllChannels($connection); |
||||
454 | } |
||||
455 | } |
||||
456 | }); |
||||
457 | |||||
458 | return Helpers::createFulfilledPromise(true); |
||||
459 | } finally { |
||||
460 | optional($lock)->forceRelease(); |
||||
461 | } |
||||
462 | } |
||||
463 | |||||
464 | /** |
||||
465 | * Pong connection in channels. |
||||
466 | * |
||||
467 | * @param ConnectionInterface $connection |
||||
468 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
469 | */ |
||||
470 | public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface |
||||
471 | { |
||||
472 | return $this->getLocalChannels($connection->app->id) |
||||
0 ignored issues
–
show
|
|||||
473 | ->then(function ($channels) use ($connection) { |
||||
474 | foreach ($channels as $channel) { |
||||
475 | if ($conn = $channel->getConnection($connection->socketId)) { |
||||
0 ignored issues
–
show
|
|||||
476 | $conn->lastPongedAt = Carbon::now(); |
||||
477 | $channel->saveConnection($conn); |
||||
478 | } |
||||
479 | } |
||||
480 | |||||
481 | return true; |
||||
482 | }); |
||||
483 | } |
||||
484 | |||||
485 | /** |
||||
486 | * Update the connection in all channels. |
||||
487 | * |
||||
488 | * @param ConnectionInterface $connection |
||||
489 | * @return PromiseInterface[bool] |
||||
0 ignored issues
–
show
|
|||||
490 | */ |
||||
491 | public function updateConnectionInChannels($connection): PromiseInterface |
||||
492 | { |
||||
493 | return $this->getLocalChannels($connection->app->id) |
||||
0 ignored issues
–
show
|
|||||
494 | ->then(function ($channels) use ($connection) { |
||||
495 | foreach ($channels as $channel) { |
||||
496 | if ($channel->hasConnection($connection)) { |
||||
497 | $channel->saveConnection($connection); |
||||
498 | } |
||||
499 | } |
||||
500 | |||||
501 | return true; |
||||
502 | }); |
||||
503 | } |
||||
504 | |||||
505 | /** |
||||
506 | * Mark the current instance as unable to accept new connections. |
||||
507 | * |
||||
508 | * @return $this |
||||
509 | */ |
||||
510 | public function declineNewConnections() |
||||
511 | { |
||||
512 | $this->acceptsNewConnections = false; |
||||
513 | |||||
514 | return $this; |
||||
515 | } |
||||
516 | |||||
517 | /** |
||||
518 | * Check if the current server instance |
||||
519 | * accepts new connections. |
||||
520 | * |
||||
521 | * @return bool |
||||
522 | */ |
||||
523 | public function acceptsNewConnections(): bool |
||||
524 | { |
||||
525 | return $this->acceptsNewConnections; |
||||
526 | } |
||||
527 | |||||
528 | /** |
||||
529 | * Get the channel class by the channel name. |
||||
530 | * |
||||
531 | * @param string $channelName |
||||
532 | * @return string |
||||
533 | */ |
||||
534 | protected function getChannelClassName(string $channelName): string |
||||
535 | { |
||||
536 | if (Str::startsWith($channelName, 'private-')) { |
||||
537 | return PrivateChannel::class; |
||||
538 | } |
||||
539 | |||||
540 | if (Str::startsWith($channelName, 'presence-')) { |
||||
541 | return PresenceChannel::class; |
||||
542 | } |
||||
543 | |||||
544 | return Channel::class; |
||||
545 | } |
||||
546 | |||||
547 | /** |
||||
548 | * Get the unique identifier for the server. |
||||
549 | * |
||||
550 | * @return string |
||||
551 | */ |
||||
552 | public function getServerId(): string |
||||
553 | { |
||||
554 | return $this->serverId; |
||||
555 | } |
||||
556 | |||||
557 | /** |
||||
558 | * Get a new ArrayLock instance to avoid race conditions. |
||||
559 | * |
||||
560 | * @return \Illuminate\Cache\CacheLock |
||||
561 | */ |
||||
562 | protected function lock() |
||||
563 | { |
||||
564 | return new ArrayLock($this->store, static::$lockName, 0); |
||||
565 | } |
||||
566 | } |
||||
567 |