Completed
Pull Request — master (#497)
by Alexandru
01:27
created

RedisClient::publish()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 18
rs 9.6666
c 0
b 0
f 0
cc 1
nc 1
nop 3
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4
5
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
6
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
7
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
8
use Clue\React\Redis\Client;
9
use Clue\React\Redis\Factory;
10
use Illuminate\Support\Facades\Cache;
11
use Illuminate\Support\Str;
12
use React\EventLoop\LoopInterface;
13
use React\Promise\PromiseInterface;
14
use stdClass;
15
16
class RedisClient extends LocalClient
17
{
18
    /**
19
     * The running loop.
20
     *
21
     * @var LoopInterface
22
     */
23
    protected $loop;
24
25
    /**
26
     * The unique server identifier.
27
     *
28
     * @var string
29
     */
30
    protected $serverId;
31
32
    /**
33
     * The pub client.
34
     *
35
     * @var Client
36
     */
37
    protected $publishClient;
38
39
    /**
40
     * The sub client.
41
     *
42
     * @var Client
43
     */
44
    protected $subscribeClient;
45
46
    /**
47
     * The Redis manager instance.
48
     *
49
     * @var \Illuminate\Redis\RedisManager
50
     */
51
    protected $redis;
52
53
    /**
54
     * Mapping of subscribed channels, where the key is the channel name,
55
     * and the value is the amount of connections which are subscribed to
56
     * that channel. Used to keep track of whether we still need to stay
57
     * subscribed to those channels with Redis.
58
     *
59
     * @var int[]
60
     */
61
    protected $subscribedChannels = [];
62
63
    /**
64
     * Create a new Redis client.
65
     *
66
     * @return void
0 ignored issues
show
Comprehensibility Best Practice introduced by
Adding a @return annotation to constructors is generally not recommended as a constructor does not have a meaningful return value.

Adding a @return annotation to a constructor is not recommended, since a constructor does not have a meaningful return value.

Please refer to the PHP core documentation on constructors.

Loading history...
67
     */
68
    public function __construct()
69
    {
70
        $this->serverId = Str::uuid()->toString();
71
        $this->redis = Cache::getRedis();
72
    }
73
74
    /**
75
     * Boot the RedisClient, initializing the connections.
76
     *
77
     * @param  LoopInterface  $loop
78
     * @param  string|null  $factoryClass
79
     * @return ReplicationInterface
80
     */
81
    public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface
82
    {
83
        $factoryClass = $factoryClass ?: Factory::class;
84
85
        $this->loop = $loop;
86
87
        $connectionUri = $this->getConnectionUri();
88
        $factory = new $factoryClass($this->loop);
89
90
        $this->publishClient = $factory->createLazyClient($connectionUri);
91
        $this->subscribeClient = $factory->createLazyClient($connectionUri);
92
93
        // The subscribed client gets a message, it triggers the onMessage().
94
        $this->subscribeClient->on('message', function ($channel, $payload) {
95
            $this->onMessage($channel, $payload);
96
        });
97
98
        return $this;
99
    }
100
101
    /**
102
     * Publish a message to a channel on behalf of a websocket user.
103
     *
104
     * @param  string  $appId
105
     * @param  string  $channel
106
     * @param  stdClass  $payload
107
     * @return bool
108
     */
109
    public function publish($appId, string $channel, stdClass $payload): bool
110
    {
111
        $payload->appId = $appId;
112
        $payload->serverId = $this->getServerId();
113
114
        $payload = json_encode($payload);
115
116
        $this->publishClient->__call('publish', [$this->getTopicName($appId, $channel), $payload]);
117
118
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
119
            'channel' => $channel,
120
            'serverId' => $this->getServerId(),
121
            'payload' => $payload,
122
            'pubsub' => $this->getTopicName($appId, $channel),
123
        ]);
124
125
        return true;
126
    }
127
128
    /**
129
     * Subscribe to a channel on behalf of websocket user.
130
     *
131
     * @param  string  $appId
132
     * @param  string  $channel
133
     * @return bool
134
     */
135
    public function subscribe($appId, string $channel): bool
136
    {
137
        if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
138
            // We're not subscribed to the channel yet, subscribe and set the count to 1
139
            $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId, $channel)]);
140
            $this->subscribedChannels["{$appId}:{$channel}"] = 1;
141
        } else {
142
            // Increment the subscribe count if we've already subscribed
143
            $this->subscribedChannels["{$appId}:{$channel}"]++;
144
        }
