Passed
Pull Request — master (#17)
by BENOIT
01:58
created

SubscribeController::withAttributes()   A

Complexity

Conditions 5
Paths 4

Size

Total Lines 34
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 6.9683

Importance

Changes 0
Metric Value
cc 5
eloc 21
nc 4
nop 1
dl 0
loc 34
ccs 12
cts 21
cp 0.5714
crap 6.9683
rs 9.2728
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Model\Subscription;
10
use BenTools\MercurePHP\Security\Authenticator;
11
use BenTools\MercurePHP\Security\TopicMatcher;
12
use BenTools\MercurePHP\Model\Message;
13
use Lcobucci\JWT\Token;
14
use Psr\Http\Message\RequestInterface;
15
use Psr\Http\Message\ResponseInterface;
16
use Psr\Http\Message\ServerRequestInterface as Request;
17
use Ramsey\Uuid\Uuid;
18
use React\EventLoop\Factory;
19
use React\EventLoop\LoopInterface;
20
use React\Http\Message\Response;
21
use React\Promise\PromiseInterface;
22
use React\Stream\ThroughStream;
23
use React\Stream\WritableStreamInterface as Stream;
24
25
use function BenTools\MercurePHP\nullify;
26
use function BenTools\QueryString\query_string;
27
use function React\Promise\all;
28
use function React\Promise\any;
29
use function React\Promise\resolve;
30
31
final class SubscribeController extends AbstractController
32
{
33
    /** Authenticator used to extract a token from incoming requests. */
    private Authenticator $authenticator;

    /** Event loop on which the subscription pipeline is scheduled. */
    private LoopInterface $loop;

    /** Parser handed to query_string() when reading the request URI. */
    private QueryStringParser $queryStringParser;

    /** Value of the Configuration::ALLOW_ANONYMOUS entry of the hub configuration. */
    private bool $allowAnonymous;

    /**
     * @param array              $config        Hub configuration, indexed by Configuration::* keys.
     * @param Authenticator      $authenticator Authenticator applied to every subscription request.
     * @param LoopInterface|null $loop          Loop to schedule work on; a new one is created when omitted.
     */
    public function __construct(array $config, Authenticator $authenticator, ?LoopInterface $loop = null)
    {
        $this->config = $config;
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];
        $this->authenticator = $authenticator;
        $this->queryStringParser = new QueryStringParser();

        // Only build a loop when the caller did not provide one.
        if (null === $loop) {
            $loop = Factory::create();
        }
        $this->loop = $loop;
    }
46
47 2
    /**
     * Handles a request on the hub's subscription endpoint.
     *
     * OPTIONS requests are answered immediately with an empty 200 response. Any other
     * request is authenticated and validated by withAttributes(), then answered with a
     * `text/event-stream` response backed by a ThroughStream. Dispatching subscriptions,
     * replaying missed messages and subscribing to the transport are chained on the next
     * loop tick, i.e. after this method has returned the response.
     *
     * @throws AccessDeniedHttpException when authentication fails or anonymous access is refused
     * @throws BadRequestHttpException   when no "topic" query parameter is given
     */
    public function __invoke(Request $request): ResponseInterface
    {
        // CORS preflight requests are sent without credentials, so they must be answered
        // before authentication and topic validation get a chance to reject them.
        if ('OPTIONS' === $request->getMethod()) {
            return new Response(200);
        }

        $request = $this->withAttributes($request);

        $stream = new ThroughStream();

        $lastEventID = $request->getAttribute('lastEventId');
        $subscribedTopics = $request->getAttribute('subscribedTopics');
        $this->loop
            ->futureTick(
                fn() => $this->dispatchSubscriptions($request->getAttribute('subscriptions'))
                    ->then(fn() => $this->fetchMissedMessages($lastEventID, $subscribedTopics))
                    ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
                    ->then(fn() => $this->subscribe($request, $stream))
            );

        $headers = [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
        ];

        return new Response(200, $headers, $stream);
    }
74
75
    /**
     * Tells whether this controller is responsible for the given request.
     *
     * @return bool True for GET or OPTIONS requests whose path is exactly "/.well-known/mercure".
     */
    public function matchRequest(RequestInterface $request): bool
    {
        $method = $request->getMethod();
        if ('GET' !== $method && 'OPTIONS' !== $method) {
            return false;
        }

        $path = $request->getUri()->getPath();

        return '/.well-known/mercure' === $path;
    }
80 2
81 2
    /**
     * Authenticates the request and returns a copy carrying everything the subscription
     * pipeline needs.
     *
     * The returned request has its query params replaced by the parsed query string and
     * gains the attributes "token", "subscribedTopics", "lastEventId" and "subscriptions".
     *
     * @throws AccessDeniedHttpException when the authenticator throws a \RuntimeException, or when
     *                                   no token is present and anonymous access is disabled
     * @throws BadRequestHttpException   when the query string holds no "topic" parameter
     */
    private function withAttributes(Request $request): Request
    {
        try {
            $token = $this->authenticator->authenticate($request);
        } catch (\RuntimeException $e) {
            throw new AccessDeniedHttpException($e->getMessage());
        }

        $isAnonymous = null === $token;
        if ($isAnonymous && false === $this->allowAnonymous) {
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
        }

        $qs = query_string($request->getUri(), $this->queryStringParser);
        $subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []);
        if (0 === \count($subscribedTopics)) {
            throw new BadRequestHttpException('Missing "topic" parameter.');
        }

        $clientId = $request->getAttribute('clientId');
        $subscriptions = $this->createSubscriptions($subscribedTopics, $clientId, $token);

        $queryParams = $qs->getParams();
        // The header is read from the incoming request, before any attribute is attached.
        $lastEventId = $this->getLastEventID($request, $queryParams);

        $enriched = $request->withQueryParams($queryParams);
        $enriched = $enriched->withAttribute('token', $token);
        $enriched = $enriched->withAttribute('subscribedTopics', $subscribedTopics);
        $enriched = $enriched->withAttribute('lastEventId', $lastEventId);

        return $enriched->withAttribute('subscriptions', $subscriptions);
    }
