Completed
Pull Request — master (#447)
by Alexandru
01:23
created

RedisClient::publish()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 18
rs 9.6666
c 0
b 0
f 0
cc 1
nc 1
nop 3
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4
5
use BeyondCode\LaravelWebSockets\Dashboard\DashboardLogger;
6
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
7
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
8
use Clue\React\Redis\Client;
9
use Clue\React\Redis\Factory;
10
use Illuminate\Support\Str;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
use stdClass;
14
15
class RedisClient extends LocalClient
16
{
17
    /**
18
     * The running loop.
19
     *
20
     * @var LoopInterface
21
     */
22
    protected $loop;
23
24
    /**
25
     * The unique server identifier.
26
     *
27
     * @var string
28
     */
29
    protected $serverId;
30
31
    /**
32
     * The pub client.
33
     *
34
     * @var Client
35
     */
36
    protected $publishClient;
37
38
    /**
39
     * The sub client.
40
     *
41
     * @var Client
42
     */
43
    protected $subscribeClient;
44
45
    /**
46
     * Mapping of subscribed channels, where the key is the channel name,
47
     * and the value is the amount of connections which are subscribed to
48
     * that channel. Used to keep track of whether we still need to stay
49
     * subscribed to those channels with Redis.
50
     *
51
     * @var int[]
52
     */
53
    protected $subscribedChannels = [];
54
55
    /**
56
     * Create a new Redis client.
57
     *
58
     * @return void
0 ignored issues
show
Comprehensibility Best Practice introduced by
Adding a @return annotation to constructors is generally not recommended as a constructor does not have a meaningful return value.

Adding a @return annotation to a constructor is not recommended, since a constructor does not have a meaningful return value.

Please refer to the PHP core documentation on constructors.

Loading history...
59
     */
60
    public function __construct()
61
    {
62
        $this->serverId = Str::uuid()->toString();
63
    }
64
65
    /**
66
     * Boot the RedisClient, initializing the connections.
67
     *
68
     * @param  LoopInterface  $loop
69
     * @param  string|null  $factoryClass
70
     * @return ReplicationInterface
71
     */
72
    public function boot(LoopInterface $loop, $factoryClass = null): ReplicationInterface
73
    {
74
        $factoryClass = $factoryClass ?: Factory::class;
75
76
        $this->loop = $loop;
77
78
        $connectionUri = $this->getConnectionUri();
79
        $factory = new $factoryClass($this->loop);
80
81
        $this->publishClient = $factory->createLazyClient($connectionUri);
82
        $this->subscribeClient = $factory->createLazyClient($connectionUri);
83
84
        // The subscribed client gets a message, it triggers the onMessage().
85
        $this->subscribeClient->on('message', function ($channel, $payload) {
86
            $this->onMessage($channel, $payload);
87
        });
88
89
        return $this;
90
    }
91
92
    /**
93
     * Publish a message to a channel on behalf of a websocket user.
94
     *
95
     * @param  string  $appId
96
     * @param  string  $channel
97
     * @param  stdClass  $payload
98
     * @return bool
99
     */
100
    public function publish($appId, string $channel, stdClass $payload): bool
101
    {
102
        $payload->appId = $appId;
103
        $payload->serverId = $this->getServerId();
104
105
        $payload = json_encode($payload);
106
107
        $this->publishClient->__call('publish', ["{$appId}:{$channel}", $payload]);
108
109
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_PUBLISHED, [
110
            'channel' => $channel,
111
            'serverId' => $this->getServerId(),
112
            'payload' => $payload,
113
            'pubsub' => "{$appId}:{$channel}",
114
        ]);
115
116
        return true;
117
    }
118
119
    /**
120
     * Subscribe to a channel on behalf of websocket user.
121
     *
122
     * @param  string  $appId
123
     * @param  string  $channel
124
     * @return bool
125
     */
126
    public function subscribe($appId, string $channel): bool
127
    {
128
        if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
129
            // We're not subscribed to the channel yet, subscribe and set the count to 1
130
            $this->subscribeClient->__call('subscribe', ["{$appId}:{$channel}"]);
131
            $this->subscribedChannels["{$appId}:{$channel}"] = 1;
132
        } else {
133
            // Increment the subscribe count if we've already subscribed
134
            $this->subscribedChannels["{$appId}:{$channel}"]++;
135
        }
136
137
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_SUBSCRIBED, [
138
            'channel' => $channel,
139
            'serverId' => $this->getServerId(),
140
            'pubsub' => "{$appId}:{$channel}",
141
        ]);