145
146
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
147
            'channel' => $channel,
148
            'serverId' => $this->getServerId(),
149
            'pubsub' => $this->getTopicName($appId, $channel),
150
        ]);
151
152
        return true;
153
    }
154
155
    /**
156
     * Unsubscribe from a channel on behalf of a websocket user.
157
     *
158
     * @param  string  $appId
159
     * @param  string  $channel
160
     * @return bool
161
     */
162
    public function unsubscribe($appId, string $channel): bool
163
    {
164
        if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
165
            return false;
166
        }
167
168
        // Decrement the subscription count for this channel
169
        $this->subscribedChannels["{$appId}:{$channel}"]--;
170
171
        // If we no longer have subscriptions to that channel, unsubscribe
172
        if ($this->subscribedChannels["{$appId}:{$channel}"] < 1) {
173
            $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId, $channel)]);
174
175
            unset($this->subscribedChannels["{$appId}:{$channel}"]);
176
        }
177
178
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
179
            'channel' => $channel,
180
            'serverId' => $this->getServerId(),
181
            'pubsub' => $this->getTopicName($appId, $channel),
182
        ]);
183
184
        return true;
185
    }
186
187
    /**
188
     * Subscribe to the app's pubsub keyspace.
189
     *
190
     * @param  mixed  $appId
191
     * @return bool
192
     */
193 View Code Duplication
    public function subscribeToApp($appId): bool
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
194
    {
195
        $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
196
197
        $this->redis->hincrby($this->getTopicName($appId), 'connections', 1);
198
199
        return true;
200
    }
201
202
    /**
203
     * Unsubscribe from the app's pubsub keyspace.
204
     *
205
     * @param  mixed  $appId
206
     * @return bool
207
     */
208 View Code Duplication
    public function unsubscribeFromApp($appId): bool
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
209
    {
210
        $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
211
212
        $this->redis->hincrby($this->getTopicName($appId), 'connections', -1);
213
214
        return true;
215
    }
216
217
    /**
218
     * Add a member to a channel. To be called when they have
219
     * subscribed to the channel.
220
     *
221
     * @param  string  $appId
222
     * @param  string  $channel
223
     * @param  string  $socketId
224
     * @param  string  $data
225
     * @return void
226
     */
227 View Code Duplication
    public function joinChannel($appId, string $channel, string $socketId, string $data)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
228
    {
229
        $this->publishClient->__call('hset', [$this->getTopicName($appId, $channel), $socketId, $data]);
230
231
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
232
            'channel' => $channel,
233
            'serverId' => $this->getServerId(),
234
            'socketId' => $socketId,
235
            'data' => $data,
236
            'pubsub' => $this->getTopicName($appId, $channel),
237
        ]);
238
    }
239
240
    /**
241
     * Remove a member from the channel. To be called when they have
242
     * unsubscribed from the channel.
243
     *
244
     * @param  string  $appId
245
     * @param  string  $channel
246
     * @param  string  $socketId
247
     * @return void
248
     */
249 View Code Duplication
    public function leaveChannel($appId, string $channel, string $socketId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
250
    {
251
        $this->publishClient->__call('hdel', [$this->getTopicName($appId, $channel), $socketId]);
252
253
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
254
            'channel' => $channel,
255
            'serverId' => $this->getServerId(),
256
            'socketId' => $socketId,
257
            'pubsub' => $this->getTopicName($appId, $channel),
258
        ]);
259
    }
260
261
    /**
262
     * Retrieve the full information about the members in a presence channel.
263
     *
264
     * @param  string  $appId
265
     * @param  string  $channel
266
     * @return PromiseInterface
267
     */
268
    public function channelMembers($appId, string $channel): PromiseInterface
269
    {
270
        return $this->publishClient->__call('hgetall', [$this->getTopicName($appId, $channel)])
271
            ->then(function ($members) {
272
                // The data is expected as objects, so we need to JSON decode
273
                return array_map(function ($user) {
274
                    return json_decode($user);
275
                }, $members);
276
            });
277
    }
278
279
    /**
280
     * Get the amount of users subscribed for each presence channel.
281
     *
282
     * @param  string  $appId
283
     * @param  array  $channelNames
284
     * @return PromiseInterface
285
     */
286
    public function channelMemberCounts($appId, array $channelNames): PromiseInterface
