Passed
Push — master ( 2b3765...b8576e )
by Rafael
05:42
created

RedisPubSubHandler::consume()   B

Complexity

Conditions 6
Paths 1

Size

Total Lines 32
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 5
Bugs 3 Features 0
Metric Value
eloc 25
c 5
b 3
f 0
dl 0
loc 32
ccs 0
cts 28
cp 0
rs 8.8977
cc 6
nc 1
nop 2
crap 42
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription\PubSub;
12
13
class RedisPubSubHandler implements PubSubHandlerInterface
14
{
15
    /**
16
     * @var string
17
     */
18
    protected $redisHost;
19
20
    /**
21
     * @var string
22
     */
23
    protected $redisPort;
24
25
    /**
26
     * @var \Redis
27
     */
28
    protected $client;
29
30
    /**
31
     * @var \Redis
32
     */
33
    protected $consumer;
34
35
    /**
36
     * @var string
37
     */
38
    protected $prefix;
39
40
    /**
41
     * RedisPubSubHandler constructor.
42
     *
43
     * @param array $config
44
     */
45
    public function __construct(array $config)
46
    {
47
        $this->redisHost = $config['host'] ?? 'localhost';
48
        $this->redisPort = $config['port'] ?? 6379;
49
        $this->prefix = $config['prefix'] ?? 'GraphQLSubscription:';
50
51
        $this->client = new \Redis();
52
        $this->client->connect($this->redisHost, $this->redisPort);
53
        $this->client->setOption(\Redis::OPT_PREFIX, $this->prefix);
54
    }
55
56
    /**
57
     * @inheritDoc
58
     */
59
    public function sub(string $channel, string $id, array $meta, \DateTime $expireAt = null): void
60
    {
61
        $key = sprintf('%s:%s', $channel, $id);
62
        $alreadyExists = $this->exists($id);
63
        $this->client->set($key, serialize($meta));
64
65
        if (!$expireAt) {
66
            $expireAt = new \DateTime('+24Hours');
67
        }
68
        if (!$alreadyExists && $expireAt) {
69
            $this->client->expireAt($key, $expireAt->format('U'));
0 ignored issues
show
Bug introduced by
$expireAt->format('U') of type string is incompatible with the type integer expected by parameter $timestamp of Redis::expireAt(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

69
            $this->client->expireAt($key, /** @scrutinizer ignore-type */ $expireAt->format('U'));
Loading history...
70
        }
71
72
        $iterator = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $iterator is dead and can be removed.
Loading history...
73
    }
74
75
    /**
76
     * @inheritDoc
77
     */
78
    public function pub(string $channel, array $filters = [], array $data = []): void
79
    {
80
        $this->client->publish($channel, serialize([$filters, $data]));
81
    }
82
83
    /**
84
     * @inheritDoc
85
     */
86
    public function touch(string $id): void
87
    {
88
        $iterator = null;
89
        while ($iterator !== 0) {
90
            while ($keys = $this->client->scan($iterator, "*:$id*")) {
91
                foreach ($keys as $key) {
92
                    $this->client->expireAt($this->unprefix($key), (new \DateTime('+24Hours'))->format('U'));
0 ignored issues
show
Bug introduced by
new DateTime('+24Hours')->format('U') of type string is incompatible with the type integer expected by parameter $timestamp of Redis::expireAt(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

92
                    $this->client->expireAt($this->unprefix($key), /** @scrutinizer ignore-type */ (new \DateTime('+24Hours'))->format('U'));
Loading history...
93
                }
94
            }
95
        }
96
    }
97
98
    /**
99
     * @inheritDoc
100
     */
101
    public function del(string $id): void
102
    {
103
        $iterator = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $iterator is dead and can be removed.
Loading history...
104
        foreach ($this->client->keys("*:$id") as $key) {
105
            $this->client->del($this->unprefix($key));
106
        }
107
    }
108
109
    /**
110
     * @inheritDoc
111
     */
112
    public function clear(): void
113
    {
114
        $iterator = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $iterator is dead and can be removed.
Loading history...
115
        foreach ($this->client->keys('*') as $key) {
116
            $this->client->del($this->unprefix($key));
117
        }
118
    }
119
120
    /**
121
     * @inheritDoc
122
     */
123
    public function exists(string $id): bool
124
    {
125
        $iterator = null;
126
        while ($iterator !== 0) {
127
            $keys = $this->client->scan($iterator, "*:$id*");
128
            if (!empty($keys)) {
129
                return true;
130
            }
131
        }
132
133
        return false;
134
    }
135
136
    /**
137
     * @inheritDoc
138
     */
139
    public function consume(array $channels, callable $dispatch): void
140
    {
141
        $this->consumer = new \Redis();
142
        // the timeout is specified to avoid redis connection error after some time running the consumer
143
        // the `default_socket_timeout` to -1 like described here https://github.com/phpredis/phpredis/issues/70
144
        // can't be used because create a conflict with others sock open functions like used in Ynlo\GraphQLBundle\Subscription\SubscriptionManager::sendRequest
145
        $this->consumer->connect($this->redisHost, $this->redisPort, 0, null, 0, 100000);
0 ignored issues
show
Bug introduced by
$this->redisPort of type string is incompatible with the type integer expected by parameter $port of Redis::connect(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

145
        $this->consumer->connect($this->redisHost, /** @scrutinizer ignore-type */ $this->redisPort, 0, null, 0, 100000);
Loading history...
146
        $this->consumer->setOption(\Redis::OPT_READ_TIMEOUT, 1000000);
147
        $this->consumer->setOption(\Redis::OPT_PREFIX, $this->prefix);
148
        $this->consumer->subscribe(
149
            $channels,
150
            function (\Redis $redis, $chan, $event) use ($dispatch) {
0 ignored issues
show
Bug introduced by
function(...) { /* ... */ } of type callable is incompatible with the type array|string expected by parameter $callback of Redis::subscribe(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

150
            /** @scrutinizer ignore-type */ function (\Redis $redis, $chan, $event) use ($dispatch) {
Loading history...
151
                $iterator = null;
152
                while ($iterator !== 0) {
153
                    while ($keys = $this->client->scan($iterator, "*$chan:*", 100)) {
154
                        [$filters, $data] = unserialize($event, [true]);
155
                        foreach ($keys as $key) {
156
                            $key = $this->unprefix($key);
157
                            $chan = $this->unprefix($chan);
158
                            $meta = $this->client->get($key);
159
                            preg_match("/$chan:(.+)/", $key, $matches);
160
                            if ($matches) {
161
                                $metaArray = unserialize($meta, [true]);
162
                                $message = new SubscriptionMessage(
163
                                    $chan,
164
                                    $matches[1],
165
                                    $data,
166
                                    $filters,
167
                                    is_array($metaArray) ? $metaArray : []
168
                                );
169
170
                                $dispatch($message);
171
                            }
172
                        }
173
                    }
174
                }
175
            }
176
        );
177
    }
178
179
    /**
180
     * Helper to remove prefix
181
     *
182
     * @param string $key
183
     *
184
     * @return string
185
     */
186
    private function unprefix($key): string
187
    {
188
        return str_replace($this->prefix, null, $key);
189
    }
190
}
191