Completed
Pull Request — master (#447)
by Alexandru
10:08
created

RedisClient   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 422
Duplicated Lines 9.24 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 0
Metric Value
wmc 31
lcom 1
cbo 6
dl 39
loc 422
rs 9.92
c 0
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A boot() 0 19 2
A publish() 0 18 1
A subscribe() 0 19 2
A unsubscribe() 0 24 3
A subscribeToApp() 8 8 1
A unsubscribeFromApp() 8 8 1
A joinChannel() 12 12 1
A leaveChannel() 11 11 1
A channelMembers() 0 10 1
A channelMemberCounts() 0 13 2
A getGlobalConnectionsCount() 0 4 1
A onMessage() 0 48 4
A getConnectionUri() 0 22 5
A getSubscribeClient() 0 4 1
A getPublishClient() 0 4 1
A getServerId() 0 4 1
A getTopicName() 0 12 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4
5
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
6
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
7
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
8
use Clue\React\Redis\Client;
9
use Clue\React\Redis\Factory;
10
use Illuminate\Support\Str;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
use stdClass;
14
15
class RedisClient extends LocalClient
16
{
17
    /**
18
     * The running loop.
19
     *
20
     * @var LoopInterface
21
     */
22
    protected $loop;
23
24
    /**
25
     * The unique server identifier.
26
     *
27
     * @var string
28
     */
29
    protected $serverId;
30
31
    /**
32
     * The pub client.
33
     *
34
     * @var Client
35
     */
36
    protected $publishClient;
37
38
    /**
39
     * The sub client.
40
     *
41
     * @var Client
42
     */
43
    protected $subscribeClient;
44
45
    /**
46
     * Mapping of subscribed channels, where the key is the channel name,
47
     * and the value is the amount of connections which are subscribed to
48
     * that channel. Used to keep track of whether we still need to stay
49
     * subscribed to those channels with Redis.
50
     *
51
     * @var int[]
52
     */
53
    protected $subscribedChannels = [];
54
55
    /**
56
     * Create a new Redis client.
57
     *
58
     * @return void
0 ignored issues
show
Comprehensibility Best Practice introduced by
Adding a @return annotation to constructors is generally not recommended as a constructor does not have a meaningful return value.

Adding a @return annotation to a constructor is not recommended, since a constructor does not have a meaningful return value.

Please refer to the PHP core documentation on constructors.

Loading history...
59
     */
60
    public function __construct()
61
    {
62
        $this->serverId = Str::uuid()->toString();
63
    }
64
65
    /**
66
     * Boot the RedisClient, initializing the connections.
67
     *
68
     * @param  LoopInterface  $loop
69
     * @param  string|null  $factoryClass
70
     * @return ReplicationInterface
71
     */
72
    public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface
73
    {
74
        $factoryClass = $factoryClass ?: Factory::class;
75
76
        $this->loop = $loop;
77
78
        $connectionUri = $this->getConnectionUri();
79
        $factory = new $factoryClass($this->loop);
80
81
        $this->publishClient = $factory->createLazyClient($connectionUri);
82
        $this->subscribeClient = $factory->createLazyClient($connectionUri);
83
84
        // The subscribed client gets a message, it triggers the onMessage().
85
        $this->subscribeClient->on('message', function ($channel, $payload) {
86
            $this->onMessage($channel, $payload);
87
        });
88
89
        return $this;
90
    }
91
92
    /**
93
     * Publish a message to a channel on behalf of a websocket user.
94
     *
95
     * @param  string  $appId
96
     * @param  string  $channel
97
     * @param  stdClass  $payload
98
     * @return bool
99
     */
100
    public function publish($appId, string $channel, stdClass $payload): bool
101
    {
102
        $payload->appId = $appId;
103
        $payload->serverId = $this->getServerId();
104
105
        $payload = json_encode($payload);
106
107
        $this->publishClient->__call('publish', [$this->getTopicName($appId, $channel), $payload]);
108
109
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
110
            'channel' => $channel,
111
            'serverId' => $this->getServerId(),
112
            'payload' => $payload,
113
            'pubsub' => $this->getTopicName($appId, $channel),
114
        ]);
115
116
        return true;
117
    }
118
119
    /**
120
     * Subscribe to a channel on behalf of websocket user.
121
     *
122
     * @param  string  $appId
123
     * @param  string  $channel
124
     * @return bool
125
     */
126
    public function subscribe($appId, string $channel): bool
127
    {
128
        if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
129
            // We're not subscribed to the channel yet, subscribe and set the count to 1
130
            $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId, $channel)]);