287
    {
288
        $this->publishClient->__call('multi', []);
289
290
        foreach ($channelNames as $channel) {
291
            $this->publishClient->__call('hlen', [$this->getTopicName($appId, $channel)]);
292
        }
293
294
        return $this->publishClient->__call('exec', [])
295
            ->then(function ($data) use ($channelNames) {
296
                return array_combine($channelNames, $data);
297
            });
298
    }
299
300
    /**
301
     * Get the amount of unique connections.
302
     *
303
     * @param  mixed  $appId
304
     * @return null|int|\React\Promise\PromiseInterface
305
     */
306
    public function appConnectionsCount($appId)
307
    {
308
        // Use the in-built Redis manager to avoid async run.
309
310
        return $this->redis->hget($this->getTopicName($appId), 'connections') ?: 0;
311
    }
312
313
    /**
314
     * Handle a message received from Redis on a specific channel.
315
     *
316
     * @param  string  $redisChannel
317
     * @param  string  $payload
318
     * @return void
319
     */
320
    public function onMessage(string $redisChannel, string $payload)
321
    {
322
        $payload = json_decode($payload);
323
324
        // Ignore messages sent by ourselves.
325
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
326
            return;
327
        }
328
329
        // Pull out the app ID. See RedisPusherBroadcaster
330
        $appId = $payload->appId;
331
332
        // We need to put the channel name in the payload.
333
        // We strip the app ID from the channel name, websocket clients
334
        // expect the channel name to not include the app ID.
335
        $payload->channel = Str::after($redisChannel, "{$appId}:");
336
337
        $channelManager = app(ChannelManager::class);
338
339
        // Load the Channel instance to sync.
340
        $channel = $channelManager->find($appId, $payload->channel);
341
342
        // If no channel is found, none of our connections want to
343
        // receive this message, so we ignore it.
344
        if (! $channel) {
345
            return;
346
        }
347
348
        $socketId = $payload->socketId ?? null;
349
        $serverId = $payload->serverId ?? null;
350
351
        // Remove fields intended for internal use from the payload.
352
        unset($payload->socketId);
353
        unset($payload->serverId);
354
        unset($payload->appId);
355
356
        // Push the message out to connected websocket clients.
357
        $channel->broadcastToEveryoneExcept($payload, $socketId, $appId, false);
358
359
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
360
            'channel' => $channel->getChannelName(),
361
            'redisChannel' => $redisChannel,
362
            'serverId' => $this->getServerId(),
363
            'incomingServerId' => $serverId,
364
            'incomingSocketId' => $socketId,
365
            'payload' => $payload,
366
        ]);
367
    }
368
369
    /**
370
     * Build the Redis connection URL from Laravel database config.
371
     *
372
     * @return string
373
     */
374
    protected function getConnectionUri()
375
    {
376
        $name = config('websockets.replication.redis.connection') ?: 'default';
377
        $config = config('database.redis')[$name];
378
379
        $host = $config['host'];
380
        $port = $config['port'] ?: 6379;
381
382
        $query = [];
383
384
        if ($config['password']) {
385
            $query['password'] = $config['password'];
386
        }
387
388
        if ($config['database']) {
389
            $query['database'] = $config['database'];
390
        }
391
392
        $query = http_build_query($query);
393
394
        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
395
    }
396
397
    /**
398
     * Get the Subscribe client instance.
399
     *
400
     * @return Client
401
     */
402
    public function getSubscribeClient()
403
    {
404
        return $this->subscribeClient;
405
    }
406
407
    /**
408
     * Get the Publish client instance.
409
     *
410
     * @return Client
411
     */
412
    public function getPublishClient()
413
    {
414
        return $this->publishClient;
415
    }
416
417
    /**
418
     * Get the unique identifier for the server.
419
     *
420
     * @return string
421
     */
422
    public function getServerId()
423
    {
424
        return $this->serverId;
425
    }
426
427
    /**
428
     * Get the Pub/Sub Topic name to subscribe based on the
429
     * app ID and channel name.
430
     *
431
     * @param  mixed  $appId
432
     * @param  string|null  $channel
433
     * @return string
434
     */
435
    protected function getTopicName($appId, string $channel = null): string
436
    {
437
        $prefix = config('database.redis.options.prefix', null);
438
439
        $hash = "{$prefix}{$appId}";
440
441
        if ($channel) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $channel of type null|string is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
442
            $hash .= ":{$channel}";
443
        }
444
445
        return $hash;
446
    }
447
}
448