Passed
Pull Request — master (#17)
by BENOIT
02:04
created

RedisStorage::findSubscriptions()   B

Complexity

Conditions 6
Paths 2

Size

Total Lines 36
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6.0131

Importance

Changes 0
Metric Value
cc 6
eloc 24
c 0
b 0
f 0
nc 2
nop 2
dl 0
loc 36
rs 8.9137
ccs 13
cts 14
cp 0.9286
crap 6.0131
1
<?php
2
3
namespace BenTools\MercurePHP\Storage\Redis;
4
5
use BenTools\MercurePHP\Model\Message;
6
use BenTools\MercurePHP\Model\Subscription;
7
use BenTools\MercurePHP\Security\TopicMatcher;
8
use BenTools\MercurePHP\Storage\StorageInterface;
9
use Clue\React\Redis\Client as AsynchronousClient;
10
use Predis\Client as SynchronousClient;
11
use React\Promise\PromiseInterface;
12
13
use function React\Promise\all;
14
use function React\Promise\resolve;
15
16
/**
 * Storage backend that keeps messages and subscriptions in Redis.
 *
 * Key layout used by this class:
 *  - "data:<id>"                          topic, PHP_EOL, then the JSON-encoded message
 *  - "next:<id>"                          id of the message stored right after <id>
 *  - "Last-Event-ID"                      id of the most recently stored message
 *  - "subscription:<subscriber>:<crc32>"  JSON-encoded subscription
 *
 * Writes go through the asynchronous client; the history walk in
 * findNextMessages() uses the synchronous client (see that method).
 */
final class RedisStorage implements StorageInterface
{
    private AsynchronousClient $async;
    private SynchronousClient $sync;

    public function __construct(AsynchronousClient $asyncClient, SynchronousClient $syncClient)
    {
        $this->async = $asyncClient;
        $this->sync = $syncClient;
    }

    /**
     * Resolves with an iterable of "topic => Message" pairs stored after the given id.
     *
     * The iterable is a generator: Redis is queried through the synchronous
     * client while the caller iterates, not when this method returns.
     *
     * @param string   $id               Id of the last message the caller already has.
     * @param string[] $subscribedTopics Topic selectors used to filter the messages.
     */
    public function retrieveMessagesAfterID(string $id, array $subscribedTopics): PromiseInterface
    {
        return resolve($this->findNextMessages($id, $subscribedTopics));
    }

    /**
     * Stores a message and appends it to the history chain.
     *
     * @return PromiseInterface Resolves with the message id once every write completed.
     *
     * @throws \JsonException When the message cannot be JSON-encoded.
     */
    public function storeMessage(string $topic, Message $message): PromiseInterface
    {
        $id = $message->getId();
        $payload = \json_encode($message, \JSON_THROW_ON_ERROR);

        /** @phpstan-ignore-next-line */
        return $this->async->set('data:' . $id, $topic . \PHP_EOL . $payload)
            ->then(fn() => $this->storeLastEventId($id))
            ->then(fn() => $id);
    }

    /**
     * Stores each subscription as JSON under its subscription key.
     *
     * @param Subscription[] $subscriptions
     *
     * @throws \JsonException When a subscription cannot be JSON-encoded.
     */
    public function storeSubscriptions(array $subscriptions): PromiseInterface
    {
        $promises = [];

        foreach ($subscriptions as $subscription) {
            $key = $this->createSubscriptionKey($subscription);
            /** @phpstan-ignore-next-line */
            $promises[] = $this->async->set($key, \json_encode($subscription, \JSON_THROW_ON_ERROR));
        }

        return all($promises);
    }

    /**
     * Deletes the Redis key of each given subscription.
     *
     * @param iterable<Subscription> $subscriptions
     */
    public function removeSubscriptions(iterable $subscriptions): PromiseInterface
    {
        $promises = [];

        foreach ($subscriptions as $subscription) {
            $key = $this->createSubscriptionKey($subscription);
            /** @phpstan-ignore-next-line */
            $promises[] = $this->async->del($key);
        }

        return all($promises);
    }

    /**
     * Resolves with an iterable of stored subscriptions.
     *
     * @param string|null $topic      When given, only subscriptions whose topic matches it are yielded.
     * @param string|null $subscriber When given, only that subscriber's keys are looked up.
     */
    public function findSubscriptions(?string $topic = null, ?string $subscriber = null): PromiseInterface
    {
        // NOTE(review): $subscriber is inserted verbatim into a KEYS glob pattern;
        // confirm subscriber ids can never contain glob characters such as "*" or "?".
        $keyPattern = null === $subscriber ? 'subscription:*' : \sprintf('subscription:%s:*', $subscriber);

        /** @phpstan-ignore-next-line */
        return $this->async->keys($keyPattern)
            ->then(function (array $keys) {
                $promises = [];
                foreach ($keys as $key) {
                    /** @phpstan-ignore-next-line */
                    $promises[] = $this->async->get($key);
                }

                return all($promises);
            })
            ->then(fn(array $serialized) => $this->hydrateSubscriptions($serialized))
            ->then(fn(array $subscriptions) => $this->filterByTopic($subscriptions, $topic));
    }

    /**
     * Resolves with the value stored under "Last-Event-ID".
     */
    public function getLastEventID(): PromiseInterface
    {
        return $this->async->get('Last-Event-ID'); /** @phpstan-ignore-line */
    }

    /**
     * Records $newEventId as the latest event and links the previous latest event to it.
     *
     * GETSET swaps "Last-Event-ID" and returns the old value in one atomic
     * command, so two overlapping storeMessage() calls can never observe the
     * same previous id and overwrite each other's "next:<id>" pointer.
     */
    private function storeLastEventId(string $newEventId): PromiseInterface
    {
        /** @phpstan-ignore-next-line */
        return $this->async->getset('Last-Event-ID', $newEventId)
            ->then(function (?string $previousEventId) use ($newEventId) {
                if (null === $previousEventId) {
                    // First message ever stored: there is nothing to link from.
                    return null;
                }

                /** @phpstan-ignore-next-line */
                return $this->async->set('next:' . $previousEventId, $newEventId);
            });
    }

    /**
     * Turns raw GET results into Subscription objects.
     *
     * @param array<int, string|null> $serialized
     *
     * @return Subscription[]
     *
     * @throws \JsonException When a stored value is not valid JSON.
     */
    private function hydrateSubscriptions(array $serialized): array
    {
        $subscriptions = [];

        foreach ($serialized as $item) {
            if (null === $item) {
                // The key was listed by KEYS but no longer had a value when GET ran.
                continue;
            }

            $data = \json_decode($item, true, 512, \JSON_THROW_ON_ERROR);
            $subscriptions[] = new Subscription(
                $data['id'],
                $data['subscriber'],
                $data['topic'],
                $data['payload'] ?? null
            );
        }

        return $subscriptions;
    }

    /**
     * Lazily yields the subscriptions whose topic matches $topic (all of them when $topic is null).
     *
     * @param Subscription[] $subscriptions
     */
    private function filterByTopic(array $subscriptions, ?string $topic): iterable
    {
        foreach ($subscriptions as $subscription) {
            if (null === $topic || TopicMatcher::matchesTopicSelectors($subscription->getTopic(), [$topic])) {
                yield $subscription;
            }
        }
    }

    /**
     * Walks the "next:<id>" chain starting after $id and yields "topic => Message"
     * for every message whose topic matches one of the subscribed selectors.
     *
     * The walk stops at the first missing "next:" pointer or missing "data:" entry.
     * It runs inside a generator, which is why the synchronous client is used here.
     *
     * @param string[] $subscribedTopics
     *
     * @throws \JsonException When a stored message is not valid JSON.
     */
    private function findNextMessages(string $id, array $subscribedTopics): iterable
    {
        $currentId = $id;

        // Iterative walk: nesting depth stays constant however long the history is.
        while (true) {
            $nextId = $this->sync->get('next:' . $currentId);

            /** @phpstan-ignore-next-line */
            if (null === $nextId) {
                return;
            }

            $payload = $this->sync->get('data:' . $nextId);

            /** @phpstan-ignore-next-line */
            if (null === $payload) {
                return;
            }

            // Stored as "<topic>PHP_EOL<json>"; split on the first separator only.
            $parts = \explode(\PHP_EOL, $payload, 2);
            $topic = $parts[0];
            $message = Message::fromArray(
                \json_decode($parts[1] ?? '', true, 512, \JSON_THROW_ON_ERROR)
            );

            if (TopicMatcher::matchesTopicSelectors($topic, $subscribedTopics)) {
                yield $topic => $message;
            }

            $currentId = $message->getId();
        }
    }

    /**
     * Builds the Redis key "subscription:<subscriber>:<crc32 of the subscription id>".
     *
     * NOTE(review): crc32 is a 32-bit hash, so two different subscription ids of
     * the same subscriber could map to the same key; confirm this is acceptable.
     */
    private function createSubscriptionKey(Subscription $subscription): string
    {
        return \sprintf(
            'subscription:%s:%s',
            $subscription->getSubscriber(),
            \hash('crc32', $subscription->getId())
        );
    }
}
173