Completed
Pull Request — master (#140)
by
unknown
01:42
created

RedisClient   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 197
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Importance

Changes 0
Metric Value
wmc 17
lcom 1
cbo 5
dl 0
loc 197
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A boot() 0 16 1
A onMessage() 0 39 4
A subscribe() 0 13 2
A unsubscribe() 0 17 3
A publish() 0 9 1
A getConnectionUri() 0 18 5
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
4
5
use stdClass;
6
use Illuminate\Support\Str;
7
use Clue\React\Redis\Client;
8
use Clue\React\Redis\Factory;
9
use React\EventLoop\LoopInterface;
10
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
11
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
12
13
/**
 * Replication backend that relays websocket messages between server
 * instances through Redis publish/subscribe.
 *
 * Every instance tags the messages it publishes with its own server ID so
 * that it can recognise and skip them when Redis echoes them back.
 */
class RedisClient implements ReplicationInterface
{
    /**
     * The event loop the Redis clients are attached to.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * Unique identifier of this server instance, generated on construction.
     *
     * @var string
     */
    protected $serverId;

    /**
     * Client used to publish messages to Redis.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * Client used to subscribe to Redis channels and receive messages.
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * Mapping of subscribed channels, where the key is the Redis channel
     * name ("appId:channel") and the value is the number of connections
     * subscribed to that channel. Used to decide when the Redis
     * subscription itself can be dropped.
     *
     * @var int[]
     */
    protected $subscribedChannels = [];

    /**
     * Create a new client and assign it a random server ID.
     */
    public function __construct()
    {
        $this->serverId = Str::uuid()->toString();
    }

    /**
     * Boot the RedisClient, initializing the connections.
     *
     * Two lazy clients are created from the same connection URI: one for
     * publishing and one for subscribing (a Redis connection that is in
     * subscribe mode cannot be used to issue regular commands).
     *
     * @param LoopInterface $loop
     * @return ReplicationInterface
     */
    public function boot(LoopInterface $loop): ReplicationInterface
    {
        $this->loop = $loop;

        $connectionUri = $this->getConnectionUri();
        $factory = new Factory($this->loop);

        $this->publishClient = $factory->createLazyClient($connectionUri);
        $this->subscribeClient = $factory->createLazyClient($connectionUri);

        // Route every message received on a subscribed channel to onMessage().
        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });

        return $this;
    }

    /**
     * Handle a message received from Redis on a specific channel.
     *
     * Messages that cannot be decoded into an object, that lack an app ID,
     * that were published by this very server, or that target a channel no
     * local connection is subscribed to are silently ignored.
     *
     * @param string $redisChannel Redis channel name, in "appId:channel" form
     * @param string $payload      JSON encoded message
     */
    protected function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // json_decode() yields null for malformed JSON and may yield a
        // scalar or array for valid non-object JSON. Without an object
        // carrying an app ID the message cannot be routed, so drop it
        // instead of failing inside the event loop callback.
        if (! is_object($payload) || ! isset($payload->appId)) {
            return;
        }

        // Ignore messages sent by ourselves
        if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
            return;
        }

        // Pull out the app ID. See RedisPusherBroadcaster
        $appId = $payload->appId;

        // We need to put the channel name in the payload.
        // We strip the app ID from the channel name, websocket clients
        // expect the channel name to not include the app ID.
        $payload->channel = Str::after($redisChannel, "$appId:");

        /* @var $channelManager ChannelManager */
        $channelManager = app(ChannelManager::class);

        // Load the Channel instance, if any
        $channel = $channelManager->find($appId, $payload->channel);

        // If no channel is found, none of our connections want to
        // receive this message, so we ignore it.
        if (! $channel) {
            return;
        }

        $socket = $payload->socket ?? null;

        // Remove fields intended for internal use from the payload
        unset($payload->socket, $payload->serverId, $payload->appId);

        // Push the message out to connected websocket clients
        $channel->broadcastToEveryoneExcept($payload, $socket);
    }

    /**
     * Subscribe to a channel on behalf of websocket user.
     *
     * The Redis subscription is only issued for the first subscriber of a
     * channel; further subscribers just increase the local counter.
     *
     * @param string $appId
     * @param string $channel
     * @return bool Always true
     */
    public function subscribe(string $appId, string $channel): bool
    {
        $topic = $this->getTopicName($appId, $channel);

        if (isset($this->subscribedChannels[$topic])) {
            // Increment the subscribe count if we've already subscribed
            $this->subscribedChannels[$topic]++;

            return true;
        }

        // We're not subscribed to the channel yet, subscribe and set the count to 1
        $this->subscribeClient->__call('subscribe', [$topic]);
        $this->subscribedChannels[$topic] = 1;

        return true;
    }

    /**
     * Unsubscribe from a channel on behalf of a websocket user.
     *
     * The Redis subscription is only dropped once the last local
     * subscriber of the channel is gone.
     *
     * @param string $appId
     * @param string $channel
     * @return bool False when the channel was not subscribed, true otherwise
     */
    public function unsubscribe(string $appId, string $channel): bool
    {
        $topic = $this->getTopicName($appId, $channel);

        if (! isset($this->subscribedChannels[$topic])) {
            return false;
        }

        // Decrement the subscription count for this channel
        $this->subscribedChannels[$topic]--;

        // If we no longer have subscriptions to that channel, unsubscribe
        if ($this->subscribedChannels[$topic] < 1) {
            $this->subscribeClient->__call('unsubscribe', [$topic]);
            unset($this->subscribedChannels[$topic]);
        }

        return true;
    }

    /**
     * Publish a message to a channel on behalf of a websocket user.
     *
     * The app ID and this server's ID are attached to the published
     * message; the payload object passed in by the caller is left
     * untouched.
     *
     * @param string $appId
     * @param string $channel
     * @param stdClass $payload
     * @return bool False when the payload could not be JSON encoded, true otherwise
     */
    public function publish(string $appId, string $channel, stdClass $payload): bool
    {
        // Work on a copy so the internal routing fields added below do not
        // leak into the object the caller keeps using after this call.
        $message = clone $payload;
        $message->appId = $appId;
        $message->serverId = $this->serverId;

        $encoded = json_encode($message);

        // json_encode() returns false on failure (e.g. invalid UTF-8);
        // publishing that would only produce an undecodable message.
        if ($encoded === false) {
            return false;
        }

        $this->publishClient->__call('publish', [$this->getTopicName($appId, $channel), $encoded]);

        return true;
    }

    /**
     * Build the Redis channel name used for an app's websocket channel.
     *
     * @param string $appId
     * @param string $channel
     * @return string The name in "appId:channel" form
     */
    protected function getTopicName(string $appId, string $channel): string
    {
        return "{$appId}:{$channel}";
    }

    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * Reads the connection named by "websockets.replication.connection"
     * (falling back to "default") from "database.redis". Missing or empty
     * entries are tolerated: the host falls back to 127.0.0.1, the port to
     * 6379, and password/database are only added to the query string when
     * they are set to a non-empty value.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.connection') ?? 'default';
        $config = config("database.redis.$name") ?? [];

        $host = $config['host'] ?? '127.0.0.1';
        $port = empty($config['port']) ? 6379 : $config['port'];

        $query = [];

        foreach (['password', 'database'] as $option) {
            if (! empty($config[$option])) {
                $query[$option] = $config[$option];
            }
        }

        $uri = "redis://{$host}:{$port}";
        $queryString = http_build_query($query);

        return $queryString === '' ? $uri : "{$uri}?{$queryString}";
    }
}
210