Passed
Push — master ( 6f5173...a1a76b )
by BENOIT
01:53
created

SubscribeController::getLastEventID()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 2
dl 0
loc 6
ccs 0
cts 0
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Model\Message;
10
use BenTools\MercurePHP\Security\Authenticator;
11
use BenTools\MercurePHP\Security\TopicMatcher;
12
use Psr\Http\Message\RequestInterface;
13
use Psr\Http\Message\ServerRequestInterface as Request;
14
use Psr\Log\LoggerInterface;
15
use React\EventLoop\LoopInterface;
16
use React\Http\Message\Response;
17
use React\Promise\PromiseInterface;
18
use React\Stream\ThroughStream;
19
use React\Stream\WritableStreamInterface as Stream;
20
21
use function BenTools\MercurePHP\nullify;
22
use function BenTools\QueryString\query_string;
23
use function React\Promise\all;
24
use function React\Promise\resolve;
25
26
/**
 * Handles subscription requests on the `/.well-known/mercure` endpoint.
 *
 * A GET request is answered right away with a `text/event-stream` response
 * backed by a through-stream. On the next loop tick the controller replays the
 * messages stored after the client's last event id (when one was supplied) and
 * then registers the stream on the transport for every requested topic.
 *
 * NOTE(review): `$this->config`, `$this->logger`, `$this->storage` and
 * `$this->transport` are not declared here; presumably they come from
 * AbstractController — confirm.
 */
final class SubscribeController extends AbstractController
{
    private Authenticator $authenticator;
    private LoopInterface $loop;
    private QueryStringParser $queryStringParser;

    public function __construct(
        LoopInterface $loop,
        LoggerInterface $logger
    ) {
        $this->queryStringParser = new QueryStringParser();
        $this->loop = $loop;
        $this->logger = $logger;
    }

    /**
     * Answers a subscription request.
     *
     * OPTIONS requests get an empty 200 response. For any other method the
     * request is authenticated and validated synchronously (exceptions raised
     * by that step are not caught here), then an event-stream response is
     * returned while the actual work is deferred to the next loop tick.
     *
     * @return PromiseInterface Promise resolved with the Response.
     */
    public function __invoke(Request $request): PromiseInterface
    {
        if ('OPTIONS' === $request->getMethod()) {
            return resolve(new Response(200));
        }

        $request = $this->withAttributes($request);

        $stream = new ThroughStream();

        $lastEventID = $request->getAttribute('lastEventId');
        $subscribedTopics = $request->getAttribute('subscribedTopics');

        // Deferred so the response (and its headers) is handed back before
        // anything gets written to the stream.
        $this->loop->futureTick(
            fn() => $this->fetchMissedMessages($lastEventID, $subscribedTopics)
                ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
                ->then(fn() => $this->subscribe($request, $stream))
                ->then(
                    null,
                    // Without a rejection handler a storage/transport failure
                    // would go unnoticed and the client would keep an open
                    // stream that never receives anything. Log the failure and
                    // end the stream instead.
                    function ($reason) use ($stream): void {
                        $detail = $reason instanceof \Throwable ? $reason->getMessage() : 'unknown reason';
                        $this->logger->error("Subscription failed: {$detail}");
                        $stream->end();
                    }
                )
        );

        $headers = [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
        ];

        return resolve(new Response(200, $headers, $stream));
    }

    /**
     * Tells whether this controller handles the request: GET or OPTIONS on
     * the `/.well-known/mercure` path.
     */
    public function matchRequest(RequestInterface $request): bool
    {
        return \in_array($request->getMethod(), ['GET', 'OPTIONS'], true)
            && '/.well-known/mercure' === $request->getUri()->getPath();
    }

    /**
     * Returns the instance produced by the parent's withConfig(), further
     * cloned with a subscriber authenticator created from the same
     * configuration.
     */
    public function withConfig(array $config): self
    {
        /** @var self $clone */
        $clone = parent::withConfig($config);

        return $clone->withAuthenticator(Authenticator::createSubscriberAuthenticator($config));
    }

    /**
     * Authenticates the request and attaches the `token`, `subscribedTopics`
     * and `lastEventId` attributes (plus the parsed query parameters) to it.
     *
     * @throws AccessDeniedHttpException When authentication fails, or when no
     *                                   token is provided and anonymous
     *                                   subscriptions are disabled.
     * @throws BadRequestHttpException   When no `topic` parameter is provided.
     */
    private function withAttributes(Request $request): Request
    {
        try {
            $token = $this->authenticator->authenticate($request);
        } catch (\RuntimeException $e) {
            throw new AccessDeniedHttpException($e->getMessage());
        }

        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];
        if (null === $token && false === $allowAnonymous) {
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
        }

