bpolaszek /
mercure-php-hub
| 1 | <?php |
||
| 2 | |||
| 3 | namespace BenTools\MercurePHP\Transport\Redis; |
||
| 4 | |||
| 5 | use BenTools\MercurePHP\Security\TopicMatcher; |
||
| 6 | use BenTools\MercurePHP\Model\Message; |
||
| 7 | use BenTools\MercurePHP\Transport\TransportInterface; |
||
| 8 | use Clue\React\Redis\Client as AsynchronousClient; |
||
| 9 | use React\Promise\PromiseInterface; |
||
| 10 | use Rize\UriTemplate; |
||
| 11 | |||
| 12 | use function React\Promise\resolve; |
||
| 13 | |||
/**
 * Mercure transport backed by Redis Pub/Sub.
 *
 * The Redis client cannot handle subscribe and publish within the same connection.
 * It actually needs 2 connections for that purpose.
 */
final class RedisTransport implements TransportInterface
{
    /** Connection used only for (P)SUBSCRIBE and for receiving messages. */
    private AsynchronousClient $subscriber;

    /** Connection used only for PUBLISH. */
    private AsynchronousClient $publisher;

    /**
     * @param AsynchronousClient $subscriber Connection dedicated to subscriptions.
     * @param AsynchronousClient $publisher  Connection dedicated to publications.
     */
    public function __construct(AsynchronousClient $subscriber, AsynchronousClient $publisher)
    {
        $this->subscriber = $subscriber;
        $this->publisher = $publisher;
    }

    /**
     * JSON-encodes the message and publishes it on the Redis channel named after the topic.
     *
     * @param string  $topic   Channel to publish on.
     * @param Message $message Message to serialise.
     *
     * @return PromiseInterface Whatever the publisher connection returns for the PUBLISH command.
     *
     * @throws \JsonException When the message cannot be encoded.
     */
    public function publish(string $topic, Message $message): PromiseInterface
    {
        $payload = \json_encode($message, \JSON_THROW_ON_ERROR);

        /** @phpstan-ignore-next-line */
        return $this->publisher->publish($topic, $payload);
    }

    /**
     * Registers a callback for a topic selector.
     *
     * Selectors containing "{" are treated as URI templates and delegated to
     * subscribePattern(); any other selector is subscribed to as a plain channel name.
     *
     * @param string   $topicSelector Literal topic or URI template.
     * @param callable $callback      Invoked as $callback(string $topic, Message $message).
     *
     * @return PromiseInterface Promise already resolved with the topic selector; the outcome
     *                          of the SUBSCRIBE command itself is not awaited.
     */
    public function subscribe(string $topicSelector, callable $callback): PromiseInterface
    {
        // Uri templates
        if (false !== \strpos($topicSelector, '{')) {
            return $this->subscribePattern($topicSelector, $callback);
        }

        /** @phpstan-ignore-next-line */
        $this->subscriber->subscribe($topicSelector);

        // Every 'message' listener sees every channel's messages; dispatch() filters by selector.
        $this->subscriber->on(
            'message',
            function (string $topic, string $payload) use ($topicSelector, $callback) {
                $this->dispatch($topic, $payload, $topicSelector, $callback);
            }
        );

        return resolve($topicSelector);
    }

    /**
     * Subscribes to a URI-template selector through a Redis pattern subscription.
     *
     * @param string   $topicSelector URI template, e.g. /author/{author}/books/{book}.
     * @param callable $callback      Invoked as $callback(string $topic, Message $message).
     *
     * @return PromiseInterface Promise already resolved with the topic selector; the outcome
     *                          of the PSUBSCRIBE command itself is not awaited.
     */
    private function subscribePattern(string $topicSelector, callable $callback): PromiseInterface
    {
        static $uriTemplate;
        $uriTemplate ??= new UriTemplate();

        // Matching the template against itself is only a way to list its variable names.
        $keys = \array_keys($uriTemplate->extract($topicSelector, $topicSelector, false));

        // Replaces /author/{author}/books/{book} by /author/*/books/* to match Redis' patterns
        $channel = $uriTemplate->expand($topicSelector, \array_fill_keys($keys, '*'));

        // The expansion percent-encodes the wildcard; restore it for Redis.
        $channel = \strtr($channel, ['%2A' => '*']);

        /** @phpstan-ignore-next-line */
        $this->subscriber->psubscribe($channel);
        $this->subscriber->on(
            'pmessage',
            function (string $pattern, string $topic, string $payload) use ($channel, $topicSelector, $callback) {
                // Redis emits one pmessage per matching pattern and every listener sees them all.
                // Only handle deliveries for our own pattern, otherwise overlapping selectors
                // would invoke the callback several times for a single publication.
                if ($pattern !== $channel) {
                    return;
                }

                $this->dispatch($topic, $payload, $topicSelector, $callback);
            }
        );

        return resolve($topicSelector);
    }

    /**
     * Decodes a raw payload and hands it to the callback when the topic matches the selector.
     *
     * @param string   $topic         Channel the payload was received on.
     * @param string   $payload       JSON-encoded message.
     * @param string   $topicSelector Selector the callback was registered with.
     * @param callable $callback      Invoked as $callback(string $topic, Message $message).
     *
     * @throws \JsonException When the payload is not valid JSON.
     */
    private function dispatch(string $topic, string $payload, string $topicSelector, callable $callback): void
    {
        if (!TopicMatcher::matchesTopicSelectors($topic, [$topicSelector])) {
            return;
        }

        $message = Message::fromArray(
            \json_decode($payload, true, 512, \JSON_THROW_ON_ERROR)
        );

        $callback($topic, $message);
    }
}
||
| 106 |