116
117
    /**
     * Builds one Subscription per subscribed topic for the given client.
     *
     * Each subscription id has the form
     * "/.well-known/mercure/subscriptions/{url-encoded topic}/{clientId}".
     *
     * @param string[]   $subscribedTopics Topic selectors requested by the client.
     * @param string     $clientId         Identifier of the subscribing client.
     * @param Token|null $token            Token of the client, or null when anonymous.
     *
     * @return Subscription[] Empty when Configuration::SUBSCRIPTIONS is set to false.
     */
    private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array
    {
        if (false === $this->config[Configuration::SUBSCRIPTIONS]) {
            return [];
        }

        // The payload depends on the token only, so it is resolved once instead of once per topic.
        $payload = null;
        if (null !== $token) {
            try {
                $payload = $token->getClaim('mercure')['payload'] ?? null;
            } catch (\Exception $e) {
                // Best effort: a token whose "mercure" claim cannot be read simply yields no payload.
                $payload = null;
            }
        }

        $subscriptions = [];
        foreach ($subscribedTopics as $subscribedTopic) {
            $id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId);
            $subscriptions[] = new Subscription(
                $id,
                $clientId,
                $subscribedTopic,
                true,
                $payload,
            );
        }

        return $subscriptions;
    }
142
143
    /**
     * Hands the subscriptions to the storage and announces each of them on the transport.
     *
     * Every subscription is JSON-encoded into a Message with a fresh UUIDv4 id and
     * published on the topic equal to the subscription's id.
     *
     * NOTE(review): static analysis reported that `$this->storage` and `$this->transport`
     * may be null here ("method does not exist on null"). Presumably both are injected
     * through the parent controller before this method runs; confirm against
     * AbstractController.
     *
     * @param Subscription[] $subscriptions
     *
     * @return PromiseInterface Result of any() over the storage promise and every publish promise.
     *
     * @throws \JsonException when a subscription cannot be JSON-encoded
     */
    private function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        $pending = [$this->storage->storeSubscriptions($subscriptions)];

        foreach ($subscriptions as $subscription) {
            $messageId = (string) Uuid::uuid4();
            $body = \json_encode($subscription, \JSON_THROW_ON_ERROR);
            $announcement = new Message($messageId, $body, true);

            $pending[] = $this->transport->publish($subscription->getId(), $announcement);
        }

        return any($pending);
    }
