RedisTransport::dispatch()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 18
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.003

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 11
c 1
b 0
f 0
nc 2
nop 4
dl 0
loc 18
ccs 10
cts 11
cp 0.9091
crap 2.003
rs 9.9
1
<?php
2
3
namespace BenTools\MercurePHP\Transport\Redis;
4
5
use BenTools\MercurePHP\Security\TopicMatcher;
6
use BenTools\MercurePHP\Model\Message;
7
use BenTools\MercurePHP\Transport\TransportInterface;
8
use Clue\React\Redis\Client as AsynchronousClient;
9
use React\Promise\PromiseInterface;
10
use Rize\UriTemplate;
11
12
use function React\Promise\resolve;
13
14
/**
15
 * The Redis client cannot handle subscribe and publish within the same connection.
16
 * It actually needs 2 connections for that purpose.
17
 */
18
final class RedisTransport implements TransportInterface
19
{
20
    private AsynchronousClient $subscriber;
21
    private AsynchronousClient $publisher;
22
23 2
    public function __construct(AsynchronousClient $subscriber, AsynchronousClient $publisher)
24
    {
25 2
        $this->subscriber = $subscriber;
26 2
        $this->publisher = $publisher;
27 2
    }
28
29 1
    public function publish(string $topic, Message $message): PromiseInterface
30
    {
31 1
        $payload = \json_encode($message, \JSON_THROW_ON_ERROR);
32
33
        /** @phpstan-ignore-next-line */
34 1
        return $this->publisher
35 1
            ->publish($topic, $payload);
36
    }
37
38 1
    public function subscribe(string $topicSelector, callable $callback): PromiseInterface
39
    {
40
        // Uri templates
41 1
        if (false !== \strpos($topicSelector, '{')) {
42
            return $this->subscribePattern($topicSelector, $callback);
43
        }
44
45
        /** @phpstan-ignore-next-line */
46 1
        $this->subscriber->subscribe($topicSelector);
47 1
        $this->subscriber->on(
48 1
            'message',
49
            function (string $topic, string $payload) use ($topicSelector, $callback) {
50 1
                $this->dispatch($topic, $payload, $topicSelector, $callback);
51 1
            }
52
        );
53
54 1
        return resolve($topicSelector);
55
    }
56
57
    private function subscribePattern(string $topicSelector, callable $callback): PromiseInterface
58
    {
59
        static $uriTemplate;
60
        $uriTemplate ??= new UriTemplate();
61
        $keys = \array_keys($uriTemplate->extract($topicSelector, $topicSelector, false));
0 ignored issues
show
Bug introduced by
It seems like $uriTemplate->extract($t... $topicSelector, false) can also be of type null; however, parameter $array of array_keys() does only seem to accept array, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

61
        $keys = \array_keys(/** @scrutinizer ignore-type */ $uriTemplate->extract($topicSelector, $topicSelector, false));
Loading history...
62
63
        // Replaces /author/{author}/books/{book} by /author/*/books/* to match Redis' patterns
64
        $channel = $uriTemplate->expand(
65
            $topicSelector,
66
            \array_combine(
67
                $keys,
68
                \array_fill(0, count($keys), '*')
69
            )
70
        );
71
        $channel = \strtr($channel, ['%2A' => '*']);
72
73
        /** @phpstan-ignore-next-line */
74
        $this->subscriber->psubscribe($channel);
75
        $this->subscriber->on(
76
            'pmessage',
77
            function (string $pattern, string $topic, string $payload) use ($topicSelector, $callback) {
78
                $this->dispatch($topic, $payload, $topicSelector, $callback);
79
            }
80
        );
81
82
        return resolve($topicSelector);
83
    }
84
85 1
    private function dispatch(string $topic, string $payload, string $topicSelector, callable $callback): void
86
    {
87 1
        if (!TopicMatcher::matchesTopicSelectors($topic, [$topicSelector])) {
88
            return;
89
        }
90
91 1
        $message = Message::fromArray(
92 1
            \json_decode(
93 1
                $payload,
94 1
                true,
95 1
                512,
96 1
                \JSON_THROW_ON_ERROR
97
            )
98
        );
99
100 1
        $callback(
101 1
            $topic,
102
            $message
103
        );
104 1
    }
105
}
106