Completed
Pull Request — master (#13)
by Rafael
06:46
created

RedisPubSubHandler::exists()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 2
1
<?php
2
/*******************************************************************************
3
 *  This file is part of the GraphQL Bundle package.
4
 *
5
 *  (c) YnloUltratech <[email protected]>
6
 *
7
 *  For the full copyright and license information, please view the LICENSE
8
 *  file that was distributed with this source code.
9
 ******************************************************************************/
10
11
namespace Ynlo\GraphQLBundle\Subscription\PubSub;
12
13
/**
 * PubSub handler backed by Redis (phpredis extension).
 *
 * Subscriptions are stored as plain keys named "<channel>:<id>" whose value is
 * the serialized subscription metadata. Events are delivered through Redis
 * PUBLISH/SUBSCRIBE; on every event the matching subscription keys are looked
 * up with SCAN and one SubscriptionMessage is dispatched per subscription.
 *
 * Two connections are kept because a connection in subscribe mode cannot be
 * used to issue regular commands.
 */
class RedisPubSubHandler implements PubSubHandlerInterface
{
    /**
     * Connection used for regular commands (SET, GET, SCAN, PUBLISH, ...).
     *
     * @var \Redis
     */
    protected $client;

    /**
     * Connection dedicated to the blocking SUBSCRIBE call.
     *
     * @var \Redis
     */
    protected $consumer;

    /**
     * Key prefix configured on both connections through \Redis::OPT_PREFIX.
     *
     * @var string
     */
    protected $prefix;

    /**
     * RedisPubSubHandler constructor.
     *
     * @param array $config Supported keys: "host" (default "localhost"),
     *                      "port" (default 6379) and "prefix"
     *                      (default "GraphQLSubscription:")
     */
    public function __construct(array $config)
    {
        $host = $config['host'] ?? 'localhost';
        $port = $config['port'] ?? 6379;
        $this->prefix = $config['prefix'] ?? 'GraphQLSubscription:';

        $this->client = $this->createConnection($host, $port);
        $this->consumer = $this->createConnection($host, $port);
    }

    /**
     * Store (or overwrite) the subscription "<channel>:<id>".
     *
     * When the key already had a TTL, the later of the current expiration and
     * $expireAt wins, so re-subscribing never shortens a subscription lifetime.
     *
     * @param string         $channel
     * @param string         $id
     * @param array          $meta     Metadata stored serialized as the key value
     * @param \DateTime|null $expireAt Optional expiration date of the subscription
     *
     * @inheritDoc
     */
    public function sub(string $channel, string $id, array $meta, ?\DateTime $expireAt = null): void
    {
        $key = sprintf('%s:%s', $channel, $id);

        // The TTL has to be read before SET, because SET discards any existing expiration.
        $ttl = $this->client->ttl($key);
        $this->client->set($key, serialize($meta));

        $newExpiry = $expireAt ? $expireAt->getTimestamp() : null;

        // TTL is only positive when the key existed with an expiration; -1 (no expiration)
        // and -2 (missing key) must never be passed to EXPIRE, a negative timeout deletes the key.
        $keepCurrentExpiry = $ttl > 0 && (null === $newExpiry || $newExpiry <= time() + $ttl);

        if ($keepCurrentExpiry) {
            $this->client->expire($key, $ttl);
        } elseif (null !== $newExpiry) {
            $this->client->expireAt($key, $newExpiry);
        }
    }

    /**
     * Publish an event; the payload is the serialized pair [$filters, $data].
     *
     * @inheritDoc
     */
    public function pub(string $channel, array $filters = [], array $data = []): void
    {
        $this->client->publish($channel, serialize([$filters, $data]));
    }

    /**
     * Set the expiration of every key matching "*:<id>*" to $expireAt.
     *
     * @inheritDoc
     */
    public function touch(string $id, \DateTime $expireAt): void
    {
        $timestamp = $expireAt->getTimestamp();

        foreach ($this->scanKeys("*:$id*") as $key) {
            $this->client->expireAt($key, $timestamp);
        }
    }

    /**
     * Delete every key matching "*:<id>*".
     *
     * @inheritDoc
     */
    public function del(string $id): void
    {
        // scanKeys() returns un-prefixed names; DEL adds the prefix again through OPT_PREFIX.
        $keys = $this->scanKeys("*:$id*");

        if ($keys) {
            $this->client->del($keys);
        }
    }

    /**
     * Check whether at least one key matches "*:<id>*".
     *
     * @inheritDoc
     */
    public function exists(string $id): bool
    {
        $iterator = null;

        do {
            // A SCAN page may be empty while the cursor is still open,
            // so only the cursor decides when the iteration is complete.
            $page = $this->client->scan($iterator, "*:$id*");
            if (!empty($page)) {
                return true;
            }
        } while ($iterator > 0);

        return false;
    }

    /**
     * Block on SUBSCRIBE and dispatch one SubscriptionMessage per stored
     * subscription of the channel that received an event.
     *
     * The subscription is re-established whenever a \RedisException is thrown.
     *
     * @inheritDoc
     */
    public function consume(array $channels, callable $dispatch): void
    {
        $listener = function (\Redis $redis, $chan, $event) use ($dispatch) {
            $channel = $this->unprefix($chan);

            // NOTE(review): unserialize() on data read from Redis can instantiate arbitrary
            // classes; restrict "allowed_classes" if the Redis instance is not fully trusted.
            [$filters, $data] = unserialize($event, ['allowed_classes' => true]);

            // Keys are stored as "<channel>:<id>"; quote and anchor the channel so regex
            // meta characters or a channel name that is a suffix of another one cannot match.
            $pattern = '/^'.preg_quote($channel, '/').':(.+)$/';

            foreach ($this->scanKeys("*$channel:*", 100) as $key) {
                if (!preg_match($pattern, $key, $matches)) {
                    continue;
                }

                $meta = $this->client->get($key);
                if (false === $meta) {
                    // The key expired or was removed between SCAN and GET.
                    continue;
                }

                $message = new SubscriptionMessage(
                    $channel,
                    $matches[1],
                    $data,
                    $filters,
                    unserialize($meta, ['allowed_classes' => true])
                );

                $dispatch($message);
            }
        };

        // Retry in a loop instead of recursing, the call stack of a
        // long running consumer would otherwise grow on every failure.
        while (true) {
            try {
                $this->consumer->subscribe($channels, $listener);

                return;
            } catch (\RedisException $redisException) {
                continue;
            }
        }
    }

    /**
     * Open a connection (5 seconds connect timeout) using the configured key prefix.
     *
     * @param string     $host
     * @param int|string $port
     *
     * @return \Redis
     */
    private function createConnection($host, $port): \Redis
    {
        $connection = new \Redis();
        $connection->connect($host, $port, 5);
        $connection->setOption(\Redis::OPT_PREFIX, $this->prefix);

        return $connection;
    }

    /**
     * Walk the whole SCAN cursor and collect the un-prefixed names of matching keys.
     *
     * @param string $pattern Glob pattern given to SCAN
     * @param int    $count   COUNT hint given to SCAN, 0 uses the server default
     *
     * @return string[]
     */
    private function scanKeys(string $pattern, int $count = 0): array
    {
        $found = [];
        $iterator = null;

        do {
            $page = $this->client->scan($iterator, $pattern, $count);
            if (is_array($page)) {
                foreach ($page as $key) {
                    $found[] = $this->unprefix($key);
                }
            }
        } while ($iterator > 0);

        // SCAN is allowed to return the same key more than once.
        return array_values(array_unique($found));
    }

    /**
     * Helper to remove prefix
     *
     * Only a leading prefix is stripped; any other occurrence inside the name is kept.
     *
     * @param string $key
     *
     * @return string
     */
    private function unprefix($key): string
    {
        $key = (string) $key;
        $length = strlen($this->prefix);

        if ($length > 0 && 0 === strncmp($key, $this->prefix, $length)) {
            return substr($key, $length);
        }

        return $key;
    }
}
160