Passed
Pull Request — master (#17)
by BENOIT
02:08
created

SubscribeController::createSubscriptions()   A

Complexity

Conditions 5
Paths 6

Size

Total Lines 24
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 16
nc 6
nop 3
dl 0
loc 24
ccs 0
cts 11
cp 0
crap 30
rs 9.4222
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Model\Subscription;
10
use BenTools\MercurePHP\Security\Authenticator;
11
use BenTools\MercurePHP\Security\TopicMatcher;
12
use BenTools\MercurePHP\Model\Message;
13
use BenTools\MercurePHP\Storage\StorageInterface;
14
use BenTools\MercurePHP\Transport\TransportInterface;
15
use Lcobucci\JWT\Token;
16
use Psr\Http\Message\RequestInterface;
17
use Psr\Http\Message\ResponseInterface;
18
use Psr\Http\Message\ServerRequestInterface as Request;
19
use Psr\Log\LoggerInterface;
20
use Ramsey\Uuid\Uuid;
21
use React\EventLoop\Factory;
22
use React\EventLoop\LoopInterface;
23
use React\Http\Message\Response;
24
use React\Promise\PromiseInterface;
25
use React\Stream\ThroughStream;
26
use React\Stream\WritableStreamInterface as Stream;
27
28
use function BenTools\MercurePHP\nullify;
29
use function BenTools\QueryString\query_string;
30
use function React\Promise\all;
31
use function React\Promise\any;
32
use function React\Promise\resolve;
33
34 9
final class SubscribeController extends AbstractController
35
{
36 9
    private Authenticator $authenticator;
37 9
    private LoopInterface $loop;
38 9
    private QueryStringParser $queryStringParser;
39 9
    private bool $allowAnonymous;
40 9
41 9
    /**
     * Wires the controller with its collaborators.
     *
     * The anonymous-access flag is read from the ALLOW_ANONYMOUS entry of the
     * configuration array. When no event loop is given, one is created through
     * Factory::create(). A fresh QueryStringParser is always instantiated.
     *
     * @param array               $config        Hub configuration, indexed by Configuration::* keys.
     * @param StorageInterface    $storage       Storage used for subscriptions and missed messages.
     * @param TransportInterface  $transport     Transport used to publish and subscribe.
     * @param Authenticator       $authenticator Resolves the (optional) token of incoming requests.
     * @param LoopInterface|null  $loop          Event loop; created on demand when null.
     * @param LoggerInterface|null $logger       Optional logger.
     */
    public function __construct(
        array $config,
        StorageInterface $storage,
        TransportInterface $transport,
        Authenticator $authenticator,
        ?LoopInterface $loop = null,
        ?LoggerInterface $logger = null
    ) {
        // Configuration-derived state.
        $this->config = $config;
        $this->storage = $storage;
        $this->transport = $transport;
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];

        // Request-handling helpers.
        $this->authenticator = $authenticator;
        $this->queryStringParser = new QueryStringParser();

        if (null === $loop) {
            $loop = Factory::create();
        }
        $this->loop = $loop;

        // Static analysis reports the $logger property as deprecated in favour of
        // $this->logger(); the direct write is kept here to inject the logger.
        $this->logger = $logger;
    }
58
59
    /**
     * Handles a request on the hub endpoint and opens the event stream.
     *
     * OPTIONS requests are answered with an empty 200 response before any
     * authentication or query validation takes place: CORS preflight requests
     * are sent without credentials, so validating them first would reject the
     * preflight on hubs that disallow anonymous subscribers.
     *
     * For other requests, a ThroughStream is returned immediately as the body of
     * a `text/event-stream` response. On the next loop tick the controller
     * dispatches the subscriptions, fetches and sends missed messages, then
     * subscribes the client to its topics.
     *
     * @param Request $request Incoming server request.
     *
     * @return ResponseInterface Empty 200 response for OPTIONS, streaming response otherwise.
     *
     * @throws AccessDeniedHttpException When authentication fails or anonymous access is refused.
     * @throws BadRequestHttpException   When no "topic" parameter is provided.
     */
    public function __invoke(Request $request): ResponseInterface
    {
        if ('OPTIONS' === $request->getMethod()) {
            return new Response(200);
        }

        $request = $this->withAttributes($request);

        $stream = new ThroughStream();

        $lastEventID = $request->getAttribute('lastEventId');
        $subscribedTopics = $request->getAttribute('subscribedTopics');
        $this->loop
            ->futureTick(
                fn() => $this->dispatchSubscriptions($request->getAttribute('subscriptions'))
                    ->then(fn() => $this->fetchMissedMessages($lastEventID, $subscribedTopics))
                    ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
                    ->then(fn() => $this->subscribe($request, $stream))
                    ->then(
                        null,
                        // Without a rejection handler a failing step would leave the
                        // client connected to a stream that never receives anything.
                        // The reason is left untyped: it is not guaranteed to be a Throwable.
                        function ($reason) use ($request, $stream): void {
                            $detail = $reason instanceof \Throwable ? $reason->getMessage() : 'unknown reason';
                            $clientId = $request->getAttribute('clientId');
                            $this->logger()->error("Subscription setup failed for client {$clientId}: {$detail}");
                            $stream->end();
                        }
                    )
            );

        $headers = [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
        ];

        return new Response(200, $headers, $stream);
    }
86
87
    /**
     * Tells whether this controller is responsible for the given request.
     *
     * A request matches when its method is GET or OPTIONS and its path is
     * exactly `/.well-known/mercure`.
     */
    public function matchRequest(RequestInterface $request): bool
    {
        $method = $request->getMethod();
        if ('GET' !== $method && 'OPTIONS' !== $method) {
            return false;
        }

        return '/.well-known/mercure' === $request->getUri()->getPath();
    }
92 1
93
    /**
     * Authenticates the request and enriches it with subscription attributes.
     *
     * The returned request carries the parsed query parameters plus the
     * `token`, `subscribedTopics`, `lastEventId` and `subscriptions` attributes.
     *
     * @throws AccessDeniedHttpException When the authenticator raises a \RuntimeException,
     *                                   or when no token is present and anonymous access is disabled.
     * @throws BadRequestHttpException   When the query string holds no "topic" parameter.
     */
    private function withAttributes(Request $request): Request
    {
        try {
            $token = $this->authenticator->authenticate($request);
        } catch (\RuntimeException $e) {
            throw new AccessDeniedHttpException($e->getMessage());
        }

        $isAnonymous = null === $token;
        if ($isAnonymous && false === $this->allowAnonymous) {
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
        }

        $qs = query_string($request->getUri(), $this->queryStringParser);
        $rawTopics = $qs->getParam('topic') ?? [];
        $subscribedTopics = \array_map('\\urldecode', $rawTopics);

        if ([] === $subscribedTopics) {
            throw new BadRequestHttpException('Missing "topic" parameter.');
        }

        $subscriptions = $this->createSubscriptions(
            $subscribedTopics,
            $request->getAttribute('clientId'),
            $token
        );

        // Resolved against the incoming request, before attributes are added.
        $lastEventId = $this->getLastEventID($request, $qs->getParams());

        $enriched = $request->withQueryParams($qs->getParams());
        $enriched = $enriched->withAttribute('token', $token);
        $enriched = $enriched->withAttribute('subscribedTopics', $subscribedTopics);
        $enriched = $enriched->withAttribute('lastEventId', $lastEventId);

        return $enriched->withAttribute('subscriptions', $subscriptions);
    }
128
129
    /**
     * Builds one Subscription per subscribed topic for the given client.
     *
     * Returns an empty list when the SUBSCRIPTIONS configuration entry is
     * `false` or when there is no topic. Each subscription id has the form
     * `/.well-known/mercure/subscriptions/{url-encoded topic}/{clientId}`.
     *
     * The payload is taken from the `payload` key of the token's `mercure`
     * claim. It depends only on the token, so it is resolved once up front
     * instead of once per topic.
     *
     * @param string[]   $subscribedTopics Topic selectors requested by the client.
     * @param string     $clientId         Identifier of the subscribing client.
     * @param Token|null $token            Token of the subscriber, if any.
     *
     * @return Subscription[]
     */
    private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array
    {
        if (false === $this->config[Configuration::SUBSCRIPTIONS] || [] === $subscribedTopics) {
            return [];
        }

        // NOTE(review): only \Exception is caught, as before; an \Error raised while
        // reading the claim would still propagate — confirm the claim's shape.
        try {
            $payload = null !== $token ? ($token->getClaim('mercure')['payload'] ?? null) : null;
        } catch (\Exception $e) {
            $payload = null;
        }

        $subscriptions = [];
        foreach ($subscribedTopics as $subscribedTopic) {
            $id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId);
            $subscriptions[] = new Subscription(
                $id,
                $clientId,
                $subscribedTopic,
                true,
                $payload,
            );
        }

        return $subscriptions;
    }