161
162
    /**
     * Subscribes the client to every requested topic selector it is allowed to use.
     *
     * Selectors refused by TopicMatcher::canSubscribeToTopic() are logged and skipped.
     * For the others, updates received from the transport are forwarded through
     * sendIfAllowed() to the given stream.
     *
     * @return PromiseInterface all() over the transport subscriptions, or a promise resolved
     *                          with true when no selector was accepted.
     */
    private function subscribe(Request $request, Stream $stream): PromiseInterface
    {
        $token = $request->getAttribute('token');
        $clientId = $request->getAttribute('clientId');
        $pending = [];

        foreach ($request->getAttribute('subscribedTopics') as $topicSelector) {
            if (TopicMatcher::canSubscribeToTopic($topicSelector, $token, $this->allowAnonymous)) {
                $forward = function (string $topic, Message $message) use ($request, $stream) {
                    return $this->sendIfAllowed($topic, $message, $request, $stream);
                };
                $confirm = function (string $topic) use ($clientId): void {
                    $this->logger()->debug("Client {$clientId} subscribed to {$topic}");
                };

                $pending[] = $this->transport->subscribe($topicSelector, $forward)->then($confirm);
                continue;
            }

            $this->logger()->debug("Client {$clientId} cannot subscribe to {$topicSelector}");
        }

        return [] === $pending ? resolve(true) : all($pending);
    }
190
191
    /**
     * Looks up the messages the client missed since its last received event.
     *
     * @param string|null $lastEventID      Id of the last event seen by the client, if any.
     * @param string[]    $subscribedTopics Topic selectors passed on to the storage.
     *
     * @return PromiseInterface A promise resolved with an empty array when no id is given,
     *                          otherwise whatever the storage's retrieveMessagesAfterId() returns.
     */
    private function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        return null === $lastEventID
            ? resolve([])
            : $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
    }
199
200
    /**
     * Replays previously stored messages to the client.
     *
     * @param iterable $messages Messages to replay; each key is used as the message's topic.
     *
     * @return PromiseInterface all() over the individual sendIfAllowed() promises, or a promise
     *                          resolved with true when there was nothing to replay.
     */
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
    {
        $deliveries = [];

        foreach ($messages as $topic => $message) {
            $deliveries[] = $this->sendIfAllowed($topic, $message, $request, $stream);
        }

        return [] === $deliveries ? resolve(true) : all($deliveries);
    }
213
214
    /**
     * Sends a message to the client only if TopicMatcher::canReceiveUpdate() allows it.
     *
     * The check uses the "subscribedTopics" and "token" attributes of the request.
     *
     * @return PromiseInterface Resolved with false when the update is refused, otherwise
     *                          resolved from the promise returned by send().
     */
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $allowed = TopicMatcher::canReceiveUpdate(
            $topic,
            $message,
            $request->getAttribute('subscribedTopics'),
            $request->getAttribute('token'),
            $this->allowAnonymous
        );

        if ($allowed) {
            return resolve($this->send($topic, $message, $request, $stream));
        }

        return resolve(false);
    }
224
225
    /**
     * Writes the string form of a message to the client's stream and logs the dispatch.
     *
     * @return PromiseInterface Always resolved with true.
     */
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $serialized = (string) $message;
        $stream->write($serialized);

        // Gathered after the write, for the debug trace only.
        $clientId = $request->getAttribute('clientId');
        $id = $message->getId();
        $this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");

        return resolve(true);
    }
234
235
    /**
     * Resolves the id of the last event received by the client.
     *
     * The "Last-Event-ID" header is looked at first, then the query parameters
     * "Last-Event-ID", "Last-Event-Id" and "last-event-id", in that order. Each candidate
     * goes through nullify(); the first one that is not null is returned.
     *
     * @param array $queryParams Parsed query parameters of the request.
     *
     * @return string|null Null when none of the candidates yields a value.
     */
    private function getLastEventID(Request $request, array $queryParams): ?string
    {
        $fromHeader = nullify($request->getHeaderLine('Last-Event-ID'));
        if (null !== $fromHeader) {
            return $fromHeader;
        }

        foreach (['Last-Event-ID', 'Last-Event-Id', 'last-event-id'] as $name) {
            $candidate = nullify($queryParams[$name] ?? null);
            if (null !== $candidate) {
                return $candidate;
            }
        }

        return null;
    }
242
}
243