131
            $this->subscribedChannels["{$appId}:{$channel}"] = 1;
132
        } else {
133
            // Increment the subscribe count if we've already subscribed
134
            $this->subscribedChannels["{$appId}:{$channel}"]++;
135
        }
136
137
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
138
            'channel' => $channel,
139
            'serverId' => $this->getServerId(),
140
            'pubsub' => $this->getTopicName($appId, $channel),
141
        ]);
142
143
        return true;
144
    }
145
146
    /**
147
     * Unsubscribe from a channel on behalf of a websocket user.
148
     *
149
     * @param  string  $appId
150
     * @param  string  $channel
151
     * @return bool
152
     */
153
    public function unsubscribe($appId, string $channel): bool
154
    {
155
        if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
156
            return false;
157
        }
158
159
        // Decrement the subscription count for this channel
160
        $this->subscribedChannels["{$appId}:{$channel}"]--;
161
162
        // If we no longer have subscriptions to that channel, unsubscribe
163
        if ($this->subscribedChannels["{$appId}:{$channel}"] < 1) {
164
            $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId, $channel)]);
165
166
            unset($this->subscribedChannels["{$appId}:{$channel}"]);
167
        }
168
169
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
170
            'channel' => $channel,
171
            'serverId' => $this->getServerId(),
172
            'pubsub' => $this->getTopicName($appId, $channel),
173
        ]);
174
175
        return true;
176
    }
177
178
    /**
179
     * Subscribe to the app's pubsub keyspace.
180
     *
181
     * @param  mixed  $appId
182
     * @return bool
183
     */
184 View Code Duplication
    public function subscribeToApp($appId): bool
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
185
    {
186
        $this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
187
188
        $this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', 1]);
189
190
        return true;
191
    }
192
193
    /**
194
     * Unsubscribe from the app's pubsub keyspace.
195
     *
196
     * @param  mixed  $appId
197
     * @return bool
198
     */
199 View Code Duplication
    public function unsubscribeFromApp($appId): bool
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
200
    {
201
        $this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
202
203
        $this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', -1]);
204
205
        return true;
206
    }
207
208
    /**
209
     * Add a member to a channel. To be called when they have
210
     * subscribed to the channel.
211
     *
212
     * @param  string  $appId
213
     * @param  string  $channel
214
     * @param  string  $socketId
215
     * @param  string  $data
216
     * @return void
217
     */
218 View Code Duplication
    public function joinChannel($appId, string $channel, string $socketId, string $data)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
219
    {
220
        $this->publishClient->__call('hset', [$this->getTopicName($appId, $channel), $socketId, $data]);
221
222
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
223
            'channel' => $channel,
224
            'serverId' => $this->getServerId(),
225
            'socketId' => $socketId,
226
            'data' => $data,
227
            'pubsub' => $this->getTopicName($appId, $channel),
228
        ]);
229
    }
230
231
    /**
232
     * Remove a member from the channel. To be called when they have
233
     * unsubscribed from the channel.
234
     *
235
     * @param  string  $appId
236
     * @param  string  $channel
237
     * @param  string  $socketId
238
     * @return void
239
     */
240 View Code Duplication
    public function leaveChannel($appId, string $channel, string $socketId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
241
    {
242
        $this->publishClient->__call('hdel', [$this->getTopicName($appId, $channel), $socketId]);
243
244
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
245
            'channel' => $channel,
246
            'serverId' => $this->getServerId(),
247
            'socketId' => $socketId,
248
            'pubsub' => $this->getTopicName($appId, $channel),
249
        ]);
250
    }
251
252
    /**
253
     * Retrieve the full information about the members in a presence channel.
254
     *
255
     * @param  string  $appId
256
     * @param  string  $channel
257
     * @return PromiseInterface
258
     */
259
    public function channelMembers($appId, string $channel): PromiseInterface
260
    {
261
        return $this->publishClient->__call('hgetall', [$this->getTopicName($appId, $channel)])
262
            ->then(function ($members) {
263
                // The data is expected as objects, so we need to JSON decode
264
                return array_map(function ($user) {
265
                    return json_decode($user);
266
                }, $members);
267
            });
268
    }
269
270
    /**
271
     * Get the amount of users subscribed for each presence channel.
272
     *
273
     * @param  string  $appId
274
     * @param  array  $channelNames
275
     * @return PromiseInterface
276
     */
277
    public function channelMemberCounts($appId, array $channelNames): PromiseInterface
