Passed
Push — master ( c66900...e3cbf5 )
by Rafael
06:44
created

RedisPubSubHandler   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 172
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 6
Bugs 2 Features 0
Metric Value
wmc 22
eloc 61
c 6
b 2
f 0
dl 0
loc 172
ccs 0
cts 96
cp 0
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 9 1
A consume() 0 31 5
A pub() 0 3 1
A clear() 0 5 2
A unprefix() 0 3 1
A del() 0 5 2
A exists() 0 11 3
A sub() 0 11 3
A touch() 0 7 4
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription\PubSub;
12
13
class RedisPubSubHandler implements PubSubHandlerInterface
14
{
15
    /**
     * Hostname or IP address of the Redis server.
     *
     * @var string
     */
    protected $redisHost;

    /**
     * TCP port of the Redis server, normalised to an integer by the constructor.
     *
     * @var int
     */
    protected $redisPort;

    /**
     * Connection used for regular commands (set, get, scan, keys, publish, ...).
     *
     * @var \Redis
     */
    protected $client;

    /**
     * Dedicated connection created by consume() for the subscribe loop; not set before that call.
     *
     * @var \Redis|null
     */
    protected $consumer;

    /**
     * Prefix handed to the Redis connections through \Redis::OPT_PREFIX.
     *
     * @var string
     */
    protected $prefix;

    /**
     * RedisPubSubHandler constructor.
     *
     * Recognised $config keys (all optional):
     *  - host:   Redis host, defaults to 'localhost'
     *  - port:   Redis port, defaults to 6379
     *  - prefix: key/channel prefix, defaults to 'GraphQLSubscription:'
     *
     * The command connection is opened immediately.
     *
     * @param array $config
     */
    public function __construct(array $config)
    {
        $this->redisHost = $config['host'] ?? 'localhost';
        // Configuration values may arrive as strings, while \Redis::connect()
        // expects an integer port, so normalise it once here.
        $this->redisPort = (int) ($config['port'] ?? 6379);
        $this->prefix = $config['prefix'] ?? 'GraphQLSubscription:';

        $this->client = new \Redis();
        $this->client->connect($this->redisHost, $this->redisPort);
        $this->client->setOption(\Redis::OPT_PREFIX, $this->prefix);
    }
55
56
    /**
     * Stores the serialized subscription metadata under the key "<channel>:<id>".
     *
     * The expiration is applied only when no key for this id was found before the
     * write and an expiration date was given.
     *
     * @inheritDoc
     */
    public function sub(string $channel, string $id, array $meta, ?\DateTime $expireAt = null): void
    {
        $key = sprintf('%s:%s', $channel, $id);
        // Must be evaluated before the write below, otherwise the key just written would be found.
        $alreadyExists = $this->exists($id);
        $this->client->set($key, serialize($meta));

        if (!$alreadyExists && null !== $expireAt) {
            // \Redis::expireAt() expects an integer unix timestamp; format('U') would yield a string.
            $this->client->expireAt($key, $expireAt->getTimestamp());
        }
    }
71
72
    /**
     * Publishes the filters and data, serialized together as a two-element list,
     * on the given channel.
     *
     * @inheritDoc
     */
    public function pub(string $channel, array $filters = [], array $data = []): void
    {
        // consume() destructures the payload in this exact order: filters first, data second.
        $payload = serialize([$filters, $data]);

        $this->client->publish($channel, $payload);
    }
79
80
    /**
     * Calls persist() on every key found by scanning for "*:<id>*".
     *
     * NOTE(review): the trailing wildcard also matches ids that merely start with
     * $id, whereas del() uses "*:<id>" - confirm this is intended.
     *
     * @inheritDoc
     */
    public function touch(string $id): void
    {
        $cursor = null;

        // scan() updates $cursor by reference; the outer loop ends once it reaches 0.
        while (0 !== $cursor) {
            // Drain consecutive non-empty batches; an empty or false result hands control back to the cursor check.
            for (
                $batch = $this->client->scan($cursor, "*:$id*");
                $batch;
                $batch = $this->client->scan($cursor, "*:$id*")
            ) {
                foreach ($batch as $storedKey) {
                    $bareKey = $this->unprefix($storedKey);
                    $this->client->persist($bareKey);
                }
            }
        }
    }
94
95
    /**
     * Deletes every key reported by keys() for the pattern "*:<id>".
     *
     * @inheritDoc
     */
    public function del(string $id): void
    {
        $keys = $this->client->keys("*:$id");
        if (!is_array($keys)) {
            // keys() does not return an array on failure; there is nothing to delete then.
            return;
        }

        foreach ($keys as $key) {
            // Keys are passed through unprefix() first - presumably because the client
            // re-applies \Redis::OPT_PREFIX on del(); confirm against phpredis behaviour.
            $this->client->del($this->unprefix($key));
        }
    }
105
106
    /**
     * Deletes every key reported by keys('*').
     *
     * NOTE(review): with \Redis::OPT_PREFIX set this is presumably limited to keys
     * carrying the configured prefix - confirm against phpredis behaviour.
     *
     * @inheritDoc
     */
    public function clear(): void
    {
        $keys = $this->client->keys('*');
        if (!is_array($keys)) {
            // keys() does not return an array on failure; there is nothing to delete then.
            return;
        }

        foreach ($keys as $key) {
            $this->client->del($this->unprefix($key));
        }
    }
116
117
    /**
     * Reports whether scanning for "*:<id>*" yields at least one key.
     *
     * @inheritDoc
     */
    public function exists(string $id): bool
    {
        $cursor = null;

        do {
            // scan() advances $cursor by reference and may return an empty batch
            // before the iteration is complete, hence the loop.
            $found = $this->client->scan($cursor, "*:$id*");
            if ($found) {
                return true;
            }
        } while (0 !== $cursor);

        return false;
    }
132
133
    /**
     * Subscribes to the given channels on a dedicated connection and, for every
     * event received, dispatches one SubscriptionMessage per stored key of the
     * channel the event arrived on.
     *
     * @inheritDoc
     */
    public function consume(array $channels, callable $dispatch): void
    {
        $this->consumer = new \Redis();
        // A read timeout is set explicitly to avoid Redis connection errors after the consumer
        // has been running for a while. Setting `default_socket_timeout` to -1 as described in
        // https://github.com/phpredis/phpredis/issues/70 is not an option because it conflicts
        // with other socket functions such as the one used in
        // Ynlo\GraphQLBundle\Subscription\SubscriptionManager::sendRequest
        $this->consumer->connect($this->redisHost, (int) $this->redisPort, 0, null, 0, 100000);
        $this->consumer->setOption(\Redis::OPT_READ_TIMEOUT, 1000000);
        $this->consumer->setOption(\Redis::OPT_PREFIX, $this->prefix);
        $this->consumer->subscribe(
            $channels,
            function (\Redis $redis, $chan, $event) use ($dispatch) {
                // The payload is the [filters, data] pair written by pub(); decode it once per event.
                // NOTE(review): unserialize() may instantiate arbitrary classes; only safe while
                // everything published to Redis is trusted.
                [$filters, $data] = unserialize($event, ['allowed_classes' => true]);

                // Derive the channel name and both patterns once, so they stay stable
                // for the whole scan iteration.
                $channel = $this->unprefix($chan);
                $scanPattern = "*$chan:*";
                // Quoted and anchored: channel names containing regex metacharacters must not
                // break the expression, and keys of other channels must not match.
                $keyPattern = '/^'.preg_quote($channel, '/').':(.+)/';

                $iterator = null;
                do {
                    // scan() advances $iterator by reference; a batch may be empty (or false)
                    // before the iteration has finished.
                    $keys = $this->client->scan($iterator, $scanPattern, 100);

                    foreach ($keys ?: [] as $key) {
                        $key = $this->unprefix($key);
                        if (!preg_match($keyPattern, $key, $matches)) {
                            continue;
                        }

                        $meta = $this->client->get($key);
                        if (false === $meta) {
                            // The key expired or was deleted between scan() and get().
                            continue;
                        }

                        $message = new SubscriptionMessage(
                            $channel,
                            $matches[1],
                            $data,
                            $filters,
                            unserialize($meta, ['allowed_classes' => true])
                        );

                        $dispatch($message);
                    }
                } while (0 !== $iterator);
            }
        );
    }
174
175
    /**
     * Helper to remove the configured prefix from the beginning of a key.
     *
     * Only a leading prefix is stripped; occurrences elsewhere in the key are left
     * untouched. Keys that do not start with the prefix are returned unchanged.
     *
     * @param string $key
     *
     * @return string
     */
    private function unprefix($key): string
    {
        $key = (string) $key;
        $prefix = (string) $this->prefix;

        // An empty prefix has nothing to strip (and would match at offset 0 of any key).
        if ('' === $prefix || 0 !== strpos($key, $prefix)) {
            return $key;
        }

        // Cast guards against substr() returning false for an empty remainder on old PHP versions.
        return (string) substr($key, strlen($prefix));
    }
186
}
187