Completed
Push — master ( c00ec8...083880 )
by Rafael
03:48
created

RedisPubSubHandler   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 162
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 24
eloc 66
dl 0
loc 162
ccs 0
cts 102
cp 0
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A pub() 0 3 1
A __construct() 0 13 1
A sub() 0 18 5
B consume() 0 33 6
A unprefix() 0 3 1
A del() 0 6 3
A exists() 0 11 3
A touch() 0 8 4
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription\PubSub;
12
13
/**
 * Pub/sub handler backed by Redis (phpredis extension).
 *
 * Every subscription is stored as a Redis key named "<channel>:<id>" whose
 * value is the serialized subscription metadata. Events are published through
 * Redis PUBLISH and, on the consumer side, fanned out to every stored
 * subscription key of the channel the event was published on.
 */
class RedisPubSubHandler implements PubSubHandlerInterface
{
    /**
     * Seconds to wait when opening a connection to the Redis server.
     */
    private const CONNECT_TIMEOUT = 5;

    /**
     * Connection used for regular commands (SET, GET, SCAN, PUBLISH, ...).
     *
     * @var \Redis
     */
    protected $client;

    /**
     * Separate connection used by consume() for the SUBSCRIBE call.
     *
     * @var \Redis
     */
    protected $consumer;

    /**
     * Prefix configured on both connections through \Redis::OPT_PREFIX.
     *
     * @var string
     */
    protected $prefix;

    /**
     * RedisPubSubHandler constructor.
     *
     * Recognised keys of $config (all optional):
     *  - host:   Redis host, defaults to "localhost"
     *  - port:   Redis port, defaults to 6379
     *  - prefix: key/channel prefix, defaults to "GraphQLSubscription:"
     *
     * @param array $config
     */
    public function __construct(array $config)
    {
        $host = (string) ($config['host'] ?? 'localhost');
        $port = (int) ($config['port'] ?? 6379);
        $this->prefix = (string) ($config['prefix'] ?? 'GraphQLSubscription:');

        $this->client = $this->createConnection($host, $port);
        $this->consumer = $this->createConnection($host, $port);
    }

    /**
     * Stores (or refreshes) the subscription "<channel>:<id>".
     *
     * The lifetime of an already existing subscription is never shortened:
     * the later of the current expiration and $expireAt wins. A subscription
     * without $expireAt and without a previous expiration never expires.
     *
     * @inheritDoc
     */
    public function sub(string $channel, string $id, array $meta, ?\DateTime $expireAt = null): void
    {
        $key = sprintf('%s:%s', $channel, $id);

        // Read the remaining lifetime BEFORE writing: SET discards the TTL of the key.
        $ttl = $this->client->ttl($key);
        $this->client->set($key, serialize($meta));

        $requested = $expireAt ? $expireAt->getTimestamp() : null;

        // TTL is -1 (no expiration) or -2 (missing key) when there is nothing to
        // restore. Passing such a value to EXPIRE would delete the key right away.
        $hadExpiration = is_int($ttl) && $ttl > 0;

        if ($hadExpiration && (null === $requested || $requested <= time() + $ttl)) {
            // The previous expiration is later than the requested one: keep it.
            $this->client->expire($key, $ttl);

            return;
        }

        if (null !== $requested) {
            $this->client->expireAt($key, $requested);
        }
    }

    /**
     * Publishes the filters and data, serialized as one payload, on $channel.
     *
     * @inheritDoc
     */
    public function pub(string $channel, array $filters = [], array $data = []): void
    {
        $this->client->publish($channel, serialize([$filters, $data]));
    }

    /**
     * Sets the expiration of every stored key matching "*:<id>*" to $expireAt.
     *
     * @inheritDoc
     */
    public function touch(string $id, \DateTime $expireAt): void
    {
        $timestamp = $expireAt->getTimestamp();

        foreach ($this->scanKeys("*:$id*") as $keys) {
            foreach ($keys as $key) {
                $this->client->expireAt($key, $timestamp);
            }
        }
    }

    /**
     * Deletes every stored key matching "*:<id>*".
     *
     * @inheritDoc
     */
    public function del(string $id): void
    {
        // scanKeys() yields keys without the prefix; the client adds it back,
        // so the keys actually stored in Redis are the ones that get deleted.
        foreach ($this->scanKeys("*:$id*") as $keys) {
            $this->client->del($keys);
        }
    }