154
155
    /**
     * Stores the subscriptions and announces each of them on the transport.
     *
     * Every subscription is JSON-encoded into a Message with a freshly generated
     * UUID (v4) as id, then published on the topic named after the subscription
     * id. The storage promise and all publication promises are combined with
     * any().
     *
     * @param Subscription[] $subscriptions
     *
     * @throws \JsonException When a subscription cannot be JSON-encoded.
     */
    private function dispatchSubscriptions(array $subscriptions): PromiseInterface
    {
        $pending = [];
        $pending[] = $this->storage->storeSubscriptions($subscriptions);

        foreach ($subscriptions as $subscription) {
            $announcement = new Message(
                (string) Uuid::uuid4(),
                \json_encode($subscription, \JSON_THROW_ON_ERROR),
                true,
            );
            $pending[] = $this->transport->publish($subscription->getId(), $announcement);
        }

        return any($pending);
    }
173
174
    /**
     * Subscribes the client to every topic selector it is allowed to follow.
     *
     * Selectors refused by TopicMatcher::canSubscribeToTopic() are skipped with
     * a debug log entry. For the others, incoming messages are forwarded to
     * sendIfAllowed(). The returned promise combines all transport
     * subscriptions with all(), or resolves to `true` when there is none.
     */
    private function subscribe(Request $request, Stream $stream): PromiseInterface
    {
        $token = $request->getAttribute('token');
        $clientId = $request->getAttribute('clientId');
        $pending = [];

        foreach ($request->getAttribute('subscribedTopics') as $topicSelector) {
            $allowed = TopicMatcher::canSubscribeToTopic($topicSelector, $token, $this->allowAnonymous);

            if ($allowed) {
                $forward = function (string $topic, Message $message) use ($request, $stream) {
                    return $this->sendIfAllowed($topic, $message, $request, $stream);
                };
                $pending[] = $this->transport
                    ->subscribe($topicSelector, $forward)
                    ->then(function (string $topic) use ($clientId) {
                        $this->logger()->debug("Client {$clientId} subscribed to {$topic}");
                    });

                continue;
            }

            $this->logger()->debug("Client {$clientId} cannot subscribe to {$topicSelector}");
        }

        return [] === $pending ? resolve(true) : all($pending);
    }
