Passed
Pull Request — master (#17)
by BENOIT
02:04
created

SubscribeController::fetchMissedMessages()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 2
dl 0
loc 7
ccs 0
cts 4
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Hub\Hub;
10
use BenTools\MercurePHP\Model\Subscription;
11
use BenTools\MercurePHP\Security\Authenticator;
12
use BenTools\MercurePHP\Security\TopicMatcher;
13
use BenTools\MercurePHP\Model\Message;
14
use Lcobucci\JWT\Token;
15
use Psr\Http\Message\RequestInterface;
16
use Psr\Http\Message\ResponseInterface;
17
use Psr\Http\Message\ServerRequestInterface as Request;
18
use Psr\Log\LoggerInterface;
19
use React\Http\Message\Response;
20
use React\Promise\PromiseInterface;
21
use React\Stream\ThroughStream;
22
use React\Stream\WritableStreamInterface as Stream;
23
24
use function BenTools\MercurePHP\nullify;
25
use function BenTools\QueryString\query_string;
26
use function React\Promise\all;
27
use function React\Promise\resolve;
28
29
final class SubscribeController extends AbstractController
30
{
31
    private Authenticator $authenticator;
32
    private QueryStringParser $queryStringParser;
33
    private bool $allowAnonymous;
34 9
    /**
35
     * @var Hub
36 9
     */
37 9
    private Hub $hub;
38 9
39 9
    public function __construct(
40 9
        array $config,
41 9
        Hub $hub,
42
        Authenticator $authenticator,
43 7
        ?LoggerInterface $logger = null
44
    ) {
45 7
        $this->config = $config;
46
        $this->hub = $hub;
47 2
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];
48
        $this->authenticator = $authenticator;
49
        $this->queryStringParser = new QueryStringParser();
50
        $this->logger = $logger;
0 ignored issues
show
Deprecated Code introduced by
The property BenTools\MercurePHP\Help...ggerAwareTrait::$logger has been deprecated: - Please call $this->logger() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

50
        /** @scrutinizer ignore-deprecated */ $this->logger = $logger;

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
51 2
    }
52
53 2
    public function __invoke(Request $request): ResponseInterface
54 2
    {
55 2
        $request = $this->withAttributes($request);
56 2
57 2
        if ('OPTIONS' === $request->getMethod()) {
58
            return new Response(200);
59
        }
60 2
61
        $stream = new ThroughStream();
62
63 2
        $lastEventID = $request->getAttribute('lastEventId');
64
        $subscribedTopics = $request->getAttribute('subscribedTopics');
65
        $this->hub->hook(
66
            fn() => $this->hub->dispatchSubscriptions($request->getAttribute('subscriptions'))
67 2
                ->then(fn() => $this->hub->fetchMissedMessages($lastEventID, $subscribedTopics))
68
                ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
69
                ->then(fn() => $this->subscribe($request, $stream))
70 5
        );
71
72 5
        $headers = [
73 5
            'Content-Type' => 'text/event-stream',
74
            'Cache-Control' => 'no-cache',
75
        ];
76 7
77
        return new Response(200, $headers, $stream);
78
    }
79 7
80 2
    public function matchRequest(RequestInterface $request): bool
81 2
    {
82
        return \in_array($request->getMethod(), ['GET', 'OPTIONS'], true)
83
            && '/.well-known/mercure' === $request->getUri()->getPath();
84 5
    }
85 2
86
    private function withAttributes(Request $request): Request
87
    {
88 3
        try {
89 3
            $token = $this->authenticator->authenticate($request);
90
        } catch (\RuntimeException $e) {
91 3
            throw new AccessDeniedHttpException($e->getMessage());
92 1
        }
93
94
        if (null === $token && false === $this->allowAnonymous) {
95
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
96 2
        }
97 2
98 2
        $qs = query_string($request->getUri(), $this->queryStringParser);
99 2
        $subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []);
100
101
        if ([] === $subscribedTopics) {
102 2
            throw new BadRequestHttpException('Missing "topic" parameter.');
103
        }
104
105
        $subscriptions = $this->createSubscriptions(
106
            $subscribedTopics,
107
            $request->getAttribute('clientId'),
108
            $token
109
        );
110
111
        $request = $request
112
            ->withQueryParams($qs->getParams())
113
            ->withAttribute('token', $token)
114
            ->withAttribute('subscribedTopics', $subscribedTopics)
115
            ->withAttribute('lastEventId', $this->getLastEventID($request, $qs->getParams()))
116
            ->withAttribute('subscriptions', $subscriptions ?? [])
117
        ;
118
119
        return $request;
120
    }
121
122
    private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array
123
    {
124
        if (false === $this->config[Configuration::SUBSCRIPTIONS]) {
125
            return [];
126
        }
127
128
        if (null !== $token) {
129
            $payload = $token->getClaim('mercure')->payload ?? null;
130
        }
131
132
        $subscriptions = [];
133
        foreach ($subscribedTopics as $subscribedTopic) {
134
            $id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId);
135
            $subscriptions[] = new Subscription(
136
                $id,
137
                $clientId,
138
                $subscribedTopic,
139
                $payload ?? null,
140
            );
141
        }
142
143
        return $subscriptions;
144
    }
145
146
    private function subscribe(Request $request, Stream $stream): PromiseInterface
147
    {
148
        $subscribedTopics = $request->getAttribute('subscribedTopics');
149
        $token = $request->getAttribute('token');
150
        $subscriber = $request->getAttribute('clientId');
151
        $promises = [];
152
        foreach ($subscribedTopics as $topicSelector) {
153
            $promises[] = $this->hub->subscribe(
154
                $subscriber,
155
                $topicSelector,
156
                $token,
157
                fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream)
158
            );
159
        }
160
161
        if ([] === $promises) {
162
            return resolve(true);
163
        }
164
165
        return all($promises);
166
    }
167
168
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
169
    {
170
        $promises = [];
171
        foreach ($messages as $topic => $message) {
172
            $promises[] = $this->sendIfAllowed($topic, $message, $request, $stream);
173
        }
174
175
        if ([] === $promises) {
176
            return resolve(true);
177
        }
178 2
179
        return all($promises);
180 2
    }
181 2
182 2
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
183 2
    {
184
        $subscribedTopics = $request->getAttribute('subscribedTopics');
185
        $token = $request->getAttribute('token');
186
        if (!TopicMatcher::canReceiveUpdate($topic, $message, $subscribedTopics, $token, $this->allowAnonymous)) {
187
            return resolve(false);
188
        }
189
190
        return resolve($this->send($topic, $message, $request, $stream));
191
    }
192
193
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
194
    {
195
        $stream->write((string) $message);
196
        $clientId = $request->getAttribute('clientId');
197
        $id = $message->getId();
198
        $this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");
199
200
        return resolve(true);
201
    }
202
203
    private function getLastEventID(Request $request, array $queryParams): ?string
204
    {
205
        return nullify($request->getHeaderLine('Last-Event-ID'))
206
            ?? nullify($queryParams['Last-Event-ID'] ?? null)
207
            ?? nullify($queryParams['Last-Event-Id'] ?? null)
208
            ?? nullify($queryParams['last-event-id'] ?? null);
209
    }
210
}
211