    /**
     * Tells whether at least one stored key matches "*:<id>*".
     *
     * @inheritDoc
     */
    public function exists(string $id): bool
    {
        // The generator only advances up to its first non-empty batch.
        return $this->scanKeys("*:$id*")->valid();
    }

    /**
     * Subscribes to $channels and calls $dispatch with one SubscriptionMessage
     * per stored subscription of the channel an event arrives on.
     *
     * When the subscription is interrupted by a \RedisException, it is
     * re-established. A loop is used instead of recursion so that repeated
     * interruptions do not grow the call stack.
     *
     * @inheritDoc
     */
    public function consume(array $channels, callable $dispatch): void
    {
        $onEvent = function (\Redis $redis, $chan, $event) use ($dispatch) {
            $this->dispatchEvent((string) $chan, (string) $event, $dispatch);
        };

        while (true) {
            try {
                $this->consumer->subscribe($channels, $onEvent);

                return;
            } catch (\RedisException $redisException) {
                // Subscription interrupted: subscribe again.
            }
        }
    }

    /**
     * Opens a connection to the Redis server and configures the key prefix.
     *
     * @param string $host
     * @param int    $port
     *
     * @return \Redis
     */
    private function createConnection(string $host, int $port): \Redis
    {
        $redis = new \Redis();
        $redis->connect($host, $port, self::CONNECT_TIMEOUT);
        $redis->setOption(\Redis::OPT_PREFIX, $this->prefix);

        return $redis;
    }

    /**
     * Iterates over the whole keyspace with SCAN and yields every non-empty
     * batch of matching keys, with the configured prefix already removed.
     *
     * @param string $pattern glob pattern matched against the stored key names
     * @param int    $count   SCAN count hint, 0 leaves the choice to phpredis
     *
     * @return \Generator yields arrays of key names
     */
    private function scanKeys(string $pattern, int $count = 0): \Generator
    {
        $iterator = null;
        // phpredis sets the iterator to 0 once the scan is complete; a single
        // call may return no keys while the iteration is still in progress.
        while (0 !== $iterator) {
            $keys = $this->client->scan($iterator, $pattern, $count);
            if (!empty($keys)) {
                yield array_map(
                    function ($key): string {
                        return $this->unprefix((string) $key);
                    },
                    $keys
                );
            }
        }
    }

    /**
     * Delivers one published event to every subscription stored for its channel.
     *
     * @param string   $chan     channel name as received from Redis
     * @param string   $event    serialized [filters, data] payload written by pub()
     * @param callable $dispatch receives one SubscriptionMessage per subscription
     */
    private function dispatchEvent(string $chan, string $event, callable $dispatch): void
    {
        $channel = $this->unprefix($chan);

        // NOTE(review): unserialize() allows every class here, as before; this
        // assumes only trusted publishers can write to this Redis instance.
        [$filters, $data] = unserialize($event, ['allowed_classes' => true]);

        $keyStart = $channel.':';
        $idOffset = strlen($keyStart);

        foreach ($this->scanKeys("*{$this->prefix}{$channel}:*", 100) as $keys) {
            foreach ($keys as $key) {
                // Only keys of exactly this channel carrying a non-empty id.
                // The glob above would also match longer channel names.
                if (strlen($key) <= $idOffset || 0 !== strncmp($key, $keyStart, $idOffset)) {
                    continue;
                }

                $meta = $this->client->get($key);
                if (false === $meta) {
                    // The subscription expired or was deleted during the scan.
                    continue;
                }

                $message = new SubscriptionMessage(
                    $channel,
                    substr($key, $idOffset),
                    $data,
                    $filters,
                    unserialize($meta, ['allowed_classes' => true])
                );

                $dispatch($message);
            }
        }
    }

    /**
     * Helper to remove the configured prefix from the start of a key.
     *
     * Keys not starting with the prefix are returned unchanged.
     *
     * @param string $key
     *
     * @return string
     */
    private function unprefix(string $key): string
    {
        $length = strlen($this->prefix);
        if (0 === strncmp($key, $this->prefix, $length)) {
            return (string) substr($key, $length);
        }

        return $key;
    }
}
177