142
143
        return true;
144
    }
145
146
    /**
147
     * Unsubscribe from a channel on behalf of a websocket user.
148
     *
149
     * @param  string  $appId
150
     * @param  string  $channel
151
     * @return bool
152
     */
153
    public function unsubscribe($appId, string $channel): bool
154
    {
155
        if (! isset($this->subscribedChannels["{$appId}:{$channel}"])) {
156
            return false;
157
        }
158
159
        // Decrement the subscription count for this channel
160
        $this->subscribedChannels["{$appId}:{$channel}"]--;
161
162
        // If we no longer have subscriptions to that channel, unsubscribe
163
        if ($this->subscribedChannels["{$appId}:{$channel}"] < 1) {
164
            $this->subscribeClient->__call('unsubscribe', ["{$appId}:{$channel}"]);
165
166
            unset($this->subscribedChannels["{$appId}:{$channel}"]);
167
        }
168
169
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_UNSUBSCRIBED, [
170
            'channel' => $channel,
171
            'serverId' => $this->getServerId(),
172
            'pubsub' => "{$appId}:{$channel}",
173
        ]);
174
175
        return true;
176
    }
177
178
    /**
179
     * Add a member to a channel. To be called when they have
180
     * subscribed to the channel.
181
     *
182
     * @param  string  $appId
183
     * @param  string  $channel
184
     * @param  string  $socketId
185
     * @param  string  $data
186
     * @return void
187
     */
188 View Code Duplication
    public function joinChannel($appId, string $channel, string $socketId, string $data)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
189
    {
190
        $this->publishClient->__call('hset', ["{$appId}:{$channel}", $socketId, $data]);
191
192
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_JOINED_CHANNEL, [
193
            'channel' => $channel,
194
            'serverId' => $this->getServerId(),
195
            'socketId' => $socketId,
196
            'data' => $data,
197
            'pubsub' => "{$appId}:{$channel}",
198
        ]);
199
    }
200
201
    /**
202
     * Remove a member from the channel. To be called when they have
203
     * unsubscribed from the channel.
204
     *
205
     * @param  string  $appId
206
     * @param  string  $channel
207
     * @param  string  $socketId
208
     * @return void
209
     */
210 View Code Duplication
    public function leaveChannel($appId, string $channel, string $socketId)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
211
    {
212
        $this->publishClient->__call('hdel', ["{$appId}:{$channel}", $socketId]);
213
214
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_LEFT_CHANNEL, [
215
            'channel' => $channel,
216
            'serverId' => $this->getServerId(),
217
            'socketId' => $socketId,
218
            'pubsub' => "{$appId}:{$channel}",
219
        ]);
220
    }
221
222
    /**
223
     * Retrieve the full information about the members in a presence channel.
224
     *
225
     * @param  string  $appId
226
     * @param  string  $channel
227
     * @return PromiseInterface
228
     */
229
    public function channelMembers($appId, string $channel): PromiseInterface
230
    {
231
        return $this->publishClient->__call('hgetall', ["{$appId}:{$channel}"])
232
            ->then(function ($members) {
233
                // The data is expected as objects, so we need to JSON decode
234
                return array_map(function ($user) {
235
                    return json_decode($user);
236
                }, $members);
237
            });
238
    }