278
    {
279
        $this->publishClient->__call('multi', []);
280
281
        foreach ($channelNames as $channel) {
282
            $this->publishClient->__call('hlen', [$this->getTopicName($appId, $channel)]);
283
        }
284
285
        return $this->publishClient->__call('exec', [])
286
            ->then(function ($data) use ($channelNames) {
287
                return array_combine($channelNames, $data);
288
            });
289
    }
290
291
    /**
292
     * Get the amount of connections aggregated on multiple instances.
293
     *
294
     * @param  mixed  $appId
295
     * @return null|int|\React\Promise\PromiseInterface
296
     */
297
    public function getGlobalConnectionsCount($appId)
298
    {
299
        return $this->publishClient->hget($this->getTopicName($appId), 'connections');
300
    }
301
302
    /**
303
     * Handle a message received from Redis on a specific channel.
304
     *
305
     * @param  string  $redisChannel
306
     * @param  string  $payload
307
     * @return void
308
     */
309
    public function onMessage(string $redisChannel, string $payload)
310
    {
311
        $payload = json_decode($payload);
312
313
        // Ignore messages sent by ourselves.
314
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
315
            return;
316
        }
317
318
        // Pull out the app ID. See RedisPusherBroadcaster
319
        $appId = $payload->appId;
320
321
        // We need to put the channel name in the payload.
322
        // We strip the app ID from the channel name, websocket clients
323
        // expect the channel name to not include the app ID.
324
        $payload->channel = Str::after($redisChannel, "{$appId}:");
325
326
        $channelManager = app(ChannelManager::class);
327
328
        // Load the Channel instance to sync.
329
        $channel = $channelManager->find($appId, $payload->channel);
330
331
        // If no channel is found, none of our connections want to
332
        // receive this message, so we ignore it.
333
        if (! $channel) {
334
            return;
335
        }
336
337
        $socketId = $payload->socketId ?? null;
338
        $serverId = $payload->serverId ?? null;
339
340
        // Remove fields intended for internal use from the payload.
341
        unset($payload->socketId);
342
        unset($payload->serverId);
343
        unset($payload->appId);
344
345
        // Push the message out to connected websocket clients.
346
        $channel->broadcastToEveryoneExcept($payload, $socketId, $appId, false);
347
348
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
349
            'channel' => $channel->getChannelName(),
350
            'redisChannel' => $redisChannel,
351
            'serverId' => $this->getServerId(),
352
            'incomingServerId' => $serverId,
353
            'incomingSocketId' => $socketId,
354
            'payload' => $payload,
355
        ]);
356
    }
357
358
    /**
359
     * Build the Redis connection URL from Laravel database config.
360
     *
361
     * @return string
362
     */
363
    protected function getConnectionUri()
364
    {
365
        $name = config('websockets.replication.redis.connection', 'default');
366
        $config = config("database.redis.{$name}");
367
368
        $host = $config['host'];
369
        $port = $config['port'] ?: 6379;
370
371
        $query = [];
372
373
        if ($config['password']) {
374
            $query['password'] = $config['password'];
375
        }
376
377
        if ($config['database']) {
378
            $query['database'] = $config['database'];
379
        }
380
381
        $query = http_build_query($query);
382
383
        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
384
    }
385
386
    /**
387
     * Get the Subscribe client instance.
388
     *
389
     * @return Client
390
     */
391
    public function getSubscribeClient()
392
    {
393
        return $this->subscribeClient;
394
    }
395
396
    /**
397
     * Get the Publish client instance.
398
     *
399
     * @return Client
400
     */
401
    public function getPublishClient()
402
    {
403
        return $this->publishClient;
404
    }
405
406
    /**
407
     * Get the unique identifier for the server.
408
     *
409
     * @return string
410
     */
411
    public function getServerId()
412
    {
413
        return $this->serverId;
414
    }
415
416
    /**
417
     * Get the Pub/Sub Topic name to subscribe based on the
418
     * app ID and channel name.
419
     *
420
     * @param  mixed  $appId
421
     * @param  string|null  $channel
422
     * @return string
423
     */
424
    protected function getTopicName($appId, string $channel = null): string
425
    {
426
        $prefix = config('database.redis.options.prefix', null);
427
428
        $hash = "{$prefix}{$appId}";
429
430
        if ($channel) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $channel of type null|string is loosely compared to true; this is ambiguous if the string can be empty. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For string values, the empty string '' is a special case, in particular the following results might be unexpected:

''   == false // true
''   == null  // true
'ab' == false // false
'ab' == null  // false

// It is often better to use strict comparison
'' === false // false
'' === null  // false
Loading history...
431
            $hash .= ":{$channel}";
432
        }
433
434
        return $hash;
435
    }
436
}
437