Passed
Pull Request — master (#17)
by BENOIT
02:08
created

RedisStorage::findSubscriptionsBySubscriber()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 17
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2.0078

Importance

Changes 0
Metric Value
cc 2
eloc 10
nc 1
nop 1
dl 0
loc 17
ccs 7
cts 8
cp 0.875
crap 2.0078
rs 9.9332
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Storage\Redis;
4
5
use BenTools\MercurePHP\Model\Message;
6
use BenTools\MercurePHP\Security\TopicMatcher;
7
use BenTools\MercurePHP\Storage\StorageInterface;
8
use Clue\React\Redis\Client as AsynchronousClient;
9
use Predis\Client as SynchronousClient;
10
use React\Promise\PromiseInterface;
11
12
use function React\Promise\all;
13
use function React\Promise\resolve;
14
15
final class RedisStorage implements StorageInterface
16
{
17
    private AsynchronousClient $async;
18
    private SynchronousClient $sync;
19 2
20
    public function __construct(AsynchronousClient $asyncClient, SynchronousClient $syncClient)
21 2
    {
22 2
        $this->async = $asyncClient;
23 2
        $this->sync = $syncClient;
24
    }
25 1
26
    public function retrieveMessagesAfterId(string $id, array $subscribedTopics): PromiseInterface
27 1
    {
28
        return resolve($this->findNextMessages($id, $subscribedTopics));
29
    }
30 1
31
    public function storeMessage(string $topic, Message $message): PromiseInterface
32 1
    {
33 1
        $id = $message->getId();
34
        $payload = \json_encode($message, \JSON_THROW_ON_ERROR);
35
36 1
        /** @phpstan-ignore-next-line */
37 1
        return $this->async->set('data:' . $id, $topic . \PHP_EOL . $payload)
38 1
            ->then(fn() => $this->getLastEventId())
39 1
            ->then(fn(?string $lastEventId) => $this->storeLastEventId($lastEventId, $id))
40
            ->then(fn() => $id);
41
    }
42 1
43
    public function storeSubscriptions(array $subscriptions): PromiseInterface
44 1
    {
45
        $promises = [];
46
47 1
        foreach ($subscriptions as $subscription) {
48
            $key = \sprintf(
49 1
                'subscription:%s:%s',
50
                $subscription->getSubscriber(),
51 1
                \hash('crc32', $subscription->getId())
52 1
            );
53
            /** @phpstan-ignore-next-line */
54
            $promises[] = $this->async->set($key, \json_encode($subscription, \JSON_THROW_ON_ERROR));
55
        }
56 1
57
        return all($promises);
58
    }
59 1
60
    public function findSubscriptionsBySubscriber(string $subscriber): PromiseInterface
61 1
    {
62
        $keyPattern = \sprintf(
63
            'subscription:%s:*',
64 1
            $subscriber,
65 1
        );
66
67
        /** @phpstan-ignore-next-line */
68 1
        return $this->async->keys($keyPattern)
69
            ->then(
70
                function (array $keys) {
71 1
                    $promises = [];
72
                    foreach ($keys as $key) {
73
                        $promises[] = $this->async->get($key); /** @phpstan-ignore-line */
74
                    }
75 1
76 1
                    return all($promises);
77 1
                }
78 1
            );
79 1
    }
80 1
81 1
    private function getLastEventId(): PromiseInterface
82 1
    {
83
        return $this->async->get('Last-Event-ID'); /** @phpstan-ignore-line */
84
    }
85
86 1
    private function storeLastEventId(?string $previousEventId, string $newEventId): PromiseInterface
87 1
    {
88
        $promise = $this->async->set('Last-Event-ID', $newEventId); /** @phpstan-ignore-line */
89
90 1
        if (null === $previousEventId) {
91 1
            return $promise;
92
        }
93
94
        /** @phpstan-ignore-next-line */
95
        return $promise->then(fn() => $this->async->set('next:' . $previousEventId, $newEventId));
96
    }
97
98
    private function findNextMessages(string $id, array $subscribedTopics): iterable
99
    {
100
        $nextId = $this->sync->get('next:' . $id);
101
102
        /** @phpstan-ignore-next-line */
103
        if (null === $nextId) {
104
            return [];
105
        }
106
107
        $payload = $this->sync->get('data:' . $nextId);
108
109
        /** @phpstan-ignore-next-line */
110
        if (null === $payload) {
111
            return [];
112
        }
113
114
        $item = \explode(\PHP_EOL, $payload);
115
        $topic = \array_shift($item);
116
        $message = Message::fromArray(
117
            \json_decode(
118
                \implode(\PHP_EOL, $item),
119
                true,
120
                512,
121
                \JSON_THROW_ON_ERROR
122
            )
123
        );
124
125
        if (TopicMatcher::matchesTopicSelectors($topic, $subscribedTopics)) {
126
            yield $topic => $message;
127
        }
128
129
        yield from $this->findNextMessages($message->getId(), $subscribedTopics); // Sync client needed because of this
130
    }
131
}
132