Completed
Pull Request — master (#61)
by
unknown
10:24
created

RedisClient::subscribe()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
c 0
b 0
f 0
rs 10
cc 1
nc 1
nop 1
1
<?php
2
3
namespace BeyondCode\LaravelWebSockets\PubSub\Redis;
4
5
use BeyondCode\LaravelWebSockets\PubSub\PubSubInterface;
6
use BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManager;
7
use Clue\React\Block;
8
use Clue\React\Redis\Client;
9
use Clue\React\Redis\Factory;
10
use Illuminate\Support\Str;
11
use React\EventLoop\LoopInterface;
12
use React\Promise\PromiseInterface;
13
14
/**
 * Redis-backed pub/sub client used to replicate websocket messages between
 * server instances.
 *
 * Two separate Redis connections are opened: one used for publishing and one
 * used for subscribing. Every published payload is tagged with this
 * instance's server id so that the instance can ignore its own messages when
 * they come back through the subscription.
 */
class RedisClient implements PubSubInterface
{
    /** Redis channel name on which replicated payloads are exchanged. */
    public const REDIS_KEY = ':websockets:replication:';

    /** Collection built from the `websockets.apps` configuration entry. */
    protected $apps;

    /** Event loop handed to subscribe(); used to create the Redis connections. */
    protected $loop;

    /** UUID string identifying this instance; compared in onMessage(). */
    protected $serverId;

    /** Redis client used by publish(); set once the publish connection resolves. */
    protected $publishClient;

    /** Redis client used for the subscription; set once that connection resolves. */
    protected $subscribeClient;

    public function __construct()
    {
        $this->apps     = collect(config('websockets.apps'));
        $this->serverId = Str::uuid()->toString();
    }

    /**
     * Publish a payload on the replication channel.
     *
     * The payload is extended with the app id and this instance's server id
     * before being JSON-encoded.
     *
     * @param string $appId   Application id the payload belongs to.
     * @param array  $payload Message data; `appId` and `serverId` keys are overwritten.
     *
     * @return bool False when the payload cannot be JSON-encoded, true otherwise.
     */
    public function publish(string $appId, array $payload): bool
    {
        $payload['appId']    = $appId;
        $payload['serverId'] = $this->serverId;

        $encoded = json_encode($payload);
        if ($encoded === false) {
            // Nothing meaningful can be sent; report the failure to the caller.
            return false;
        }

        $this->publishClient->publish(self::REDIS_KEY, $encoded);

        return true;
    }

    /**
     * Open the publish and subscribe connections and wait until both are ready.
     *
     * @param LoopInterface $loop Event loop used for the Redis connections.
     *
     * @return PubSubInterface This instance.
     */
    public function subscribe(LoopInterface $loop): PubSubInterface
    {
        $this->loop = $loop;

        // Both promises store their client on $this from inside their own
        // callbacks and resolve with $this. The resolved values must therefore
        // NOT be assigned to the client properties, otherwise the real Redis
        // clients would be replaced by this object.
        Block\awaitAll(
            [$this->publishConnection(), $this->subscribeConnection()],
            $this->loop
        );

        return $this;
    }

    /**
     * Create the connection used for publishing.
     *
     * @return PromiseInterface Resolves with this instance after the client is stored.
     */
    protected function publishConnection(): PromiseInterface
    {
        $factory = new Factory($this->loop);

        return $factory->createClient($this->getConnectionUri())->then(
            function (Client $client) {
                $this->publishClient = $client;

                return $this;
            }
        );
    }

    /**
     * Create the connection used for subscribing and register the listeners.
     *
     * @return PromiseInterface Resolves with this instance after the client is
     *                          stored and onConnected() has run.
     */
    protected function subscribeConnection(): PromiseInterface
    {
        $factory = new Factory($this->loop);

        return $factory->createClient($this->getConnectionUri())->then(
            function (Client $client) {
                $this->subscribeClient = $client;
                $this->onConnected();

                return $this;
            }
        );
    }

    /**
     * Build the `redis://` URI from the configured Laravel Redis connection.
     *
     * The connection name is read from `websockets.replication.connection`
     * and falls back to `default`. Port, password and database are optional.
     *
     * NOTE(review): the database is passed as a `database` query parameter;
     * confirm against the Redis client's documentation that this is the key
     * it expects.
     *
     * @return string
     */
    protected function getConnectionUri()
    {
        $name   = config('websockets.replication.connection') ?? 'default';
        $config = config('database.redis.' . $name);
        $host   = $config['host'];

        // Optional keys are probed with empty() so that a missing entry is
        // treated the same as an empty one instead of raising a notice.
        $port = ! empty($config['port']) ? (':' . $config['port']) : ':6379';

        $query = [];
        if (! empty($config['password'])) {
            $query['password'] = $config['password'];
        }
        if (! empty($config['database'])) {
            $query['database'] = $config['database'];
        }
        $query = http_build_query($query);

        return "redis://$host$port" . ($query ? '?' . $query : '');
    }

    /**
     * Subscribe to the replication channel and forward incoming messages to
     * onMessage().
     */
    protected function onConnected()
    {
        $this->subscribeClient->subscribe(self::REDIS_KEY);
        $this->subscribeClient->on('message', function ($channel, $payload) {
            $this->onMessage($channel, $payload);
        });
    }

    /**
     * Handle a message received on the replication channel.
     *
     * @param string $channel Redis channel the message arrived on.
     * @param string $payload JSON-encoded payload as produced by publish().
     *
     * @return bool True when the payload was broadcast to a local channel;
     *              false when it was malformed, originated from this
     *              instance, or no matching channel was found.
     */
    protected function onMessage($channel, $payload)
    {
        $payload = json_decode($payload);

        // Ignore anything that is not a well-formed replication payload.
        if (! is_object($payload)
            || ! isset($payload->serverId, $payload->appId, $payload->channel)
        ) {
            return false;
        }

        // Messages published by this very instance need no re-broadcast.
        if ($this->serverId === $payload->serverId) {
            return false;
        }

        /* @var $channelManager ChannelManager */
        $channelManager = app(ChannelManager::class);
        $channelSearch  = $channelManager->find($payload->appId, $payload->channel);

        if ($channelSearch === null) {
            return false;
        }

        // Broadcast on the channel object found by the manager; $channel is
        // only the Redis channel name.
        $channelSearch->broadcast($payload);

        return true;
    }
}