Completed
Pull Request — master (#447)
by Marcel
03:06 queued 01:18
created

RedisClient::getConnectionUri()   B

Complexity

Conditions 6
Paths 32

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 22
rs 8.9457
c 0
b 0
f 0
cc 6
nc 32
nop 0
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4
5
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
6
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
7
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
8
use Clue\React\Redis\Client;
9
use Clue\React\Redis\Factory;
10
use Illuminate\Support\Str;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
use stdClass;
14
15
class RedisClient extends LocalClient
16
{
17
    /**
     * Event loop handed to boot(); the Redis factory is created on it.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * Identifier of this server instance, generated in the constructor.
     * It is attached to published payloads so that onMessage() can skip
     * messages this server sent itself.
     *
     * @var string
     */
    protected $serverId;

    /**
     * Redis client used for PUBLISH and for the hash commands
     * (hset, hdel, hgetall, hlen) issued by this class.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * Redis client used for SUBSCRIBE / UNSUBSCRIBE; its "message"
     * events are forwarded to onMessage().
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * Reference counts of local subscriptions. Each key has the form
     * "{appId}:{channel}" and each value is the number of connections
     * currently subscribed to it, which tells us when the Redis
     * subscription itself can be dropped.
     *
     * @var int[]
     */
    protected $subscribedChannels = [];
54
55
    /**
     * Create a new Redis client and give it a freshly generated
     * UUID as its server identifier.
     */
    public function __construct()
    {
        $uuid = Str::uuid();

        $this->serverId = $uuid->toString();
    }
64
65
    /**
     * Initialize the driver on the given event loop.
     *
     * Creates the publish and subscribe clients through the factory's
     * createLazyClient() using the URI from getConnectionUri(), and
     * forwards every "message" event of the subscribe client to onMessage().
     *
     * @param  LoopInterface  $loop
     * @param  string|null  $factoryClass  Factory class to instantiate; the Clue Redis factory is used when empty.
     * @return ReplicationInterface
     */
    public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface
    {
        $this->loop = $loop;

        if (! $factoryClass) {
            $factoryClass = Factory::class;
        }

        $uri = $this->getConnectionUri();
        $redisFactory = new $factoryClass($this->loop);

        $this->publishClient = $redisFactory->createLazyClient($uri);
        $this->subscribeClient = $redisFactory->createLazyClient($uri);

        // Hand every message received on the subscribe connection to onMessage().
        $this->subscribeClient->on('message', function ($redisChannel, $message) {
            $this->onMessage($redisChannel, $message);
        });

        return $this;
    }
91
92
    /**
     * Publish a websocket user's message to the Redis topic of a channel.
     *
     * The app ID and this server's ID are written onto the given payload
     * object before it is JSON-encoded and published.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  stdClass  $payload
     * @return bool  Always true.
     */
    public function publish($appId, string $channel, stdClass $payload): bool
    {
        $serverId = $this->getServerId();
        $topic = $this->getTopicName($appId, $channel);

        // Tag the payload so receivers know the app and the originating server.
        $payload->appId = $appId;
        $payload->serverId = $serverId;

        $message = json_encode($payload);

        $this->publishClient->__call('publish', [$topic, $message]);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
            'channel' => $channel,
            'serverId' => $serverId,
            'payload' => $message,
            'pubsub' => $topic,
        ]);

        return true;
    }
118
119
    /**
     * Register one more local subscriber for a channel.
     *
     * The Redis SUBSCRIBE is only issued for the first subscriber; later
     * calls just increase the reference count kept in $subscribedChannels.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return bool  Always true.
     */
    public function subscribe($appId, string $channel): bool
    {
        $key = "{$appId}:{$channel}";
        $topic = $this->getTopicName($appId, $channel);

        if (isset($this->subscribedChannels[$key])) {
            // Already subscribed with Redis, only bump the counter.
            $this->subscribedChannels[$key]++;
        } else {
            // First subscriber: open the Redis subscription and start counting.
            $this->subscribeClient->__call('subscribe', [$topic]);
            $this->subscribedChannels[$key] = 1;
        }

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'pubsub' => $topic,
        ]);

        return true;
    }
