Completed
Pull Request — master (#447)
by Alexandru
01:33
created

RedisClient::channelMemberCounts()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 14
rs 9.7998
c 0
b 0
f 0
cc 2
nc 2
nop 2
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4
5
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
6
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
7
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
8
use Clue\React\Redis\Client;
9
use Clue\React\Redis\Factory;
10
use Illuminate\Support\Str;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
use stdClass;
14
15
/**
 * Redis-backed replication client.
 *
 * Uses two lazy Redis clients created in boot(): one for issuing regular
 * commands (publish and hash operations) and one used for subscriptions,
 * whose incoming messages are forwarded to onMessage().
 */
class RedisClient extends LocalClient
{
    /**
     * The running loop.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * The unique server identifier.
     *
     * @var string
     */
    protected $serverId;

    /**
     * The pub client.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * The sub client.
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * Mapping of subscribed channels, where the key is "{appId}:{channel}",
     * and the value is the amount of connections which are subscribed to
     * that channel. Used to keep track of whether we still need to stay
     * subscribed to those channels with Redis.
     *
     * @var int[]
     */
    protected $subscribedChannels = [];

    /**
     * Create a new Redis client and assign it a freshly generated UUID
     * as its server identifier.
     */
    public function __construct()
    {
        $this->serverId = Str::uuid()->toString();
    }

    /**
     * Boot the RedisClient, initializing the connections.
     *
     * @param  LoopInterface  $loop
     * @param  string|null  $factoryClass  Class used to build the Redis clients;
     *                                     falls back to the Clue Redis Factory.
     * @return ReplicationInterface
     */
    public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface
    {
        $factoryClass = $factoryClass ?: Factory::class;

        $this->loop = $loop;

        $connectionUri = $this->getConnectionUri();
        $factory = new $factoryClass($this->loop);

        $this->publishClient = $factory->createLazyClient($connectionUri);
        $this->subscribeClient = $factory->createLazyClient($connectionUri);

        // Every message received by the subscribe client is handed to onMessage().
        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });

        return $this;
    }

    /**
     * Publish a message to a channel on behalf of a websocket user.
     *
     * Note: the given payload object is modified in place (the appId and
     * serverId properties are set on it) before it is JSON encoded.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  stdClass  $payload
     * @return bool  Always true.
     */
    public function publish($appId, string $channel, stdClass $payload): bool
    {
        $payload->appId = $appId;
        $payload->serverId = $this->getServerId();

        $payload = json_encode($payload);
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->publish($topic, $payload);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'payload' => $payload,
            'pubsub' => $topic,
        ]);

        return true;
    }

    /**
     * Subscribe to a channel on behalf of websocket user.
     *
     * The Redis subscription is only issued for the first local subscriber;
     * further calls just increase the local counter.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return bool  Always true.
     */
    public function subscribe($appId, string $channel): bool
    {
        $key = "{$appId}:{$channel}";
        $topic = $this->getTopicName($appId, $channel);

        if (! isset($this->subscribedChannels[$key])) {
            // We're not subscribed to the channel yet, subscribe and set the count to 1
            $this->subscribeClient->subscribe($topic);
            $this->subscribedChannels[$key] = 1;
        } else {
            // Increment the subscribe count if we've already subscribed
            $this->subscribedChannels[$key]++;
        }

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'pubsub' => $topic,
        ]);

        return true;
    }

    /**
     * Unsubscribe from a channel on behalf of a websocket user.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return bool  False when there was no tracked subscription for the
     *               channel, true otherwise.
     */
    public function unsubscribe($appId, string $channel): bool
    {
        $key = "{$appId}:{$channel}";

        if (! isset($this->subscribedChannels[$key])) {
            return false;
        }

        $topic = $this->getTopicName($appId, $channel);

        // Decrement the subscription count for this channel
        $this->subscribedChannels[$key]--;

        // If we no longer have subscriptions to that channel, unsubscribe
        if ($this->subscribedChannels[$key] < 1) {
            $this->subscribeClient->unsubscribe($topic);

            unset($this->subscribedChannels[$key]);
        }

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'pubsub' => $topic,
        ]);

        return true;
    }

    /**
     * Subscribe to the app's pubsub keyspace and increment the
     * "connections" counter stored in the app's hash.
     *
     * @param  mixed  $appId
     * @return bool  Always true.
     */
    public function subscribeToApp($appId): bool
    {
        $topic = $this->getTopicName($appId);

        $this->subscribeClient->subscribe($topic);

        $this->publishClient->hincrby($topic, 'connections', 1);

        return true;
    }

    /**
     * Unsubscribe from the app's pubsub keyspace and decrement the
     * "connections" counter stored in the app's hash.
     *
     * @param  mixed  $appId
     * @return bool  Always true.
     */
    public function unsubscribeFromApp($appId): bool
    {
        $topic = $this->getTopicName($appId);

        $this->subscribeClient->unsubscribe($topic);

        $this->publishClient->hincrby($topic, 'connections', -1);

        return true;
    }

    /**
     * Add a member to a channel. To be called when they have
     * subscribed to the channel.
     *
     * The member data is stored in the channel's Redis hash under the
     * socket ID.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  string  $socketId
     * @param  string  $data
     * @return void
     */
    public function joinChannel($appId, string $channel, string $socketId, string $data)
    {
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->hset($topic, $socketId, $data);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'socketId' => $socketId,
            'data' => $data,
            'pubsub' => $topic,
        ]);
    }

    /**
     * Remove a member from the channel. To be called when they have
     * unsubscribed from the channel.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  string  $socketId
     * @return void
     */
    public function leaveChannel($appId, string $channel, string $socketId)
    {
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->hdel($topic, $socketId);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'socketId' => $socketId,
            'pubsub' => $topic,
        ]);
    }

    /**
     * Retrieve the full information about the members in a presence channel.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return PromiseInterface  Resolves to the channel hash with every
     *                           value passed through json_decode().
     */
    public function channelMembers($appId, string $channel): PromiseInterface
    {
        return $this->publishClient->hgetall($this->getTopicName($appId, $channel))
            ->then(function ($members) {
                // The data is expected as objects, so we need to JSON decode
                return array_map(function ($user) {
                    return json_decode($user);
                }, $members);
            });
    }

    /**
     * Get the amount of users subscribed for each presence channel.
     *
     * @param  string  $appId
     * @param  array  $channelNames
     * @return PromiseInterface  Resolves to an array keyed by channel name
     *                           holding the HLEN result of each channel hash.
     */
    public function channelMemberCounts($appId, array $channelNames): PromiseInterface
    {
        // Nothing to count: skip the empty MULTI/EXEC round trip.
        if ($channelNames === []) {
            return \React\Promise\resolve([]);
        }

        $this->publishClient->multi();

        foreach ($channelNames as $channel) {
            $this->publishClient->hlen($this->getTopicName($appId, $channel));
        }

        return $this->publishClient
            ->exec()
            ->then(function ($data) use ($channelNames) {
                // EXEC results are paired with the channel names in the order
                // the HLEN commands were queued.
                return array_combine($channelNames, $data);
            });
    }

    /**
     * Get the amount of connections aggregated on multiple instances.
     *
     * Reads the "connections" field of the app's hash.
     *
     * @param  mixed  $appId
     * @return null|int|\React\Promise\PromiseInterface
     */
    public function getGlobalConnectionsCount($appId)
    {
        return $this->publishClient->hget($this->getTopicName($appId), 'connections');
    }

    /**
     * Handle a message received from Redis on a specific channel.
     *
     * Messages that cannot be decoded into an object carrying an app ID,
     * messages published by this very server, and messages for channels
     * unknown to the local channel manager are ignored.
     *
     * @param  string  $redisChannel
     * @param  string  $payload  JSON encoded message.
     * @return void
     */
    public function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // Without a decoded object and its app ID the message cannot be
        // routed; ignore it rather than failing inside the event loop.
        if (! is_object($payload) || ! isset($payload->appId)) {
            return;
        }

        // Ignore messages sent by ourselves.
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
            return;
        }

        // Pull out the app ID. See RedisPusherBroadcaster
        $appId = $payload->appId;

        // We need to put the channel name in the payload.
        // We strip the app ID from the channel name, websocket clients
        // expect the channel name to not include the app ID.
        $payload->channel = Str::after($redisChannel, "{$appId}:");

        $channelManager = app(ChannelManager::class);

        // Load the Channel instance to sync.
        $channel = $channelManager->find($appId, $payload->channel);

        // If no channel is found, none of our connections want to
        // receive this message, so we ignore it.
        if (! $channel) {
            return;
        }

        $socketId = $payload->socketId ?? null;
        $serverId = $payload->serverId ?? null;

        // Remove fields intended for internal use from the payload.
        unset($payload->socketId);
        unset($payload->serverId);
        unset($payload->appId);

        // Push the message out to connected websocket clients.
        $channel->broadcastToEveryoneExcept($payload, $socketId, $appId, false);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
            'channel' => $channel->getChannelName(),
            'redisChannel' => $redisChannel,
            'serverId' => $this->getServerId(),
            'incomingServerId' => $serverId,
            'incomingSocketId' => $socketId,
            'payload' => $payload,
        ]);
    }

    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * The connection name is read from "websockets.replication.redis.connection"
     * (defaulting to "default"). Missing optional keys do not raise notices:
     * the port falls back to 6379, the host to 127.0.0.1, and password and
     * database are only added to the query string when they are non-empty.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.redis.connection', 'default');
        $config = config("database.redis.{$name}");

        $host = $config['host'] ?? '127.0.0.1';
        $port = ($config['port'] ?? null) ?: 6379;

        $query = [];

        if (! empty($config['password'])) {
            $query['password'] = $config['password'];
        }

        if (! empty($config['database'])) {
            $query['database'] = $config['database'];
        }

        $query = http_build_query($query);

        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
    }

    /**
     * Get the Subscribe client instance.
     *
     * @return Client
     */
    public function getSubscribeClient()
    {
        return $this->subscribeClient;
    }

    /**
     * Get the Publish client instance.
     *
     * @return Client
     */
    public function getPublishClient()
    {
        return $this->publishClient;
    }

    /**
     * Get the unique identifier for the server.
     *
     * @return string
     */
    public function getServerId()
    {
        return $this->serverId;
    }

    /**
     * Get the Pub/Sub Topic name to subscribe based on the
     * app ID and channel name.
     *
     * The result is "{prefix}{appId}" or, when a channel is given,
     * "{prefix}{appId}:{channel}", where the prefix comes from the
     * "database.redis.options.prefix" config value.
     *
     * @param  mixed  $appId
     * @param  string|null  $channel
     * @return string
     */
    protected function getTopicName($appId, ?string $channel = null): string
    {
        $prefix = config('database.redis.options.prefix', null);

        $hash = "{$prefix}{$appId}";

        // Compare explicitly instead of relying on truthiness: a channel
        // named "0" is a real channel and must not map to the app-level key.
        if ($channel !== null && $channel !== '') {
            $hash .= ":{$channel}";
        }

        return $hash;
    }
}
438