LocalChannelManager::userLeftPresenceChannel()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 18
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 9
c 1
b 0
f 0
nc 3
nop 3
dl 0
loc 18
rs 9.9666
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\ChannelManagers;
4
5
use BeyondCode\LaravelWebSockets\Channels\Channel;
6
use BeyondCode\LaravelWebSockets\Channels\PresenceChannel;
7
use BeyondCode\LaravelWebSockets\Channels\PrivateChannel;
8
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
9
use BeyondCode\LaravelWebSockets\Helpers;
10
use Carbon\Carbon;
11
use Illuminate\Cache\ArrayLock;
12
use Illuminate\Cache\ArrayStore;
13
use Illuminate\Support\Str;
14
use Ratchet\ConnectionInterface;
15
use React\EventLoop\LoopInterface;
16
use React\Promise\PromiseInterface;
17
use stdClass;
18
19
class LocalChannelManager implements ChannelManager
20
{
21
    /**
22
     * The list of stored channels.
23
     *
24
     * @var array
25
     */
26
    protected $channels = [];
27
28
    /**
29
     * The list of users that joined the presence channel.
30
     *
31
     * @var array
32
     */
33
    protected $users = [];
34
35
    /**
36
     * The list of users by socket and their attached id.
37
     *
38
     * @var array
39
     */
40
    protected $userSockets = [];
41
42
    /**
43
     * Whether the current instance accepts new connections.
44
     *
45
     * @var bool
46
     */
47
    protected $acceptsNewConnections = true;
48
49
    /**
50
     * The ArrayStore instance of locks.
51
     *
52
     * @var \Illuminate\Cache\ArrayStore
53
     */
54
    protected $store;
55
56
    /**
57
     * The unique server identifier.
58
     *
59
     * @var string
60
     */
61
    protected $serverId;
62
63
    /**
64
     * The lock name to use on Array to avoid multiple
65
     * actions that might lead to multiple processings.
66
     *
67
     * @var string
68
     */
69
    protected static $lockName = 'laravel-websockets:channel-manager:lock';
70
71
    /**
     * Build the in-memory channel manager.
     *
     * Neither argument is read by this implementation; they are accepted
     * so the constructor signature stays interchangeable with other managers.
     *
     * @param  \React\EventLoop\LoopInterface  $loop
     * @param  string|null  $factoryClass
     * @return void
     */
    public function __construct(LoopInterface $loop, $factoryClass = null)
    {
        // Each manager instance gets its own random identifier.
        $this->serverId = Str::uuid()->toString();

        // Backing store for the locks handed out by lock().
        $this->store = new ArrayStore();
    }
83
84
    /**
     * Look up an already registered channel by app and channel name.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return \BeyondCode\LaravelWebSockets\Channels\Channel|null  Null when the app or the channel is unknown.
     */
    public function find($appId, string $channel)
    {
        $appChannels = $this->channels[$appId] ?? [];

        return $appChannels[$channel] ?? null;
    }
95
96
    /**
     * Find a channel by app & name, creating and registering it when missing.
     *
     * The concrete class of a newly created channel is chosen from the
     * channel name via getChannelClassName().
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return \BeyondCode\LaravelWebSockets\Channels\Channel
     */
    public function findOrCreate($appId, string $channel)
    {
        // No local variable is kept for the lookup result: the instance is
        // always read back from the registry below.
        if ($this->find($appId, $channel) === null) {
            $class = $this->getChannelClassName($channel);

            $this->channels[$appId][$channel] = new $class($channel);
        }

        return $this->channels[$appId][$channel];
    }
113
114
    /**
     * Get the local connections, regardless of the app or channel
     * they are connected to.
     *
     * @return \React\Promise\PromiseInterface  Resolves with a flat array of connections.
     */
    public function getLocalConnections(): PromiseInterface
    {
        // Step 1: flatten the app => channels map into one list of channels.
        $allChannels = collect($this->channels)
            ->map(function ($appChannels) {
                return collect($appChannels)->values();
            })
            ->values()
            ->collapse();

        // Step 2: flatten every channel's connections into a single array.
        $connections = $allChannels
            ->map(function ($channel) {
                return collect($channel->getConnections());
            })
            ->values()
            ->collapse()
            ->toArray();

        return Helpers::createFulfilledPromise($connections);
    }
135
136
    /**
     * Get all channels of a specific app held by the current instance.
     *
     * @param  string|int  $appId
     * @return \React\Promise\PromiseInterface  Resolves with an array of channels keyed by channel name (empty when the app is unknown).
     */
    public function getLocalChannels($appId): PromiseInterface
    {
        $appChannels = $this->channels[$appId] ?? [];

        return Helpers::createFulfilledPromise($appChannels);
    }
149
150
    /**
     * Get all channels of a specific app across servers.
     *
     * This manager only knows about its own channels, so the result is
     * exactly what getLocalChannels() returns.
     *
     * @param  string|int  $appId
     * @return \React\Promise\PromiseInterface  Resolves with an array of channels.
     */
    public function getGlobalChannels($appId): PromiseInterface
    {
        $localChannels = $this->getLocalChannels($appId);

        return $localChannels;
    }
161
162
    /**
     * Remove the connection from every channel of its app.
     *
     * Channels left without connections are dropped from the registry, and
     * the app entry itself is dropped once it has no channels left.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return \React\Promise\PromiseInterface  Resolves with false when the connection carries no app, true otherwise.
     */
    public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
    {
        // A connection without an app was never attached to any channel.
        if (! isset($connection->app)) {
            return Helpers::createFulfilledPromise(false);
        }

        $this->getLocalChannels($connection->app->id)
            ->then(function ($channels) use ($connection) {
                // First pass: detach the connection everywhere.
                foreach ($channels as $channel) {
                    $channel->unsubscribe($connection);
                }

                // Second pass: forget the channels that are now empty.
                foreach ($channels as $channelName => $channel) {
                    if (! $channel->hasConnections()) {
                        unset($this->channels[$connection->app->id][$channelName]);
                    }
                }
            });

        // Re-read the registry so the app entry is removed once it is empty.
        $this->getLocalChannels($connection->app->id)
            ->then(function ($remainingChannels) use ($connection) {
                if (count($remainingChannels) === 0) {
                    unset($this->channels[$connection->app->id]);
                }
            });

        return Helpers::createFulfilledPromise(true);
    }
199
200
    /**
     * Subscribe the connection to a channel, creating the channel if needed.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  string  $channelName
     * @param  stdClass  $payload
     * @return \React\Promise\PromiseInterface  Resolves with whatever the channel's subscribe() returns.
     */
    public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
    {
        $appId = $connection->app->id;

        $result = $this->findOrCreate($appId, $channelName)
            ->subscribe($connection, $payload);

        return Helpers::createFulfilledPromise($result);
    }
216
217
    /**
     * Unsubscribe the connection from a channel.
     *
     * The channel is resolved through findOrCreate(), so an unknown channel
     * is registered before the unsubscribe call is forwarded to it.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  string  $channelName
     * @param  stdClass  $payload
     * @return \React\Promise\PromiseInterface  Resolves with whatever the channel's unsubscribe() returns.
     */
    public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
    {
        $appId = $connection->app->id;

        $result = $this->findOrCreate($appId, $channelName)
            ->unsubscribe($connection, $payload);

        return Helpers::createFulfilledPromise($result);
    }
233
234
    /**
     * Register a new connection on the app.
     *
     * This manager keeps no per-app counter, so the promise always
     * resolves with 0.
     *
     * @param  string|int  $appId
     * @return \React\Promise\PromiseInterface  Resolves with the integer 0.
     */
    public function subscribeToApp($appId): PromiseInterface
    {
        $connectionsCount = 0;

        return Helpers::createFulfilledPromise($connectionsCount);
    }
245
246
    /**
     * Unregister a connection from the app.
     *
     * This manager keeps no per-app counter, so the promise always
     * resolves with 0.
     *
     * @param  string|int  $appId
     * @return \React\Promise\PromiseInterface  Resolves with the integer 0.
     */
    public function unsubscribeFromApp($appId): PromiseInterface
    {
        $connectionsCount = 0;

        return Helpers::createFulfilledPromise($connectionsCount);
    }
257
258
    /**
     * Get the connections count on the app for the current server instance.
     *
     * Connections are counted by unique socket ID, so a socket subscribed
     * to several channels is counted once.
     *
     * @param  string|int  $appId
     * @param  string|null  $channelName  When given, only the channel with this exact name is counted.
     * @return \React\Promise\PromiseInterface  Resolves with the integer count.
     */
    public function getLocalConnectionsCount($appId, ?string $channelName = null): PromiseInterface
    {
        return $this->getLocalChannels($appId)
            ->then(function ($channels) use ($channelName) {
                return collect($channels)
                    // Narrow down to a single channel only when one was requested.
                    ->when($channelName !== null, function ($collection) use ($channelName) {
                        return $collection->filter(function (Channel $channel) use ($channelName) {
                            return $channel->getName() === $channelName;
                        });
                    })
                    ->flatMap(function (Channel $channel) {
                        return collect($channel->getConnections())->pluck('socketId');
                    })
                    ->unique()
                    ->count();
            });
    }
281
282
    /**
     * Get the connections count across servers.
     *
     * This manager only knows about its own connections, so the call is
     * forwarded to getLocalConnectionsCount().
     *
     * @param  string|int  $appId
     * @param  string|null  $channelName
     * @return \React\Promise\PromiseInterface  Resolves with the integer count.
     */
    public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface
    {
        return $this->getLocalConnectionsCount($appId, $channelName);
    }
294
295
    /**
     * Broadcast the message across multiple servers.
     *
     * This implementation does not forward anything: none of the arguments
     * are used and the promise immediately resolves with true.
     *
     * @param  string|int  $appId
     * @param  string|null  $socketId
     * @param  string  $channel
     * @param  stdClass  $payload
     * @param  string|null  $serverId
     * @return \React\Promise\PromiseInterface  Resolves with true.
     */
    public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface
    {
        return Helpers::createFulfilledPromise(true);
    }
309
310
    /**
     * Record a user that joined a presence channel.
     *
     * The user is stored JSON-encoded under the connection's socket ID, and
     * the socket ID is appended to the list of sockets owned by that user.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  stdClass  $user
     * @param  string  $channel
     * @param  stdClass  $payload
     * @return \React\Promise\PromiseInterface  Resolves with true.
     */
    public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
    {
        $channelKey = "{$connection->app->id}:{$channel}";
        $socketsKey = "{$channelKey}:{$user->user_id}";

        $this->users[$channelKey][$connection->socketId] = json_encode($user);
        $this->userSockets[$socketsKey][] = $connection->socketId;

        return Helpers::createFulfilledPromise(true);
    }
326
327
    /**
     * Forget a user that left a presence channel.
     *
     * Removes the member entry stored for the connection's socket ID and
     * removes that socket ID from the user's socket list. Calling this for
     * a user or socket that was never recorded is a harmless no-op.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  stdClass  $user
     * @param  string  $channel
     * @return \React\Promise\PromiseInterface  Resolves with true.
     */
    public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
    {
        $channelKey = "{$connection->app->id}:{$channel}";
        $socketsKey = "{$channelKey}:{$user->user_id}";

        unset($this->users[$channelKey][$connection->socketId]);

        // Drop the per-channel bucket once its last member is gone so empty
        // arrays do not pile up for channels that are no longer in use.
        if (isset($this->users[$channelKey]) && count($this->users[$channelKey]) === 0) {
            unset($this->users[$channelKey]);
        }

        // The user may have no recorded sockets (e.g. a repeated "leave"), so
        // fall back to an empty list. The search is strict because socket IDs
        // are numeric-looking strings that loose comparison could mismatch.
        $deletableSocketKey = array_search(
            $connection->socketId,
            $this->userSockets[$socketsKey] ?? [],
            true
        );

        if ($deletableSocketKey !== false) {
            unset($this->userSockets[$socketsKey][$deletableSocketKey]);

            if (count($this->userSockets[$socketsKey]) === 0) {
                unset($this->userSockets[$socketsKey]);
            }
        }

        return Helpers::createFulfilledPromise(true);
    }
355
356
    /**
     * Get the presence channel members.
     *
     * Stored members are JSON-decoded and de-duplicated by their user_id,
     * so a user connected through several sockets is listed once.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return \React\Promise\PromiseInterface  Resolves with an array of decoded members.
     */
    public function getChannelMembers($appId, string $channel): PromiseInterface
    {
        $encodedMembers = $this->users["{$appId}:{$channel}"] ?? [];

        $decodedMembers = collect($encodedMembers)->map(function ($encodedUser) {
            return json_decode($encodedUser);
        });

        $members = $decodedMembers->unique('user_id')->toArray();

        return Helpers::createFulfilledPromise($members);
    }
373
374
    /**
     * Get the member recorded for a connection on a presence channel.
     *
     * The value is returned exactly as stored (the JSON string written by
     * userJoinedPresenceChannel()), or null when nothing is recorded.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  string  $channel
     * @return \React\Promise\PromiseInterface  Resolves with the stored member or null.
     */
    public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
    {
        $channelKey = "{$connection->app->id}:{$channel}";

        return Helpers::createFulfilledPromise(
            $this->users[$channelKey][$connection->socketId] ?? null
        );
    }
387
388
    /**
     * Get the number of recorded member entries for each given presence channel.
     *
     * @param  string|int  $appId
     * @param  array  $channelNames
     * @return \React\Promise\PromiseInterface  Resolves with an array of counts keyed by channel name (0 for unknown channels).
     */
    public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
    {
        $counts = [];

        foreach ($channelNames as $channelName) {
            $counts[$channelName] = count($this->users["{$appId}:{$channelName}"] ?? []);
        }

        return Helpers::createFulfilledPromise($counts);
    }
408
409
    /**
     * Get the socket IDs recorded for a presence channel member.
     *
     * @param  string|int  $userId
     * @param  string|int  $appId
     * @param  string  $channelName
     * @return \React\Promise\PromiseInterface  Resolves with an array of socket IDs (empty when none are recorded).
     */
    public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
    {
        $socketsKey = "{$appId}:{$channelName}:{$userId}";
        $sockets = $this->userSockets[$socketsKey] ?? [];

        return Helpers::createFulfilledPromise($sockets);
    }
423
424
    /**
     * Keep track of a connection's availability when it pongs.
     *
     * Delegates to pongConnectionInChannels().
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return \React\Promise\PromiseInterface  Resolves with true.
     */
    public function connectionPonged(ConnectionInterface $connection): PromiseInterface
    {
        $ponged = $this->pongConnectionInChannels($connection);

        return $ponged;
    }
434
435
    /**
     * Remove the obsolete connections that have not ponged in a while.
     *
     * A connection is considered obsolete when its last pong is more than
     * 120 seconds old; it is then unsubscribed from all of its channels.
     *
     * @return \React\Promise\PromiseInterface  Resolves with false when the lock could not be acquired, true otherwise.
     */
    public function removeObsoleteConnections(): PromiseInterface
    {
        $lock = $this->lock();

        // Bail out before entering the try block: a lock that was not
        // acquired here must not be released by this call.
        if (! $lock->acquire()) {
            return Helpers::createFulfilledPromise(false);
        }

        try {
            $this->getLocalConnections()->then(function ($connections) {
                // One reference time for the whole sweep.
                $now = Carbon::now();

                foreach ($connections as $connection) {
                    $differenceInSeconds = $connection->lastPongedAt->diffInSeconds($now);

                    if ($differenceInSeconds > 120) {
                        $this->unsubscribeFromAllChannels($connection);
                    }
                }
            });

            return Helpers::createFulfilledPromise(true);
        } finally {
            // Always give the lock back, even if the sweep threw.
            $lock->forceRelease();
        }
    }
463
464
    /**
     * Refresh the last-pong timestamp of a connection in its app's channels.
     *
     * For every channel of the connection's app that holds a connection with
     * the same socket ID, lastPongedAt is set to the current time and the
     * connection is saved back on the channel.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return \React\Promise\PromiseInterface  Resolves with true.
     */
    public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface
    {
        $refreshPong = function ($channels) use ($connection) {
            foreach ($channels as $channel) {
                $conn = $channel->getConnection($connection->socketId);

                // Channel does not hold this socket; nothing to refresh.
                if (! $conn) {
                    continue;
                }

                $conn->lastPongedAt = Carbon::now();
                $channel->saveConnection($conn);
            }

            return true;
        };

        return $this->getLocalChannels($connection->app->id)->then($refreshPong);
    }
484
485
    /**
     * Save the given connection on every channel of its app that holds it.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return \React\Promise\PromiseInterface  Resolves with true.
     */
    public function updateConnectionInChannels($connection): PromiseInterface
    {
        $saveWhereSubscribed = function ($channels) use ($connection) {
            foreach ($channels as $channel) {
                if (! $channel->hasConnection($connection)) {
                    continue;
                }

                $channel->saveConnection($connection);
            }

            return true;
        };

        return $this->getLocalChannels($connection->app->id)->then($saveWhereSubscribed);
    }
504
505
    /**
     * Flag this instance as no longer accepting new connections.
     *
     * @return $this
     */
    public function declineNewConnections()
    {
        $this->acceptsNewConnections = false;

        return $this;
    }
516
517
    /**
     * Tell whether this server instance still accepts new connections.
     *
     * @return bool  True until declineNewConnections() has been called.
     */
    public function acceptsNewConnections(): bool
    {
        return $this->acceptsNewConnections;
    }
527
528
    /**
     * Resolve the channel class to instantiate for a channel name.
     *
     * Names starting with "private-" map to PrivateChannel, names starting
     * with "presence-" map to PresenceChannel, anything else to Channel.
     *
     * @param  string  $channelName
     * @return string  Fully qualified class name.
     */
    protected function getChannelClassName(string $channelName): string
    {
        $classesByPrefix = [
            'private-' => PrivateChannel::class,
            'presence-' => PresenceChannel::class,
        ];

        foreach ($classesByPrefix as $prefix => $class) {
            if (Str::startsWith($channelName, $prefix)) {
                return $class;
            }
        }

        return Channel::class;
    }
546
547
    /**
     * Get the unique identifier generated for this server instance
     * when the manager was constructed.
     *
     * @return string
     */
    public function getServerId(): string
    {
        return $this->serverId;
    }
556
557
    /**
     * Build a new ArrayLock on the manager's store, using the shared
     * lock name, to guard against overlapping processing.
     *
     * @return \Illuminate\Cache\ArrayLock
     */
    protected function lock()
    {
        // NOTE(review): the third argument (0) is presumably the lock
        // duration in seconds — confirm against ArrayLock's constructor.
        $lock = new ArrayLock($this->store, static::$lockName, 0);

        return $lock;
    }
566
}
567