Passed
Pull Request — master (#17)
by BENOIT
03:48
created

SubscribeController::fetchMissedMessages()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 2
dl 0
loc 7
ccs 0
cts 4
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Hub\Hub;
10
use BenTools\MercurePHP\Model\Subscription;
11
use BenTools\MercurePHP\Security\Authenticator;
12
use BenTools\MercurePHP\Security\TopicMatcher;
13
use BenTools\MercurePHP\Model\Message;
14
use Lcobucci\JWT\Token;
15
use Psr\Http\Message\RequestInterface;
16
use Psr\Http\Message\ResponseInterface;
17
use Psr\Http\Message\ServerRequestInterface as Request;
18
use Psr\Log\LoggerInterface;
19
use React\Http\Message\Response;
20
use React\Promise\PromiseInterface;
21
use React\Stream\ThroughStream;
22
use React\Stream\WritableStreamInterface as Stream;
23
24
use function BenTools\MercurePHP\nullify;
25
use function BenTools\QueryString\query_string;
26
use function React\Promise\all;
27
use function React\Promise\resolve;
28
29
final class SubscribeController extends AbstractController
30
{
31
    private Authenticator $authenticator;
32
    private QueryStringParser $queryStringParser;
33
    private bool $allowAnonymous;
34 9
    /**
35
     * @var Hub
36 9
     */
37 9
    private Hub $hub;
38 9
39 9
    public function __construct(
40 9
        array $config,
41 9
        Hub $hub,
42
        Authenticator $authenticator,
43 7
        ?LoggerInterface $logger = null
44
    ) {
45 7
        $this->config = $config;
46
        $this->hub = $hub;
47 2
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];
48
        $this->authenticator = $authenticator;
49
        $this->queryStringParser = new QueryStringParser();
50
        $this->logger = $logger;
0 ignored issues
show
Deprecated Code introduced by
The property BenTools\MercurePHP\Help...ggerAwareTrait::$logger has been deprecated: - Please call $this->logger() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

50
        /** @scrutinizer ignore-deprecated */ $this->logger = $logger;

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
51 2
    }
52
53 2
    public function __invoke(Request $request): ResponseInterface
54 2
    {
55 2
        $request = $this->withAttributes($request);
56 2
57 2
        if ('OPTIONS' === $request->getMethod()) {
58
            return new Response(200);
59
        }
60 2
61
        $stream = new ThroughStream();
62
63 2
        $lastEventID = $request->getAttribute('lastEventId');
64
        $subscribedTopics = $request->getAttribute('subscribedTopics');
65
        $this->hub->hook(
66
            fn() => $this->hub->dispatchSubscriptions($request->getAttribute('subscriptions'))
67 2
                ->then(fn() => $this->hub->fetchMissedMessages($lastEventID, $subscribedTopics))
68
                ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
69
                ->then(fn() => $this->subscribe($request, $stream))
70 5
        );
71
72 5
        $headers = [
73 5
            'Content-Type' => 'text/event-stream',
74
            'Cache-Control' => 'no-cache',
75
        ];
76 7
77
        return new Response(200, $headers, $stream);
78
    }
79 7
80 2
    public function matchRequest(RequestInterface $request): bool
81 2
    {
82
        return \in_array($request->getMethod(), ['GET', 'OPTIONS'], true)
83
            && '/.well-known/mercure' === $request->getUri()->getPath();
84 5
    }
85 2
86
    private function withAttributes(Request $request): Request
87
    {
88 3
        try {
89 3
            $token = $this->authenticator->authenticate($request);
90
        } catch (\RuntimeException $e) {
91 3
            throw new AccessDeniedHttpException($e->getMessage());
92 1
        }
93
94
        if (null === $token && false === $this->allowAnonymous) {
95
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
96 2
        }
97 2
98 2
        $qs = query_string($request->getUri(), $this->queryStringParser);
99 2
        $subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []);
100
101
        if ([] === $subscribedTopics) {
102 2
            throw new BadRequestHttpException('Missing "topic" parameter.');
103
        }
104
105
        $subscriptions = $this->createSubscriptions(
106
            $subscribedTopics,
107
            $request->getAttribute('clientId'),
108
            $token
109
        );
110
111
        $request = $request
112
            ->withQueryParams($qs->getParams())
113
            ->withAttribute('token', $token)
114
            ->withAttribute('subscribedTopics', $subscribedTopics)
115
            ->withAttribute('lastEventId', $this->getLastEventID($request, $qs->getParams()))
116
            ->withAttribute('subscriptions', $subscriptions ?? [])
117
        ;
118
119
        return $request;
120
    }
121
122
    private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array
123
    {
124
        if (false === $this->config[Configuration::SUBSCRIPTIONS]) {
125
            return [];
126
        }
127
128
        if (null !== $token) {
129
            $payload = $token->getClaim('mercure')->payload ?? null;
130
        }
131
132
        $subscriptions = [];
133
        foreach ($subscribedTopics as $subscribedTopic) {
134
            $id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId);
135
            $subscriptions[] = new Subscription(
136
                $id,
137
                $clientId,
138
                $subscribedTopic,
139
                true,
140
                $payload ?? null,
141
            );
142
        }
143
144
        return $subscriptions;
145
    }
146
147
    private function subscribe(Request $request, Stream $stream): PromiseInterface
148
    {
149
        $subscribedTopics = $request->getAttribute('subscribedTopics');
150
        $token = $request->getAttribute('token');
151
        $subscriber = $request->getAttribute('clientId');
152
        $promises = [];
153
        foreach ($subscribedTopics as $topicSelector) {
154
            $promises[] = $this->hub->subscribe(
155
                $subscriber,
156
                $topicSelector,
157
                $token,
158
                fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream)
159
            );
160
        }
161
162
        if ([] === $promises) {
163
            return resolve(true);
164
        }
165
166
        return all($promises);
167
    }
168
169
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
170
    {
171
        $promises = [];
172
        foreach ($messages as $topic => $message) {
173
            $promises[] = $this->sendIfAllowed($topic, $message, $request, $stream);
174
        }
175
176
        if ([] === $promises) {
177
            return resolve(true);
178 2
        }
179
180 2
        return all($promises);
181 2
    }
182 2
183 2
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
184
    {
185
        $subscribedTopics = $request->getAttribute('subscribedTopics');
186
        $token = $request->getAttribute('token');
187
        if (!TopicMatcher::canReceiveUpdate($topic, $message, $subscribedTopics, $token, $this->allowAnonymous)) {
188
            return resolve(false);
189
        }
190
191
        return resolve($this->send($topic, $message, $request, $stream));
192
    }
193
194
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
195
    {
196
        $stream->write((string) $message);
197
        $clientId = $request->getAttribute('clientId');
198
        $id = $message->getId();
199
        $this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");
200
201
        return resolve(true);
202
    }
203
204
    private function getLastEventID(Request $request, array $queryParams): ?string
205
    {
206
        return nullify($request->getHeaderLine('Last-Event-ID'))
207
            ?? nullify($queryParams['Last-Event-ID'] ?? null)
208
            ?? nullify($queryParams['Last-Event-Id'] ?? null)
209
            ?? nullify($queryParams['last-event-id'] ?? null);
210
    }
211
}
212