145
146
    /**
     * Remove one local subscriber from a channel.
     *
     * The reference count for the channel is decremented; once it drops
     * below one, the Redis subscription is released and the counter entry
     * is removed.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return bool  False when there is no tracked subscription for the channel, true otherwise.
     */
    public function unsubscribe($appId, string $channel): bool
    {
        $key = "{$appId}:{$channel}";

        if (! isset($this->subscribedChannels[$key])) {
            return false;
        }

        $topic = $this->getTopicName($appId, $channel);

        // Decrement the subscription count for this channel.
        $this->subscribedChannels[$key]--;

        // If we no longer have subscriptions to that channel, unsubscribe.
        if ($this->subscribedChannels[$key] < 1) {
            // Use the same (possibly prefixed) topic name subscribe() used,
            // otherwise the Redis subscription would never be released.
            $this->subscribeClient->__call('unsubscribe', [$topic]);

            unset($this->subscribedChannels[$key]);
        }

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'pubsub' => $topic,
        ]);

        return true;
    }
177
178
    /**
     * Add a member to a channel. To be called when they have
     * subscribed to the channel.
     *
     * The member data is stored in a Redis hash keyed by the channel's
     * topic name, with the socket ID as the hash field.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  string  $socketId
     * @param  string  $data
     * @return void
     */
    public function joinChannel($appId, string $channel, string $socketId, string $data)
    {
        // Write to the same (possibly prefixed) key that leaveChannel(),
        // channelMembers() and channelMemberCounts() operate on.
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->__call('hset', [$topic, $socketId, $data]);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'socketId' => $socketId,
            'data' => $data,
            'pubsub' => $topic,
        ]);
    }
200
201
    /**
     * Drop a member from a channel once they have unsubscribed from it.
     *
     * Deletes the socket ID field from the Redis hash stored under the
     * channel's topic name.
     *
     * @param  string  $appId
     * @param  string  $channel
     * @param  string  $socketId
     * @return void
     */
    public function leaveChannel($appId, string $channel, string $socketId)
    {
        $topic = $this->getTopicName($appId, $channel);

        $this->publishClient->__call('hdel', [$topic, $socketId]);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
            'channel' => $channel,
            'serverId' => $this->getServerId(),
            'socketId' => $socketId,
            'pubsub' => $topic,
        ]);
    }
221
222
    /**
     * Fetch the stored member entries of a presence channel.
     *
     * Runs HGETALL on the channel's topic key and resolves with every
     * returned value passed through json_decode().
     *
     * @param  string  $appId
     * @param  string  $channel
     * @return PromiseInterface
     */
    public function channelMembers($appId, string $channel): PromiseInterface
    {
        $topic = $this->getTopicName($appId, $channel);

        $pending = $this->publishClient->__call('hgetall', [$topic]);

        return $pending->then(function ($members) {
            // Consumers expect objects, so decode each stored JSON string.
            return array_map(function ($encodedMember) {
                return json_decode($encodedMember);
            }, $members);
        });
    }
239
240
    /**
     * Count the members stored for each of the given presence channels.
     *
     * Queues one HLEN per channel between MULTI and EXEC, then resolves
     * with the channel names combined with the EXEC results as
     * [channelName => result].
     *
     * @param  string  $appId
     * @param  array  $channelNames
     * @return PromiseInterface
     */
    public function channelMemberCounts($appId, array $channelNames): PromiseInterface
    {
        $redis = $this->publishClient;

        $redis->__call('multi', []);

        foreach ($channelNames as $name) {
            $redis->__call('hlen', [$this->getTopicName($appId, $name)]);
        }

        $pending = $redis->__call('exec', []);

        return $pending->then(function ($counts) use ($channelNames) {
            return array_combine($channelNames, $counts);
        });
    }
