Completed
Push — master ( 930556...27fb5d )
by Rafael
03:19
created

RedisPubSubHandler::consume()   A

Complexity

Conditions 5
Paths 1

Size

Total Lines 24
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 24
ccs 0
cts 23
cp 0
rs 9.2888
c 0
b 0
f 0
cc 5
nc 1
nop 2
crap 30
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription\PubSub;
12
13
class RedisPubSubHandler implements PubSubHandlerInterface
14
{
15
    /**
16
     * @var \Redis
17
     */
18
    protected $client;
19
20
    /**
21
     * @var \Redis
22
     */
23
    protected $consumer;
24
25
    /**
26
     * @var string
27
     */
28
    protected $prefix;
29
30
    /**
31
     * RedisPubSubHandler constructor.
32
     *
33
     * @param array $config
34
     */
35
    public function __construct(array $config)
36
    {
37
        // solution to fix phpredis connection error bug: https://github.com/phpredis/phpredis/issues/70
38
        ini_set('default_socket_timeout', -1);
39
40
        $host = $config['host'] ?? 'localhost';
41
        $port = $config['port'] ?? 6379;
42
        $this->prefix = $config['prefix'] ?? 'GraphQLSubscription:';
43
44
        $this->client = new \Redis();
45
        $this->client->connect($host, $port);
46
        $this->client->setOption(\Redis::OPT_PREFIX, $this->prefix);
47
48
        $this->consumer = new \Redis();
49
        $this->consumer->connect($host, $port);
50
        $this->consumer->setOption(\Redis::OPT_PREFIX, $this->prefix);
51
    }
52
53
    /**
54
     * @inheritDoc
55
     */
56
    public function sub(string $channel, string $id, array $meta, \DateTime $expireAt = null): void
57
    {
58
        $key = sprintf('%s:%s', $channel, $id);
59
        $alreadyExists = $this->exists($id);
60
        $ttl = $this->client->ttl($key);
61
        $this->client->set($key, serialize($meta));
62
63
        if ($alreadyExists) {
64
            if ($expireAt && $expireAt->getTimestamp() - $ttl > time()) {
65
                $this->client->expireAt($key, $expireAt->format('U'));
0 ignored issues
show
Bug introduced by
$expireAt->format('U') of type string is incompatible with the type integer expected by parameter $timestamp of Redis::expireAt(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

65
                $this->client->expireAt($key, /** @scrutinizer ignore-type */ $expireAt->format('U'));
Loading history...
66
            } else {
67
                $this->client->setTimeout($key, $ttl);
68
            }
69
        } elseif ($expireAt) {
70
            $this->client->expireAt($key, $expireAt->format('U'));
71
        }
72
73
        $iterator = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $iterator is dead and can be removed.
Loading history...
74
    }
75
76
    /**
77
     * @inheritDoc
78
     */
79
    public function pub(string $channel, array $filters = [], array $data = []): void
80
    {
81
        $this->client->publish($channel, serialize([$filters, $data]));
82
    }
83
84
    /**
85
     * @inheritDoc
86
     */
87
    public function touch(string $id, \DateTime $expireAt): void
88
    {
89
        $iterator = null;
90
        while ($iterator !== 0) {
91
            while ($keys = $this->client->scan($iterator, "*:$id*")) {
92
                foreach ($keys as $key) {
93
                    $key = $this->unprefix($key);
94
                    $this->client->expireAt($key, $expireAt->format('U'));
0 ignored issues
show
Bug introduced by
$expireAt->format('U') of type string is incompatible with the type integer expected by parameter $timestamp of Redis::expireAt(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

94
                    $this->client->expireAt($key, /** @scrutinizer ignore-type */ $expireAt->format('U'));
Loading history...
95
                }
96
            }
97
        }
98
    }
99
100
    /**
101
     * @inheritDoc
102
     */
103
    public function del(string $id): void
104
    {
105
        $iterator = null;
106
        while ($iterator !== 0) {
107
            while ($keys = $this->client->scan($iterator, "*:$id*")) {
108
                $this->client->del($keys);
109
            }
110
        }
111
    }
112
113
    /**
114
     * @inheritDoc
115
     */
116
    public function exists(string $id): bool
117
    {
118
        $iterator = null;
119
        while ($iterator !== 0) {
120
            $keys = $this->client->scan($iterator, "*:$id*");
121
            if (!empty($keys)) {
122
                return true;
123
            }
124
        }
125
126
        return false;
127
    }
128
129
    /**
130
     * @inheritDoc
131
     */
132
    public function consume(array $channels, callable $dispatch): void
133
    {
134
        $this->consumer->subscribe(
135
            $channels,
136
            function (\Redis $redis, $chan, $event) use ($dispatch) {
0 ignored issues
show
Bug introduced by
function(...) { /* ... */ } of type callable is incompatible with the type array|string expected by parameter $callback of Redis::subscribe(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

136
            /** @scrutinizer ignore-type */ function (\Redis $redis, $chan, $event) use ($dispatch) {
Loading history...
137
                $iterator = null;
138
                while ($iterator !== 0) {
139
                    while ($keys = $this->client->scan($iterator, "*$chan:*", 100)) {
140
                        [$filters, $data] = unserialize($event, [true]);
141
                        foreach ($keys as $key) {
142
                            $key = $this->unprefix($key);
143
                            $chan = $this->unprefix($chan);
144
                            $meta = $this->client->get($key);
145
                            preg_match("/$chan:(.+)/", $key, $matches);
146
                            if ($matches) {
147
                                $message = new SubscriptionMessage(
148
                                    $chan,
149
                                    $matches[1],
150
                                    $data,
151
                                    $filters,
152
                                    unserialize($meta, [true])
153
                                );
154
155
                                $dispatch($message);
156
                            }
157
                        }
158
                    }
159
                }
160
            }
161
        );
162
    }
163
164
    /**
165
     * Helper to remove prefix
166
     *
167
     * @param string $key
168
     *
169
     * @return string
170
     */
171
    private function unprefix($key): string
172
    {
173
        return str_replace($this->prefix, null, $key);
174
    }
175
}
176