202
203
    /**
     * Retrieves the messages published after the given event id.
     *
     * Without a last event id there is nothing to replay: the promise resolves
     * to an empty array and the storage is not queried.
     */
    private function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
    {
        return null === $lastEventID
            ? resolve([])
            : $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
    }
211
212
    /**
     * Replays missed messages to the client, subject to authorization.
     *
     * Each entry of $messages is handed to sendIfAllowed(), the key being used
     * as the topic. Resolves to `true` when there is nothing to send, otherwise
     * to the all() combination of the individual sends.
     */
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
    {
        $deliveries = [];
        foreach ($messages as $topic => $message) {
            $deliveries[] = $this->sendIfAllowed($topic, $message, $request, $stream);
        }

        return [] === $deliveries ? resolve(true) : all($deliveries);
    }
225
226
    /**
     * Sends a message to the client only when the client may receive it.
     *
     * Authorization is delegated to TopicMatcher::canReceiveUpdate(), using the
     * request's `subscribedTopics` and `token` attributes. Resolves to `false`
     * when the update is withheld, otherwise to the outcome of send().
     */
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $allowed = TopicMatcher::canReceiveUpdate(
            $topic,
            $message,
            $request->getAttribute('subscribedTopics'),
            $request->getAttribute('token'),
            $this->allowAnonymous
        );

        return $allowed
            ? resolve($this->send($topic, $message, $request, $stream))
            : resolve(false);
    }
236
237
    /**
     * Writes the string form of the message to the client's stream.
     *
     * A debug entry is logged after the write; the returned promise always
     * resolves to `true`.
     */
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $serialized = (string) $message;
        $stream->write($serialized);

        $clientId = $request->getAttribute('clientId');
        $id = $message->getId();
        $this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");

        return resolve(true);
    }
246
247
    /**
     * Resolves the last event id announced by the client.
     *
     * The `Last-Event-ID` header is consulted first, then the query parameters
     * `Last-Event-ID`, `Last-Event-Id` and `last-event-id`, in that order. Each
     * candidate is passed through nullify(); the first non-null result wins.
     *
     * @return string|null The last event id, or null when none was provided.
     */
    private function getLastEventID(Request $request, array $queryParams): ?string
    {
        $fromHeader = nullify($request->getHeaderLine('Last-Event-ID'));
        if (null !== $fromHeader) {
            return $fromHeader;
        }

        foreach (['Last-Event-ID', 'Last-Event-Id', 'last-event-id'] as $name) {
            $candidate = nullify($queryParams[$name] ?? null);
            if (null !== $candidate) {
                return $candidate;
            }
        }

        return null;
    }
254
}
255