RedisChannelManager   F
last analyzed

Complexity

Total Complexity 67

Size/Duplication

Total Lines 849
Duplicated Lines 0 %

Importance

Changes 13
Bugs 0 Features 0
Metric Value
eloc 207
c 13
b 0
f 0
dl 0
loc 849
rs 3.04
wmc 67

47 Methods

Rating   Name   Duplication   Size   Complexity  
A addUserSocket() 0 4 1
A getChannelsRedisHash() 0 3 1
A getChannelMembers() 0 8 1
A addChannelToSet() 0 4 1
A getSubscribeClient() 0 3 1
A unsubscribeFromChannel() 0 13 2
A unsubscribeFromTopic() 0 10 1
A addConnectionToSet() 0 7 2
A getStatsRedisHash() 0 3 1
A getRedisKey() 0 17 3
A getUserSocketsRedisHash() 0 3 1
A isChannelInSet() 0 4 1
A unsubscribeFromApp() 0 5 1
A getRedisTopicName() 0 3 1
A removeChannelFromSet() 0 4 1
A removeObsoleteConnections() 0 18 2
A getChannelsMembersCount() 0 13 2
A __construct() 0 20 2
A fakeConnectionForApp() 0 3 1
A incrementSubscriptionsCount() 0 4 1
A decrementSubscriptionsCount() 0 3 1
A getChannelMember() 0 4 1
A getConnectionUri() 0 21 5
A getSocketsRedisHash() 0 3 1
A userLeftPresenceChannel() 0 8 1
A getGlobalConnectionsCount() 0 6 2
A removeConnectionFromSet() 0 5 1
A removeUserData() 0 4 1
A removeUserSocket() 0 4 1
A subscribeToApp() 0 5 1
A lock() 0 3 1
A getConnectionsFromSet() 0 15 2
A find() 0 8 2
A storeUserData() 0 4 1
A getMemberSockets() 0 4 1
A unsubscribeFromAllChannels() 0 10 2
A getPublishClient() 0 3 1
A broadcastAcrossServers() 0 10 2
A getRedisClient() 0 3 1
A subscribeToChannel() 0 14 1
A onMessage() 0 37 5
A getUsersRedisHash() 0 3 1
A getGlobalChannels() 0 4 1
A userJoinedPresenceChannel() 0 8 1
A connectionPonged() 0 16 1
A subscribeToTopic() 0 10 1
A getPongRedisHash() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like RedisChannelManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefix or suffix. In the method list above, the getters ending in RedisHash and the methods ending in ToSet or FromSet form such groups.
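The prefix/suffix search described above can be sketched in a few lines of PHP. This is only an illustration: `groupBySuffix()` is a hypothetical helper, not part of the package, and the method names are a subset copied from the method table.

```php
<?php

/**
 * Group method names by the first matching suffix.
 * Hypothetical helper for illustration only.
 *
 * @param  string[]  $methods
 * @param  string[]  $suffixes
 * @return array<string, string[]>
 */
function groupBySuffix(array $methods, array $suffixes): array
{
    $groups = [];

    foreach ($methods as $method) {
        foreach ($suffixes as $suffix) {
            // substr() with a negative offset returns the tail of the string.
            if (substr($method, -strlen($suffix)) === $suffix) {
                $groups[$suffix][] = $method;
                break;
            }
        }
    }

    return $groups;
}

// A subset of the method names listed in the table above.
$methods = [
    'getChannelsRedisHash', 'getStatsRedisHash', 'getUsersRedisHash',
    'addChannelToSet', 'removeChannelFromSet', 'addConnectionToSet',
    'subscribeToTopic', 'getPublishClient',
];

$groups = groupBySuffix($methods, ['RedisHash', 'ToSet', 'FromSet']);

print_r($groups);
```

Each resulting group is a candidate for its own class; methods that match no suffix, such as subscribeToTopic(), are left out of the result.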

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
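As a rough sketch of Extract Class, the key-naming getters could move into a small collaborator. Everything here is an assumption made for illustration: the class name `RedisKeyBuilder`, its method names, and in particular the colon-separated key format, which is not taken from the package's actual getRedisKey() logic.

```php
<?php

/**
 * Hypothetical extracted class. The key format is an assumption,
 * not the package's real getRedisKey() implementation.
 */
final class RedisKeyBuilder
{
    /** @var string */
    private $prefix;

    public function __construct(string $prefix)
    {
        $this->prefix = $prefix;
    }

    /**
     * Join the prefix, app ID, channel and suffixes, skipping empty parts.
     *
     * @param  string|int|null  $appId
     */
    public function key($appId = null, ?string $channel = null, array $suffixes = []): string
    {
        $parts = array_filter(
            array_merge([$this->prefix, $appId, $channel], $suffixes),
            function ($part) {
                return $part !== null && $part !== '';
            }
        );

        return implode(':', $parts);
    }

    public function channels($appId): string
    {
        return $this->key($appId, null, ['channels']);
    }

    public function stats($appId, ?string $channel = null): string
    {
        return $this->key($appId, $channel, ['stats']);
    }

    public function users($appId, ?string $channel = null): string
    {
        return $this->key($appId, $channel, ['users']);
    }
}

$keys = new RedisKeyBuilder('websockets');

echo $keys->stats(1, 'chat'), PHP_EOL;   // websockets:1:chat:stats
echo $keys->channels(1), PHP_EOL;        // websockets:1:channels
```

The manager would then hold one such builder and delegate to it, which removes a whole family of one-line getters from the large class without changing how the Redis commands are issued.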

While breaking up the class, it is a good idea to analyze how other classes use RedisChannelManager, and based on these observations, apply Extract Interface, too.
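A minimal sketch of Extract Interface, assuming callers only need the subscription counters. The interface name `SubscriptionCounter` and the in-memory implementation are hypothetical, and the methods return plain integers here instead of promises so that the example runs without ReactPHP.

```php
<?php

/**
 * Hypothetical interface covering only the counter-related methods.
 * Return types are simplified to int (the real methods return promises).
 */
interface SubscriptionCounter
{
    public function incrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1): int;

    public function decrementSubscriptionsCount($appId, ?string $channel = null, int $decrement = 1): int;

    public function getGlobalConnectionsCount($appId, ?string $channel = null): int;
}

/** In-memory stand-in used only to demonstrate the interface. */
final class InMemorySubscriptionCounter implements SubscriptionCounter
{
    /** @var array<string, int> */
    private $counts = [];

    public function incrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1): int
    {
        $key = $appId.':'.($channel ?? '');
        $this->counts[$key] = ($this->counts[$key] ?? 0) + $increment;

        return $this->counts[$key];
    }

    public function decrementSubscriptionsCount($appId, ?string $channel = null, int $decrement = 1): int
    {
        // Decrementing is an increment by the negated amount.
        return $this->incrementSubscriptionsCount($appId, $channel, -$decrement);
    }

    public function getGlobalConnectionsCount($appId, ?string $channel = null): int
    {
        return $this->counts[$appId.':'.($channel ?? '')] ?? 0;
    }
}

/** A caller that depends on the interface rather than on the full manager. */
function describeLoad(SubscriptionCounter $counter, $appId, string $channel): string
{
    return sprintf('%s: %d connection(s)', $channel, $counter->getGlobalConnectionsCount($appId, $channel));
}

$counter = new InMemorySubscriptionCounter();
$counter->incrementSubscriptionsCount(1, 'chat');
$counter->incrementSubscriptionsCount(1, 'chat');
$counter->decrementSubscriptionsCount(1, 'chat');

echo describeLoad($counter, 1, 'chat'), PHP_EOL;
```

Code written against the narrow interface can be tested with a stand-in like this one, while the Redis-backed manager remains one implementation among several.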

<?php

namespace BeyondCode\LaravelWebSockets\ChannelManagers;

use BeyondCode\LaravelWebSockets\DashboardLogger;
use BeyondCode\LaravelWebSockets\Helpers;
use BeyondCode\LaravelWebSockets\Server\MockableConnection;
use Carbon\Carbon;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Cache\RedisLock;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface;
use stdClass;

class RedisChannelManager extends LocalChannelManager
{
    /**
     * The running loop.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * The client used to publish messages and run regular Redis commands.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * The client used to subscribe to Redis Pub/Sub topics.
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * The Redis connection resolved through the Redis facade.
     *
     * @var \Illuminate\Redis\Connections\Connection
     */
    protected $redis;

    /**
     * Create a new channel manager instance.
     *
     * @param  LoopInterface  $loop
     * @param  string|null  $factoryClass
     * @return void
     */
    public function __construct(LoopInterface $loop, $factoryClass = null)
    {
        parent::__construct($loop, $factoryClass);

        $this->loop = $loop;

        // Scrutinizer note: Redis::connection() returns an
        // Illuminate\Redis\Connections\Connection, which is incompatible with a
        // $redis property documented as Illuminate\Redis\RedisManager.
        $this->redis = Redis::connection(
            config('websockets.replication.modes.redis.connection', 'default')
        );

        $connectionUri = $this->getConnectionUri();

        $factoryClass = $factoryClass ?: Factory::class;
        $factory = new $factoryClass($this->loop);

        // A subscribed Redis connection cannot run regular commands,
        // so publishing and subscribing use separate lazy clients.
        $this->publishClient = $factory->createLazyClient($connectionUri);
        $this->subscribeClient = $factory->createLazyClient($connectionUri);

        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });
    }

    /**
     * Get all channels for a specific app
     * across multiple servers.
     *
     * @param  string|int  $appId
     * @return \React\Promise\PromiseInterface<array>
     */
    public function getGlobalChannels($appId): PromiseInterface
    {
        return $this->publishClient->smembers(
            $this->getChannelsRedisHash($appId)
        );
    }

    /**
     * Remove connection from all channels.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return PromiseInterface<bool>
     */
    public function unsubscribeFromAllChannels(ConnectionInterface $connection): PromiseInterface
    {
        // Scrutinizer note: "app" is not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->getGlobalChannels($connection->app->id)
            ->then(function ($channels) use ($connection) {
                foreach ($channels as $channel) {
                    $this->unsubscribeFromChannel($connection, $channel, new stdClass);
                }
            })
            ->then(function () use ($connection) {
                return parent::unsubscribeFromAllChannels($connection);
            });
    }

    /**
     * Subscribe the connection to a specific channel.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  string  $channelName
     * @param  stdClass  $payload
     * @return PromiseInterface<bool>
     */
    public function subscribeToChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
    {
        // Scrutinizer note: "app" is not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->subscribeToTopic($connection->app->id, $channelName)
            ->then(function () use ($connection) {
                return $this->addConnectionToSet($connection, Carbon::now());
            })
            ->then(function () use ($connection, $channelName) {
                return $this->addChannelToSet($connection->app->id, $channelName);
            })
            ->then(function () use ($connection, $channelName) {
                return $this->incrementSubscriptionsCount($connection->app->id, $channelName, 1);
            })
            ->then(function () use ($connection, $channelName, $payload) {
                return parent::subscribeToChannel($connection, $channelName, $payload);
            });
    }

    /**
     * Unsubscribe the connection from the channel.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  string  $channelName
     * @param  stdClass  $payload
     * @return PromiseInterface<bool>
     */
    public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
    {
        // Scrutinizer note: "app" is not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return parent::unsubscribeFromChannel($connection, $channelName, $payload)
            ->then(function () use ($connection, $channelName) {
                return $this->decrementSubscriptionsCount($connection->app->id, $channelName);
            })
            ->then(function ($count) use ($connection, $channelName) {
                $this->removeConnectionFromSet($connection);

                // If the total connections count drops to 0 after unsubscribing,
                // remove the channel and unsubscribe from its PubSub topic.
                if ($count < 1) {
                    $this->removeChannelFromSet($connection->app->id, $channelName);
                    $this->unsubscribeFromTopic($connection->app->id, $channelName);
                }
            });
    }

    /**
     * Subscribe to the app-level topic, returning
     * a promise containing the amount of connections.
     *
     * @param  string|int  $appId
     * @return PromiseInterface<int>
     */
    public function subscribeToApp($appId): PromiseInterface
    {
        return $this->subscribeToTopic($appId)
            ->then(function () use ($appId) {
                return $this->incrementSubscriptionsCount($appId);
            });
    }

    /**
     * Unsubscribe from the app-level topic, returning
     * a promise containing the amount of connections after decrement.
     *
     * @param  string|int  $appId
     * @return PromiseInterface<int>
     */
    public function unsubscribeFromApp($appId): PromiseInterface
    {
        return $this->unsubscribeFromTopic($appId)
            ->then(function () use ($appId) {
                return $this->decrementSubscriptionsCount($appId);
            });
    }

    /**
     * Get the connections count
     * across multiple servers.
     *
     * @param  string|int  $appId
     * @param  string|null  $channelName
     * @return PromiseInterface<int>
     */
    public function getGlobalConnectionsCount($appId, ?string $channelName = null): PromiseInterface
    {
        return $this->publishClient
            ->hget($this->getStatsRedisHash($appId, $channelName), 'connections')
            ->then(function ($count) {
                // A missing hash field means no connections were counted yet.
                return is_null($count) ? 0 : (int) $count;
            });
    }

    /**
     * Broadcast the message across multiple servers.
     *
     * @param  string|int  $appId
     * @param  string|null  $socketId
     * @param  string  $channel
     * @param  stdClass  $payload
     * @param  string|null  $serverId
     * @return PromiseInterface<bool>
     */
    public function broadcastAcrossServers($appId, ?string $socketId, string $channel, stdClass $payload, ?string $serverId = null): PromiseInterface
    {
        // Tag the payload so receiving servers can tell where it came from.
        $payload->appId = $appId;
        $payload->socketId = $socketId;
        $payload->serverId = $serverId ?: $this->getServerId();

        return $this->publishClient
            ->publish($this->getRedisTopicName($appId, $channel), json_encode($payload))
            ->then(function () use ($appId, $socketId, $channel, $payload, $serverId) {
                return parent::broadcastAcrossServers($appId, $socketId, $channel, $payload, $serverId);
            });
    }

    /**
     * Handle the user when they join a presence channel.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  stdClass  $user
     * @param  string  $channel
     * @param  stdClass  $payload
     * @return PromiseInterface
     */
    public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload): PromiseInterface
    {
        // Scrutinizer note: "app" and "socketId" are not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->storeUserData($connection->app->id, $channel, $connection->socketId, json_encode($user))
            ->then(function () use ($connection, $channel, $user) {
                return $this->addUserSocket($connection->app->id, $channel, $user, $connection->socketId);
            })
            ->then(function () use ($connection, $user, $channel, $payload) {
                return parent::userJoinedPresenceChannel($connection, $user, $channel, $payload);
            });
    }

    /**
     * Handle the user when they leave a presence channel.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  stdClass  $user
     * @param  string  $channel
     * @return PromiseInterface<bool>
     */
    public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel): PromiseInterface
    {
        // Scrutinizer note: "app" and "socketId" are not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->removeUserData($connection->app->id, $channel, $connection->socketId)
            ->then(function () use ($connection, $channel, $user) {
                return $this->removeUserSocket($connection->app->id, $channel, $user, $connection->socketId);
            })
            ->then(function () use ($connection, $user, $channel) {
                return parent::userLeftPresenceChannel($connection, $user, $channel);
            });
    }

    /**
     * Get the presence channel members.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return \React\Promise\PromiseInterface<array>
     */
    public function getChannelMembers($appId, string $channel): PromiseInterface
    {
        return $this->publishClient
            ->hgetall($this->getUsersRedisHash($appId, $channel))
            ->then(function ($list) {
                // Decode each stored JSON user and keep one entry per user_id.
                return collect(Helpers::redisListToArray($list))->map(function ($user) {
                    return json_decode($user);
                })->unique('user_id')->toArray();
            });
    }

    /**
     * Get a member from a presence channel based on connection.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  string  $channel
     * @return \React\Promise\PromiseInterface<null|array>
     */
    public function getChannelMember(ConnectionInterface $connection, string $channel): PromiseInterface
    {
        // Scrutinizer note: "app" and "socketId" are not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->publishClient->hget(
            $this->getUsersRedisHash($connection->app->id, $channel), $connection->socketId
        );
    }

    /**
     * Get the presence channels total members count.
     *
     * @param  string|int  $appId
     * @param  array  $channelNames
     * @return \React\Promise\PromiseInterface<array>
     */
    public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface
    {
        // Queue one HLEN per channel inside a MULTI/EXEC transaction.
        $this->publishClient->multi();

        foreach ($channelNames as $channel) {
            $this->publishClient->hlen(
                $this->getUsersRedisHash($appId, $channel)
            );
        }

        return $this->publishClient->exec()
            ->then(function ($data) use ($channelNames) {
                // EXEC replies in queue order, so results line up with the channel names.
                return array_combine($channelNames, $data);
            });
    }

    /**
     * Get the socket IDs for a presence channel member.
     *
     * @param  string|int  $userId
     * @param  string|int  $appId
     * @param  string  $channelName
     * @return \React\Promise\PromiseInterface<array>
     */
    public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
    {
        return $this->publishClient->smembers(
            $this->getUserSocketsRedisHash($appId, $channelName, $userId)
        );
    }

    /**
     * Keep tracking the connections availability when they pong.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return PromiseInterface<bool>
     */
    public function connectionPonged(ConnectionInterface $connection): PromiseInterface
    {
        // Scrutinizer note: "app" and "socketId" are not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.

        // This will update the score with the current timestamp.
        return $this->addConnectionToSet($connection, Carbon::now())
            ->then(function () use ($connection) {
                $payload = [
                    'socketId' => $connection->socketId,
                    'appId' => $connection->app->id,
                    'serverId' => $this->getServerId(),
                ];

                return $this->publishClient
                    ->publish($this->getPongRedisHash($connection->app->id), json_encode($payload));
            })
            ->then(function () use ($connection) {
                return parent::connectionPonged($connection);
            });
    }

    /**
     * Remove the obsolete connections that have not ponged in a while.
     *
     * @return PromiseInterface<bool>
     */
    public function removeObsoleteConnections(): PromiseInterface
    {
        $lock = $this->lock();
        try {
            $lock->get(function () {
                // format('U') returns a string while $stop is declared as int, so cast explicitly.
                $this->getConnectionsFromSet(0, (int) now()->subMinutes(2)->format('U'))
                    ->then(function ($connections) {
                        foreach ($connections as $socketId => $appId) {
                            $connection = $this->fakeConnectionForApp($appId, $socketId);

                            $this->unsubscribeFromAllChannels($connection);
                        }
                    });
            });

            return parent::removeObsoleteConnections();
        } finally {
            optional($lock)->forceRelease();
        }
    }

    /**
     * Handle a message received from Redis on a specific channel.
     *
     * @param  string  $redisChannel
     * @param  string  $payload
     * @return void
     */
    public function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // Ignore messages that this server published itself.
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
            return;
        }

        // Pong notifications from other servers only refresh the connection locally.
        if ($redisChannel == $this->getPongRedisHash($payload->appId)) {
            $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);

            return parent::connectionPonged($connection);
        }

        $payload->channel = Str::after($redisChannel, "{$payload->appId}:");

        if (! $channel = $this->find($payload->appId, $payload->channel)) {
            return;
        }

        $appId = $payload->appId ?? null;
        $socketId = $payload->socketId ?? null;
        $serverId = $payload->serverId ?? null;

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
            'fromServerId' => $serverId,
            'fromSocketId' => $socketId,
            'receiverServerId' => $this->getServerId(),
            'channel' => $channel,
            'payload' => $payload,
        ]);

        // Strip the routing metadata before handing the payload to local clients.
        unset($payload->socketId);
        unset($payload->serverId);
        unset($payload->appId);

        $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
    }

    /**
     * Find the channel for the app, creating a local instance
     * when it does not exist yet.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     */
    public function find($appId, string $channel)
    {
        // The lookup result is only tested here, so it is not assigned to a variable.
        if (! parent::find($appId, $channel)) {
            $class = $this->getChannelClassName($channel);
            $this->channels[$appId][$channel] = new $class($channel);
        }

        return parent::find($appId, $channel);
    }

    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.modes.redis.connection', 'default');
        $config = config("database.redis.{$name}");

        $host = $config['host'];
        $port = ($config['port'] ?? null) ?: 6379;

        $query = [];

        // empty() keeps the original truthiness checks but tolerates missing keys.
        if (! empty($config['password'])) {
            $query['password'] = $config['password'];
        }

        if (! empty($config['database'])) {
            $query['db'] = $config['database'];
        }

        // http_build_query() URL-encodes the values, e.g. special characters in the password.
        $query = http_build_query($query);

        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
    }

    /**
     * Get the Subscribe client instance.
     *
     * @return Client
     */
    public function getSubscribeClient()
    {
        return $this->subscribeClient;
    }

    /**
     * Get the Publish client instance.
     *
     * @return Client
     */
    public function getPublishClient()
    {
        return $this->publishClient;
    }

    /**
     * Get the Redis client used by other classes.
     *
     * @return Client
     */
    public function getRedisClient()
    {
        return $this->getPublishClient();
    }

    /**
     * Increment the subscribed count number.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @param  int  $increment
     * @return PromiseInterface<int>
     */
    public function incrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1): PromiseInterface
    {
        return $this->publishClient->hincrby(
            $this->getStatsRedisHash($appId, $channel), 'connections', $increment
        );
    }

    /**
     * Decrement the subscribed count number.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @param  int  $increment  The amount to subtract from the count.
     * @return PromiseInterface<int>
     */
    public function decrementSubscriptionsCount($appId, ?string $channel = null, int $increment = 1): PromiseInterface
    {
        return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1);
    }

    /**
     * Add the connection to the sorted list.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @param  \DateTime|string|null  $moment
     * @return PromiseInterface
     */
    public function addConnectionToSet(ConnectionInterface $connection, $moment = null): PromiseInterface
    {
        $moment = $moment ? Carbon::parse($moment) : Carbon::now();

        // Scrutinizer note: "app" and "socketId" are not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->publishClient->zadd(
            $this->getSocketsRedisHash(),
            $moment->format('U'), "{$connection->app->id}:{$connection->socketId}"
        );
    }

    /**
     * Remove the connection from the sorted list.
     *
     * @param  \Ratchet\ConnectionInterface  $connection
     * @return PromiseInterface
     */
    public function removeConnectionFromSet(ConnectionInterface $connection): PromiseInterface
    {
        // Scrutinizer note: "app" and "socketId" are not declared on Ratchet\ConnectionInterface;
        // consider an instanceof check against the concrete connection class.
        return $this->publishClient->zrem(
            $this->getSocketsRedisHash(),
            "{$connection->app->id}:{$connection->socketId}"
        );
    }
565
    /**
     * Get the connections from the sorted set whose score (the moment
     * they were last recorded) lies between the given timestamps.
     *
     * @param  int  $start
     * @param  int  $stop
     * @param  bool  $strict  Whether both bounds are exclusive.
     * @return PromiseInterface  Resolves to an array keyed by socket ID, with the app ID as value.
     */
    public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $strict = true): PromiseInterface
    {
        if ($strict) {
            // Redis treats a "(" prefix as an exclusive bound.
            $start = "({$start}";
            $stop = "({$stop}";
        }

        return $this->publishClient
            ->zrangebyscore($this->getSocketsRedisHash(), $start, $stop)
            ->then(function ($list) {
                return collect($list)->mapWithKeys(function ($appWithSocket) {
                    [$appId, $socketId] = explode(':', $appWithSocket);

                    return [$socketId => $appId];
                })->toArray();
            });
    }

    /**
     * Add a channel to the set list.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return PromiseInterface
     */
    public function addChannelToSet($appId, string $channel): PromiseInterface
    {
        return $this->publishClient->sadd(
            $this->getChannelsRedisHash($appId), $channel
        );
    }

    /**
     * Remove a channel from the set list.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return PromiseInterface
     */
    public function removeChannelFromSet($appId, string $channel): PromiseInterface
    {
        return $this->publishClient->srem(
            $this->getChannelsRedisHash($appId), $channel
        );
    }

    /**
     * Check if channel is on the list.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @return PromiseInterface
     */
    public function isChannelInSet($appId, string $channel): PromiseInterface
    {
        return $this->publishClient->sismember(
            $this->getChannelsRedisHash($appId), $channel
        );
    }

    /**
     * Set data for a topic. Might be used for the presence channels.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @param  string  $key
     * @param  string  $data
     * @return PromiseInterface
     */
    public function storeUserData($appId, ?string $channel, string $key, $data): PromiseInterface
    {
        // $channel is nullable but not optional: a default value placed
        // before required parameters can never be used by callers.
        return $this->publishClient->hset(
            $this->getUsersRedisHash($appId, $channel), $key, $data
        );
    }

    /**
     * Remove data for a topic. Might be used for the presence channels.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @param  string  $key
     * @return PromiseInterface
     */
    public function removeUserData($appId, ?string $channel, string $key): PromiseInterface
    {
        return $this->publishClient->hdel(
            $this->getUsersRedisHash($appId, $channel), $key
        );
    }

    /**
     * Subscribe to the topic for the app, or app and channel.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @return PromiseInterface
     */
    public function subscribeToTopic($appId, string $channel = null): PromiseInterface
    {
        $topic = $this->getRedisTopicName($appId, $channel);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
            'serverId' => $this->getServerId(),
            'pubsubTopic' => $topic,
        ]);

        return $this->subscribeClient->subscribe($topic);
    }

    /**
     * Unsubscribe from the topic for the app, or app and channel.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @return PromiseInterface
     */
    public function unsubscribeFromTopic($appId, string $channel = null): PromiseInterface
    {
        $topic = $this->getRedisTopicName($appId, $channel);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
            'serverId' => $this->getServerId(),
            'pubsubTopic' => $topic,
        ]);

        return $this->subscribeClient->unsubscribe($topic);
    }

    /**
     * Add the Presence Channel's User's Socket ID to a list.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @param  stdClass  $user
     * @param  string  $socketId
     * @return PromiseInterface
     */
    protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
    {
        return $this->publishClient->sadd(
            $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId
        );
    }

    /**
     * Remove the Presence Channel's User's Socket ID from the list.
     *
     * @param  string|int  $appId
     * @param  string  $channel
     * @param  stdClass  $user
     * @param  string  $socketId
     * @return PromiseInterface
     */
    protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId): PromiseInterface
    {
        return $this->publishClient->srem(
            $this->getUserSocketsRedisHash($appId, $channel, $user->user_id), $socketId
        );
    }

    /**
     * Get the Redis key name used to handle subscriptions
     * and other key-value sets.
     *
     * The key is built as "{prefix}{appId}[:{channel}][:{suffixes}]".
     * For example, with the prefix "p_", app ID 1, channel "chat" and
     * suffixes ['users'], the result is "p_1:chat:users".
     *
     * @param  string|int|null  $appId
     * @param  string|null  $channel
     * @param  array  $suffixes
     * @return string
     */
    public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string
    {
        $prefix = config('database.redis.options.prefix', null);

        $hash = "{$prefix}{$appId}";

        if ($channel) {
            $suffixes = array_merge([$channel], $suffixes);
        }

        $suffixes = implode(':', $suffixes);

        if ($suffixes) {
            $hash .= ":{$suffixes}";
        }

        return $hash;
    }

    /**
     * Get the pong Redis hash.
     *
     * @param  string|int  $appId
     * @return string
     */
    public function getPongRedisHash($appId): string
    {
        return $this->getRedisKey($appId, null, ['pong']);
    }

    /**
     * Get the statistics Redis hash.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @return string
     */
    public function getStatsRedisHash($appId, string $channel = null): string
    {
        return $this->getRedisKey($appId, $channel, ['stats']);
    }

    /**
     * Get the sockets Redis hash used to store all socket IDs.
     *
     * @return string
     */
    public function getSocketsRedisHash(): string
    {
        return $this->getRedisKey(null, null, ['sockets']);
    }

    /**
     * Get the channels Redis hash for a specific app id, used
     * to store existing channels.
     *
     * @param  string|int  $appId
     * @return string
     */
    public function getChannelsRedisHash($appId): string
    {
        return $this->getRedisKey($appId, null, ['channels']);
    }

    /**
     * Get the Redis hash for storing presence channels users.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @return string
     */
    public function getUsersRedisHash($appId, string $channel = null): string
    {
        return $this->getRedisKey($appId, $channel, ['users']);
    }

    /**
     * Get the Redis hash for storing socket ids
     * for a specific presence channels user.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @param  string|int|null  $userId
     * @return string
     */
    public function getUserSocketsRedisHash($appId, string $channel = null, $userId = null): string
    {
        return $this->getRedisKey($appId, $channel, [$userId, 'userSockets']);
    }

    /**
     * Get the Redis topic name for PubSub
     * used to transfer info between servers.
     *
     * @param  string|int  $appId
     * @param  string|null  $channel
     * @return string
     */
    public function getRedisTopicName($appId, string $channel = null): string
    {
        return $this->getRedisKey($appId, $channel);
    }

    /**
     * Get a new RedisLock instance to avoid race conditions.
     *
     * @return \Illuminate\Cache\RedisLock
     */
    protected function lock()
    {
        // $this->redis is a RedisManager, while RedisLock documents a Connection.
        return new RedisLock($this->redis, static::$lockName, 0);
    }

    /**
     * Create a fake connection for the app that mimics a connection
     * by app ID and socket ID, so it can be passed to the methods
     * that accept a connection instance.
     *
     * @param  string|int  $appId
     * @param  string  $socketId
     * @return ConnectionInterface
     */
    public function fakeConnectionForApp($appId, string $socketId)
    {
        return new MockableConnection($appId, $socketId);
    }
}