Completed
Pull Request — master (#140)
by
unknown
06:57
created

RedisClient::publish()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 9
rs 9.9666
c 0
b 0
f 0
cc 1
nc 1
nop 3
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
4
5
use stdClass;
6
use Illuminate\Support\Str;
7
use Clue\React\Redis\Client;
8
use Clue\React\Redis\Factory;
9
use React\EventLoop\LoopInterface;
10
use BeyondCode\LaravelWebSockets\PubSub\ReplicationInterface;
11
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
12
13
class RedisClient implements ReplicationInterface
{
    /**
     * Event loop the Redis connections are attached to. Set in boot().
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * Unique identifier generated for this instance. It is stamped on every
     * published message so that onMessage() can discard our own messages.
     *
     * @var string
     */
    protected $serverId;

    /**
     * Redis connection used for PUBLISH commands.
     *
     * @var Client
     */
    protected $publishClient;

    /**
     * Redis connection used for SUBSCRIBE / UNSUBSCRIBE commands and for
     * receiving "message" events.
     *
     * @var Client
     */
    protected $subscribeClient;

    /**
     * Mapping of subscribed channels, where the key is the channel name,
     * and the value is the amount of connections which are subscribed to
     * that channel. Used to keep track of whether we still need to stay
     * subscribed to those channels with Redis.
     *
     * @var int[]
     */
    protected $subscribedChannels = [];

    /**
     * RedisClient constructor. Generates the identifier used to recognise
     * messages that were published by this instance.
     */
    public function __construct()
    {
        $this->serverId = Str::uuid()->toString();
    }

    /**
     * Boot the RedisClient, initializing the connections.
     *
     * Two lazy clients are created from the same connection URI: one for
     * publishing and one for subscribing. Incoming "message" events on the
     * subscribe client are forwarded to onMessage().
     *
     * @param LoopInterface $loop
     * @return ReplicationInterface
     */
    public function boot(LoopInterface $loop): ReplicationInterface
    {
        $this->loop = $loop;

        $connectionUri = $this->getConnectionUri();
        $factory = new Factory($this->loop);

        $this->publishClient = $factory->createLazyClient($connectionUri);
        $this->subscribeClient = $factory->createLazyClient($connectionUri);

        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });

        return $this;
    }

    /**
     * Handle a message received from Redis on a specific channel.
     *
     * The payload is expected to be a JSON object carrying the internal
     * "appId", "serverId" and "socket" keys added by publish(). Messages that
     * cannot be decoded into an object, that originate from this instance,
     * that lack an "appId", or that target a channel unknown to the
     * ChannelManager are ignored.
     *
     * @param string $redisChannel
     * @param string $payload
     * @return bool True when the message was broadcast, false when ignored.
     */
    protected function onMessage(string $redisChannel, string $payload)
    {
        $payload = json_decode($payload);

        // Malformed JSON (null) or a non-object document cannot be routed;
        // bail out instead of dereferencing a non-object below.
        if (! $payload instanceof stdClass) {
            return false;
        }

        // Ignore messages sent by ourselves
        if (isset($payload->serverId) && $this->serverId === $payload->serverId) {
            return false;
        }

        // The application id is required to look up the channel
        if (! isset($payload->appId)) {
            return false;
        }

        // We need to put the channel name in the payload
        $payload->channel = $redisChannel;

        /* @var $channelManager ChannelManager */
        $channelManager = app(ChannelManager::class);

        // Load the Channel instance, if any
        $channel = $channelManager->find($payload->appId, $payload->channel);
        if ($channel === null) {
            return false;
        }

        // The originating socket may be absent; fall back to null in that case
        $socket = $payload->socket ?? null;

        // Remove the internal keys from the payload
        unset($payload->socket, $payload->serverId, $payload->appId);

        // Push the message out to connected websocket clients
        $channel->broadcastToEveryoneExcept($payload, $socket);

        return true;
    }

    /**
     * Subscribe to a channel on behalf of websocket user.
     *
     * The Redis subscription is only issued for the first subscriber of a
     * channel; later calls just increase the local counter.
     *
     * @param string $appId
     * @param string $channel
     * @return bool
     */
    public function subscribe(string $appId, string $channel): bool
    {
        if (isset($this->subscribedChannels[$channel])) {
            // Already subscribed with Redis, only track the extra connection
            $this->subscribedChannels[$channel]++;

            return true;
        }

        // We're not subscribed to the channel yet, subscribe and set the count to 1
        $this->subscribeClient->__call('subscribe', [$channel]);
        $this->subscribedChannels[$channel] = 1;

        return true;
    }

    /**
     * Unsubscribe from a channel on behalf of a websocket user.
     *
     * The Redis subscription is dropped once the last local subscriber of
     * the channel is gone.
     *
     * @param string $appId
     * @param string $channel
     * @return bool False when the channel was not subscribed, true otherwise.
     */
    public function unsubscribe(string $appId, string $channel): bool
    {
        if (! isset($this->subscribedChannels[$channel])) {
            return false;
        }

        // Decrement the subscription count for this channel
        $this->subscribedChannels[$channel]--;

        // If we no longer have subscriptions to that channel, unsubscribe
        if ($this->subscribedChannels[$channel] < 1) {
            $this->subscribeClient->__call('unsubscribe', [$channel]);
            unset($this->subscribedChannels[$channel]);
        }

        return true;
    }

    /**
     * Publish a message to a channel on behalf of a websocket user.
     *
     * The application id and this instance's server id are added to a copy
     * of the payload before it is JSON-encoded and published, so the object
     * passed in by the caller is left untouched.
     *
     * @param string $appId
     * @param string $channel
     * @param stdClass $payload
     * @return bool False when the payload could not be JSON-encoded, true otherwise.
     */
    public function publish(string $appId, string $channel, stdClass $payload): bool
    {
        // Work on a shallow copy so the internal routing keys never leak
        // into the caller's object.
        $message = clone $payload;
        $message->appId = $appId;
        $message->serverId = $this->serverId;

        $encoded = json_encode($message);
        if ($encoded === false) {
            // Nothing sensible can be published for an unencodable payload
            return false;
        }

        $this->publishClient->__call('publish', [$channel, $encoded]);

        return true;
    }

    /**
     * Build the Redis connection URL from Laravel database config.
     *
     * Reads the connection named by "websockets.replication.connection"
     * (falling back to "default") from "database.redis". The port defaults
     * to 6379 when not configured; password and database are appended as
     * query parameters only when they are set to a non-empty value.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name = config('websockets.replication.connection') ?? 'default';
        $config = config("database.redis.$name");
        $host = $config['host'];
        // empty() keeps the original truthiness test without raising a
        // notice when the key is missing from the connection config.
        $port = empty($config['port']) ? 6379 : $config['port'];

        $query = [];
        if (! empty($config['password'])) {
            $query['password'] = $config['password'];
        }
        if (! empty($config['database'])) {
            // The Redis client's URI format selects the database through the
            // "db" query parameter (see the client library documentation).
            $query['db'] = $config['database'];
        }
        $query = http_build_query($query);

        return "redis://$host:$port".($query ? '?'.$query : '');
    }
}
205