239
240
    /**
241
     * Get the amount of users subscribed for each presence channel.
242
     *
243
     * @param  string  $appId
244
     * @param  array  $channelNames
245
     * @return PromiseInterface
246
     */
247
    public function channelMemberCounts($appId, array $channelNames): PromiseInterface
248
    {
249
        $this->publishClient->__call('multi', []);
250
251
        foreach ($channelNames as $channel) {
252
            $this->publishClient->__call('hlen', ["{$appId}:{$channel}"]);
253
        }
254
255
        return $this->publishClient->__call('exec', [])
256
            ->then(function ($data) use ($channelNames) {
257
                return array_combine($channelNames, $data);
258
            });
259
    }
260
261
    /**
262
     * Handle a message received from Redis on a specific channel.
263
     *
264
     * @param  string  $redisChannel
265
     * @param  string  $payload
266
     * @return void
267
     */
268
    public function onMessage(string $redisChannel, string $payload)
269
    {
270
        $payload = json_decode($payload);
271
272
        // Ignore messages sent by ourselves.
273
        if (isset($payload->serverId) && $this->getServerId() === $payload->serverId) {
274
            return;
275
        }
276
277
        // Pull out the app ID. See RedisPusherBroadcaster
278
        $appId = $payload->appId;
279
280
        // We need to put the channel name in the payload.
281
        // We strip the app ID from the channel name, websocket clients
282
        // expect the channel name to not include the app ID.
283
        $payload->channel = Str::after($redisChannel, "{$appId}:");
284
285
        $channelManager = app(ChannelManager::class);
286
287
        // Load the Channel instance to sync.
288
        $channel = $channelManager->find($appId, $payload->channel);
289
290
        // If no channel is found, none of our connections want to
291
        // receive this message, so we ignore it.
292
        if (! $channel) {
293
            return;
294
        }
295
296
        $socket = $payload->socket ?? null;
297
        $serverId = $payload->serverId ?? null;
298
299
        // Remove fields intended for internal use from the payload.
300
        unset($payload->socket);
301
        unset($payload->serverId);
302
        unset($payload->appId);
303
304
        // Push the message out to connected websocket clients.
305
        $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false);
306
307
        DashboardLogger::log($appId, DashboardLogger::TYPE_REPLICATOR_MESSAGE_RECEIVED, [
308
            'channel' => $channel->getChannelName(),
309
            'redisChannel' => $redisChannel,
310
            'serverId' => $this->getServerId(),
311
            'incomingServerId' => $serverId,
312
            'incomingSocketId' => $socket,
313
            'payload' => $payload,
314
        ]);
315
    }
316
317
    /**
318
     * Build the Redis connection URL from Laravel database config.
319
     *
320
     * @return string
321
     */
322
    protected function getConnectionUri()
323
    {
324
        $name = config('websockets.replication.redis.connection') ?: 'default';
325
        $config = config('database.redis')[$name];
326
327
        $host = $config['host'];
328
        $port = $config['port'] ?: 6379;
329
330
        $query = [];
331
332
        if ($config['password']) {
333
            $query['password'] = $config['password'];
334
        }
335
336
        if ($config['database']) {
337
            $query['database'] = $config['database'];
338
        }
339
340
        $query = http_build_query($query);
341
342
        return "redis://{$host}:{$port}".($query ? "?{$query}" : '');
343
    }
344
345
    /**
346
     * Get the Subscribe client instance.
347
     *
348
     * @return Client
349
     */
350
    public function getSubscribeClient()
351
    {
352
        return $this->subscribeClient;
353
    }
354
355
    /**
356
     * Get the Publish client instance.
357
     *
358
     * @return Client
359
     */
360
    public function getPublishClient()
361
    {
362
        return $this->publishClient;
363
    }
364
365
    /**
366
     * Get the unique identifier for the server.
367
     *
368
     * @return string
369
     */
370
    public function getServerId()
371
    {
372
        return $this->serverId;
373
    }
374
}
375