Passed
Pull Request — master (#17)
by BENOIT
08:21
created

RedisStorage   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Test Coverage

Coverage 97.44%

Importance

Changes 0
Metric Value
eloc 57
dl 0
loc 132
ccs 38
cts 39
cp 0.9744
rs 10
c 0
b 0
f 0
wmc 17

10 Methods

Rating   Name   Duplication   Size   Complexity  
A findSubscriptionsBySubscriber() 0 17 2
A removeSubscriptions() 0 11 2
A findNextMessages() 0 32 4
A storeMessage() 0 10 1
A storeLastEventId() 0 10 2
A createSubscriptionKey() 0 6 1
A getLastEventId() 0 3 1
A retrieveMessagesAfterId() 0 3 1
A __construct() 0 4 1
A storeSubscriptions() 0 11 2
1
<?php
2
3
namespace BenTools\MercurePHP\Storage\Redis;
4
5
use BenTools\MercurePHP\Model\Message;
6
use BenTools\MercurePHP\Model\Subscription;
7
use BenTools\MercurePHP\Security\TopicMatcher;
8
use BenTools\MercurePHP\Storage\StorageInterface;
9
use Clue\React\Redis\Client as AsynchronousClient;
10
use Predis\Client as SynchronousClient;
11
use React\Promise\PromiseInterface;
12
13
use function React\Promise\all;
14
use function React\Promise\resolve;
15
16
/**
 * Redis-backed implementation of StorageInterface.
 *
 * Key layout used by this class:
 *  - `data:<id>`      => topic, PHP_EOL, then the JSON-encoded message
 *  - `next:<id>`      => id of the message that was stored right after <id>
 *  - `Last-Event-ID`  => id of the most recently stored message
 *  - `subscription:<subscriber>:<crc32 of subscription id>` => JSON-encoded subscription
 *
 * All writes and the subscription lookups go through the asynchronous client.
 * Replaying messages (findNextMessages) goes through the synchronous client,
 * because the replay is exposed as a generator whose consumer pulls values
 * synchronously.
 */
final class RedisStorage implements StorageInterface
{
    private AsynchronousClient $async;
    private SynchronousClient $sync;

    /**
     * @param AsynchronousClient $asyncClient client used for writes and subscription lookups
     * @param SynchronousClient  $syncClient  client used to walk the message chain on replay
     */
    public function __construct(AsynchronousClient $asyncClient, SynchronousClient $syncClient)
    {
        $this->async = $asyncClient;
        $this->sync = $syncClient;
    }

    /**
     * Returns the messages stored after the given id that match the subscribed topics.
     *
     * @param string $id               id of the last message already seen
     * @param array  $subscribedTopics topic selectors handed to TopicMatcher
     *
     * @return PromiseInterface resolved with a lazy iterable of topic => Message pairs
     */
    public function retrieveMessagesAfterId(string $id, array $subscribedTopics): PromiseInterface
    {
        return resolve($this->findNextMessages($id, $subscribedTopics));
    }

    /**
     * Stores a message and links it after the previously stored one.
     *
     * Steps, in order: write `data:<id>`, read the current `Last-Event-ID`,
     * overwrite it with the new id and, when a previous id existed, write
     * `next:<previous id>`.
     *
     * NOTE(review): the read of `Last-Event-ID` and its update are separate
     * commands; two overlapping storeMessage() calls could presumably read the
     * same previous id and overwrite each other's `next:` link - confirm
     * whether publishes can interleave.
     *
     * @param string  $topic   topic the message was published on
     * @param Message $message message to persist (must be JSON-encodable)
     *
     * @return PromiseInterface resolved with the id of the stored message
     *
     * @throws \JsonException when the message cannot be JSON-encoded
     */
    public function storeMessage(string $topic, Message $message): PromiseInterface
    {
        $id = $message->getId();
        $payload = \json_encode($message, \JSON_THROW_ON_ERROR);

        /** @phpstan-ignore-next-line */
        return $this->async->set('data:' . $id, $topic . \PHP_EOL . $payload)
            ->then(fn() => $this->getLastEventId())
            ->then(fn(?string $lastEventId) => $this->storeLastEventId($lastEventId, $id))
            ->then(fn() => $id);
    }

    /**
     * Stores every given subscription under its own key.
     *
     * @param array $subscriptions Subscription instances to persist
     *
     * @return PromiseInterface resolved once every SET has completed
     *
     * @throws \JsonException when a subscription cannot be JSON-encoded
     */
    public function storeSubscriptions(array $subscriptions): PromiseInterface
    {
        $promises = [];

        foreach ($subscriptions as $subscription) {
            $key = $this->createSubscriptionKey($subscription);
            /** @phpstan-ignore-next-line */
            $promises[] = $this->async->set($key, \json_encode($subscription, \JSON_THROW_ON_ERROR));
        }

        return all($promises);
    }

    /**
     * Deletes the keys of the given subscriptions.
     *
     * @param iterable $subscriptions Subscription instances to remove
     *
     * @return PromiseInterface resolved once every DEL has completed
     */
    public function removeSubscriptions(iterable $subscriptions): PromiseInterface
    {
        $promises = [];

        foreach ($subscriptions as $subscription) {
            $key = $this->createSubscriptionKey($subscription);
            /** @phpstan-ignore-next-line */
            $promises[] = $this->async->del($key);
        }

        return all($promises);
    }

    /**
     * Looks up every stored subscription of a subscriber.
     *
     * Lists the keys matching `subscription:<subscriber>:*`, then fetches each
     * of them. The values are returned as stored, i.e. the JSON strings
     * written by storeSubscriptions().
     *
     * @param string $subscriber subscriber identifier used in the key pattern
     *
     * @return PromiseInterface resolved with the list of stored values
     */
    public function findSubscriptionsBySubscriber(string $subscriber): PromiseInterface
    {
        $keyPattern = \sprintf(
            'subscription:%s:*',
            $subscriber,
        );

        /** @phpstan-ignore-next-line */
        return $this->async->keys($keyPattern)
            ->then(
                function (array $keys) {
                    $promises = [];
                    foreach ($keys as $key) {
                        $promises[] = $this->async->get($key); /** @phpstan-ignore-line */
                    }

                    return all($promises);
                }
            );
    }

    /**
     * Reads the id of the most recently stored message.
     *
     * @return PromiseInterface resolved with the value of `Last-Event-ID`
     *                          (storeMessage() expects a string or null)
     */
    private function getLastEventId(): PromiseInterface
    {
        return $this->async->get('Last-Event-ID'); /** @phpstan-ignore-line */
    }

    /**
     * Records the new last event id and, when there was a previous one,
     * links the previous message to the new one through `next:<previous id>`.
     *
     * @param string|null $previousEventId id stored before this message, if any
     * @param string      $newEventId      id of the message just stored
     *
     * @return PromiseInterface resolved once the write(s) have completed
     */
    private function storeLastEventId(?string $previousEventId, string $newEventId): PromiseInterface
    {
        $promise = $this->async->set('Last-Event-ID', $newEventId); /** @phpstan-ignore-line */

        if (null === $previousEventId) {
            // Very first message: there is nothing to link from.
            return $promise;
        }

        /** @phpstan-ignore-next-line */
        return $promise->then(fn() => $this->async->set('next:' . $previousEventId, $newEventId));
    }

    /**
     * Lazily walks the `next:` chain starting after the given id and yields
     * the messages whose topic matches the subscribed topics.
     *
     * The chain is followed with a loop rather than by delegating to a nested
     * generator per message, so replaying a long history does not keep one
     * generator alive per stored message. The synchronous client is required
     * here because each lookup must complete before the next value is yielded.
     *
     * The walk stops at the first missing `next:` or `data:` key.
     *
     * @param string $id               id to start after (exclusive)
     * @param array  $subscribedTopics topic selectors handed to TopicMatcher
     *
     * @return iterable generator of topic => Message pairs, in storage order
     *
     * @throws \JsonException when a stored payload is not valid JSON
     */
    private function findNextMessages(string $id, array $subscribedTopics): iterable
    {
        $currentId = $id;

        while (true) {
            $nextId = $this->sync->get('next:' . $currentId);

            /** @phpstan-ignore-next-line */
            if (null === $nextId) {
                return [];
            }

            $payload = $this->sync->get('data:' . $nextId);

            /** @phpstan-ignore-next-line */
            if (null === $payload) {
                return [];
            }

            // Payload format is "<topic>" . PHP_EOL . "<json>": split on the
            // first separator only, everything after it is the JSON document.
            $parts = \explode(\PHP_EOL, $payload, 2);
            $topic = $parts[0];
            $message = Message::fromArray(
                \json_decode(
                    $parts[1] ?? '',
                    true,
                    512,
                    \JSON_THROW_ON_ERROR
                )
            );

            if (TopicMatcher::matchesTopicSelectors($topic, $subscribedTopics)) {
                yield $topic => $message;
            }

            // Continue from the decoded message's own id, whether or not it matched.
            $currentId = $message->getId();
        }
    }

    /**
     * Builds the Redis key under which a subscription is stored.
     *
     * @param Subscription $subscription subscription to build the key for
     *
     * @return string `subscription:<subscriber>:<crc32 hash of the subscription id>`
     */
    private function createSubscriptionKey(Subscription $subscription): string
    {
        return \sprintf(
            'subscription:%s:%s',
            $subscription->getSubscriber(),
            \hash('crc32', $subscription->getId())
        );
    }
}
151