Completed
Pull Request — master (#450)
by Marcel
03:00 queued 01:39
created

RedisClient   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 263
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 0
Metric Value
wmc 22
lcom 1
cbo 6
dl 0
loc 263
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A boot() 0 16 1
A onMessage() 0 39 4
A subscribe() 0 13 2
A unsubscribe() 0 17 3
A publish() 0 9 1
A joinChannel() 0 4 1
A leaveChannel() 0 4 1
A channelMembers() 0 10 1
A channelMemberCounts() 0 13 2
A getConnectionUri() 0 18 5
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Drivers;
4
5
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
6
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
7
use Clue\React\Redis\Client;
8
use Clue\React\Redis\Factory;
9
use Illuminate\Support\Str;
10
use React\EventLoop\LoopInterface;
11
use React\Promise\PromiseInterface;
12
use stdClass;
13
14
class RedisClient implements ReplicationInterface
{
    /**
     * The event loop the Redis clients are attached to.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * Unique identifier of this server instance, generated in the
     * constructor. It is stamped on every published message so that
     * messages originating from this instance can be ignored on receipt.
     *
     * @var string
     */
    protected $serverId;

    /**
     * Client used for PUBLISH and for the hash commands that track
     * channel membership.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * Client used for SUBSCRIBE / UNSUBSCRIBE and for receiving messages.
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * Mapping of subscribed channels, where the key is the Redis channel
     * name ("appId:channel") and the value is the number of connections
     * subscribed to that channel. Used to decide whether we still need to
     * stay subscribed to a channel with Redis.
     *
     * @var int[]
     */
    protected $subscribedChannels = [];

    /**
     * RedisClient constructor. Assigns a random UUID as the server ID.
     */
    public function __construct()
    {
        $this->serverId = Str::uuid()->toString();
    }

    /**
     * Boot the RedisClient, initializing the connections.
     *
     * Creates the publish and subscribe clients from the configured
     * connection URI and routes every message received by the subscribe
     * client to onMessage().
     *
     * @param LoopInterface $loop
     * @return ReplicationInterface
     */
    public function boot(LoopInterface $loop): ReplicationInterface
    {
        $this->loop = $loop;

        $connectionUri = $this->getConnectionUri();
        $factory = new Factory($this->loop);

        $this->publishClient = $factory->createLazyClient($connectionUri);
        $this->subscribeClient = $factory->createLazyClient($connectionUri);

        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });

        return $this;
    }

    /**
     * Handle a message received from Redis on a specific channel.
     *
     * Messages that cannot be decoded into an object carrying an app ID,
     * messages published by this very server, and messages for channels
     * nobody is connected to locally are silently dropped.
     *
     * @param string $redisChannel Redis channel name in "appId:channel" form
     * @param string $payload      JSON-encoded message
     */
    protected function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // Malformed JSON, a non-object value or a message without an app ID
        // cannot be routed. Drop it here rather than dereferencing null (or
        // a scalar) further down, which would error inside the event loop.
        if (! is_object($payload) || ! isset($payload->appId)) {
            return;
        }

        // Ignore messages sent by ourselves
        if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
            return;
        }

        // Pull out the app ID. See RedisPusherBroadcaster
        $appId = $payload->appId;

        // We need to put the channel name in the payload.
        // We strip the app ID from the channel name, websocket clients
        // expect the channel name to not include the app ID.
        $payload->channel = Str::after($redisChannel, "$appId:");

        /* @var ChannelManager $channelManager */
        $channelManager = app(ChannelManager::class);

        // Load the Channel instance, if any
        $channel = $channelManager->find($appId, $payload->channel);

        // If no channel is found, none of our connections want to
        // receive this message, so we ignore it.
        if (! $channel) {
            return;
        }

        $socket = $payload->socket ?? null;

        // Remove fields intended for internal use from the payload
        unset($payload->socket);
        unset($payload->serverId);
        unset($payload->appId);

        // Push the message out to connected websocket clients.
        // NOTE(review): the trailing `false` presumably stops the channel
        // from re-publishing the message to Redis; confirm against Channel.
        $channel->broadcastToEveryoneExcept($payload, $socket, $appId, false);
    }

    /**
     * Subscribe to a channel on behalf of websocket user.
     *
     * Only the first subscriber of a channel triggers a Redis SUBSCRIBE;
     * later subscribers merely increase the local reference count.
     *
     * @param string $appId
     * @param string $channel
     * @return bool Always true
     */
    public function subscribe(string $appId, string $channel): bool
    {
        $redisChannel = "$appId:$channel";

        if (isset($this->subscribedChannels[$redisChannel])) {
            // Already subscribed with Redis, just count the extra connection
            $this->subscribedChannels[$redisChannel]++;

            return true;
        }

        // We're not subscribed to the channel yet, subscribe and set the count to 1
        $this->subscribeClient->__call('subscribe', [$redisChannel]);
        $this->subscribedChannels[$redisChannel] = 1;

        return true;
    }

    /**
     * Unsubscribe from a channel on behalf of a websocket user.
     *
     * The Redis UNSUBSCRIBE is only issued once the last local subscriber
     * of the channel has left.
     *
     * @param string $appId
     * @param string $channel
     * @return bool False when the channel was not subscribed, true otherwise
     */
    public function unsubscribe(string $appId, string $channel): bool
    {
        $redisChannel = "$appId:$channel";

        if (! isset($this->subscribedChannels[$redisChannel])) {
            return false;
        }

        // Decrement the subscription count for this channel
        $this->subscribedChannels[$redisChannel]--;

        // If we no longer have subscriptions to that channel, unsubscribe
        if ($this->subscribedChannels[$redisChannel] < 1) {
            $this->subscribeClient->__call('unsubscribe', [$redisChannel]);
            unset($this->subscribedChannels[$redisChannel]);
        }

        return true;
    }

    /**
     * Publish a message to a channel on behalf of a websocket user.
     *
     * The published message is a copy of the payload with the app ID and
     * this server's ID added; the object passed in is left untouched.
     *
     * @param string $appId
     * @param string $channel
     * @param stdClass $payload
     * @return bool Always true
     */
    public function publish(string $appId, string $channel, stdClass $payload): bool
    {
        // Stamp the routing fields on a copy. They are internal to the
        // replication layer (onMessage() strips them again) and must not
        // leak into the caller's payload object.
        $message = clone $payload;
        $message->appId = $appId;
        $message->serverId = $this->serverId;

        $this->publishClient->__call('publish', ["$appId:$channel", json_encode($message)]);

        return true;
    }

    /**
     * Add a member to a channel. To be called when they have
     * subscribed to the channel.
     *
     * The member is stored in the Redis hash "appId:channel" under the
     * socket ID.
     *
     * @param string $appId
     * @param string $channel
     * @param string $socketId
     * @param string $data
     */
    public function joinChannel(string $appId, string $channel, string $socketId, string $data)
    {
        $this->publishClient->__call('hset', ["$appId:$channel", $socketId, $data]);
    }

    /**
     * Remove a member from the channel. To be called when they have
     * unsubscribed from the channel.
     *
     * @param string $appId
     * @param string $channel
     * @param string $socketId
     */
    public function leaveChannel(string $appId, string $channel, string $socketId)
    {
        $this->publishClient->__call('hdel', ["$appId:$channel", $socketId]);
    }

    /**
     * Retrieve the full information about the members in a presence channel.
     *
     * @param string $appId
     * @param string $channel
     * @return PromiseInterface Resolves with the HGETALL reply, each element JSON-decoded
     */
    public function channelMembers(string $appId, string $channel): PromiseInterface
    {
        return $this->publishClient->__call('hgetall', ["$appId:$channel"])
            ->then(function ($members) {
                // The data is expected as objects, so we need to JSON decode
                return array_map(function ($user) {
                    return json_decode($user);
                }, $members);
            });
    }

    /**
     * Get the amount of users subscribed for each presence channel.
     *
     * Issues one HLEN per channel wrapped in MULTI / EXEC and pairs the
     * replies with the requested channel names.
     *
     * @param string $appId
     * @param array $channelNames
     * @return PromiseInterface Resolves with an array keyed by channel name
     */
    public function channelMemberCounts(string $appId, array $channelNames): PromiseInterface
    {
        $this->publishClient->__call('multi', []);

        foreach ($channelNames as $channel) {
            $this->publishClient->__call('hlen', ["$appId:$channel"]);
        }

        return $this->publishClient->__call('exec', [])
            ->then(function ($data) use ($channelNames) {
                return array_combine($channelNames, $data);
            });
    }

    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * Reads the connection named by "websockets.replication.connection"
     * (falling back to "default") from "database.redis". Missing or empty
     * settings fall back to host 127.0.0.1 and port 6379; password and
     * database are only added to the query string when they are set to a
     * non-empty value.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.connection') ?? 'default';

        // An unconfigured connection yields null; treat it as "no settings"
        // so the lookups below fall through to their defaults.
        $config = config("database.redis.$name") ?? [];

        $host = $config['host'] ?? '127.0.0.1';
        $port = ($config['port'] ?? null) ?: 6379;

        // empty() covers both an absent key and a blank value without
        // raising an undefined-index notice.
        $query = [];
        foreach (['password', 'database'] as $option) {
            if (! empty($config[$option])) {
                $query[$option] = $config[$option];
            }
        }
        $query = http_build_query($query);

        return "redis://$host:$port".($query ? '?'.$query : '');
    }
}
277