        $qs = query_string($request->getUri(), $this->queryStringParser);
        $subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []);

        if ([] === $subscribedTopics) {
            throw new BadRequestHttpException('Missing "topic" parameter.');
        }

        $queryParams = $qs->getParams();

        return $request
            ->withQueryParams($queryParams)
            ->withAttribute('token', $token)
            ->withAttribute('subscribedTopics', $subscribedTopics)
            ->withAttribute('lastEventId', $this->getLastEventID($request, $queryParams));
    }

    /**
     * Registers the stream on the transport for each subscribed topic selector
     * the client is allowed to subscribe to; refused selectors are logged and
     * skipped.
     *
     * @return PromiseInterface Resolved with `true` when nothing was
     *                          subscribed, otherwise the combined promise of
     *                          all transport subscriptions.
     */
    private function subscribe(Request $request, Stream $stream): PromiseInterface
    {
        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];
        $subscribedTopics = $request->getAttribute('subscribedTopics');
        $token = $request->getAttribute('token');
        // NOTE(review): the `clientId` attribute is not set in this class;
        // presumably it is attached upstream — confirm.
        $clientId = $request->getAttribute('clientId');

        $promises = [];
        foreach ($subscribedTopics as $topicSelector) {
            if (!TopicMatcher::canSubscribeToTopic($topicSelector, $token, $allowAnonymous)) {
                $this->logger->debug("Client {$clientId} cannot subscribe to {$topicSelector}");
                continue;
            }

            $promises[] = $this->transport
                ->subscribe(
                    $topicSelector,
                    fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream)
                )
                ->then(function (string $topic) use ($clientId) {
                    $this->logger->debug("Client {$clientId} subscribed to {$topic}");
                });
        }

        if ([] === $promises) {
            return resolve(true);
        }

        return all($promises);
    }

    /**
     * Retrieves from the storage the messages following the given event id for
     * the subscribed topics; resolves with an empty list when no id was given.
     */
    private function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        if (null === $lastEventID) {
            return resolve([]);
        }

        return $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
    }

    /**
     * Sends each missed message (iterated as topic => message) to the stream,
     * subject to the same authorization check as live updates.
     *
     * @return PromiseInterface Resolved with `true` when there was nothing to
     *                          send, otherwise the combined promise of all
     *                          sends.
     */
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
    {
        $promises = [];
        foreach ($messages as $topic => $message) {
            $promises[] = $this->sendIfAllowed($topic, $message, $request, $stream);
        }

        if ([] === $promises) {
            return resolve(true);
        }

        return all($promises);
    }

    /**
     * Writes the message to the stream only when TopicMatcher allows this
     * client to receive it.
     *
     * @return PromiseInterface Resolved with `false` when the update is
     *                          refused, otherwise the promise returned by
     *                          send().
     */
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $allowAnonymous = $this->config[Configuration::ALLOW_ANONYMOUS];
        $subscribedTopics = $request->getAttribute('subscribedTopics');
        $token = $request->getAttribute('token');
        if (!TopicMatcher::canReceiveUpdate($topic, $message, $subscribedTopics, $token, $allowAnonymous)) {
            return resolve(false);
        }

        // send() already returns a promise; no need to wrap it again.
        return $this->send($topic, $message, $request, $stream);
    }

    /**
     * Writes the string form of the message to the stream and logs the
     * dispatch.
     *
     * @return PromiseInterface Always resolved with `true`.
     */
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $stream->write((string) $message);
        $clientId = $request->getAttribute('clientId');
        $id = $message->getId();
        $this->logger->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");

        return resolve(true);
    }

    /**
     * Extracts the last event id, giving precedence to the `Last-Event-ID`
     * header, then to the `Last-Event-ID`, `Last-Event-Id` and `last-event-id`
     * query parameters, in that order.
     *
     * NOTE(review): relies on nullify() mapping "empty" values (such as the
     * empty string getHeaderLine() yields for a missing header) to null —
     * confirm against the helper.
     */
    private function getLastEventID(Request $request, array $queryParams): ?string
    {
        return nullify($request->getHeaderLine('Last-Event-ID'))
            ?? nullify($queryParams['Last-Event-ID'] ?? null)
            ?? nullify($queryParams['Last-Event-Id'] ?? null)
            ?? nullify($queryParams['last-event-id'] ?? null);
    }

    /**
     * Returns a clone of this controller using the given authenticator.
     */
    private function withAuthenticator(Authenticator $authenticator): self
    {
        $clone = clone $this;
        $clone->authenticator = $authenticator;

        return $clone;
    }
}
204