260
261
    /**
     * Handle a message received from Redis on a specific channel.
     *
     * The payload is decoded, messages published by this very server are
     * skipped, and the remaining ones are broadcast to the matching local
     * channel (if any connection on this server is using it). Payloads
     * that cannot be decoded to an object, or that carry no app ID, are
     * ignored because they cannot be routed to a channel.
     *
     * @param  string  $redisChannel
     * @param  string  $payload  JSON-encoded message as received from Redis.
     * @return void
     */
    public function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // Malformed JSON (null) or a non-object / app-less payload cannot be
        // routed; bail out instead of dereferencing it below.
        if (! is_object($payload) || ! isset($payload->appId)) {
            return;
        }

        // Ignore messages sent by ourselves.
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
            return;
        }

        // Pull out the app ID. See RedisPusherBroadcaster
        $appId = $payload->appId;

        // We need to put the channel name in the payload.
        // We strip the app ID from the channel name, websocket clients
        // expect the channel name to not include the app ID.
        $payload->channel = Str::after($redisChannel, "{$appId}:");

        $channelManager = app(ChannelManager::class);

        // Load the Channel instance to sync.
        $channel = $channelManager->find($appId, $payload->channel);

        // If no channel is found, none of our connections want to
        // receive this message, so we ignore it.
        if (! $channel) {
            return;
        }

        $socketId = $payload->socketId ?? null;
        $serverId = $payload->serverId ?? null;

        // Remove fields intended for internal use from the payload.
        unset($payload->socketId);
        unset($payload->serverId);
        unset($payload->appId);

        // Push the message out to connected websocket clients.
        $channel->broadcastToEveryoneExcept($payload, $socketId, $appId, false);

        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
            'channel' => $channel->getChannelName(),
            'redisChannel' => $redisChannel,
            'serverId' => $this->getServerId(),
            'incomingServerId' => $serverId,
            'incomingSocketId' => $socketId,
            'payload' => $payload,
        ]);
    }
316
317
    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * The connection name comes from "websockets.replication.redis.connection"
     * (falling back to "default") and is looked up in "database.redis".
     * Missing or empty "port", "password" and "database" entries are
     * tolerated: the port falls back to 6379 and the optional values are
     * simply left out of the query string.
     *
     * @return string  URI of the form redis://host:port[?password=...&db=...]
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.redis.connection') ?: 'default';
        $config = config('database.redis')[$name] ?? [];

        $host = $config['host'] ?? '127.0.0.1';
        $port = ! empty($config['port']) ? $config['port'] : 6379;

        $query = [];

        if (! empty($config['password'])) {
            $query['password'] = $config['password'];
        }

        if (! empty($config['database'])) {
            // The Redis factory reads the database index from the "db"
            // query parameter; a "database" parameter would be ignored.
            $query['db'] = $config['database'];
        }

        $query = http_build_query($query);

        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
    }
344
345
    /**
     * Return the Redis client that holds the subscriptions.
     *
     * @return Client
     */
    public function getSubscribeClient()
    {
        return $this->subscribeClient;
    }
354
355
    /**
     * Return the Redis client used for publishing and hash commands.
     *
     * @return Client
     */
    public function getPublishClient()
    {
        return $this->publishClient;
    }
364
365
    /**
     * Return the identifier generated for this server instance.
     *
     * @return string
     */
    public function getServerId()
    {
        return $this->serverId;
    }
374
375
    /**
     * Build the Pub/Sub topic name for an app ID and channel name.
     *
     * The result is the value of the "database.redis.options.prefix"
     * config entry (if any) followed by "{appId}:{channel}".
     *
     * @param  mixed  $appId
     * @param  string  $channel
     * @return string
     */
    protected function getTopicName($appId, string $channel): string
    {
        $topicPrefix = config('database.redis.options.prefix', null);

        return $topicPrefix.$appId.':'.$channel;
    }
389
}
390