Passed
Pull Request — master (#17)
by BENOIT
08:21
created

SubscribeController::createSubscriptions()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 23
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 4
eloc 14
nc 5
nop 3
dl 0
loc 23
ccs 0
cts 11
cp 0
crap 20
rs 9.7998
c 1
b 1
f 0
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Hub\Hub;
10
use BenTools\MercurePHP\Model\Subscription;
11
use BenTools\MercurePHP\Security\Authenticator;
12
use BenTools\MercurePHP\Security\TopicMatcher;
13
use BenTools\MercurePHP\Model\Message;
14
use Lcobucci\JWT\Token;
15
use Psr\Http\Message\RequestInterface;
16
use Psr\Http\Message\ResponseInterface;
17
use Psr\Http\Message\ServerRequestInterface as Request;
18
use Psr\Log\LoggerInterface;
19
use React\EventLoop\Factory;
20
use React\EventLoop\LoopInterface;
21
use React\Http\Message\Response;
22
use React\Promise\PromiseInterface;
23
use React\Stream\ThroughStream;
24
use React\Stream\WritableStreamInterface as Stream;
25
26
use function BenTools\MercurePHP\nullify;
27
use function BenTools\QueryString\query_string;
28
use function React\Promise\all;
29
use function React\Promise\resolve;
30
31
final class SubscribeController extends AbstractController
32
{
33
    /** Hub the controller dispatches subscriptions to, fetches missed messages from and subscribes clients on. */
    private Hub $hub;

    /** Resolves the token (or null) attached to an incoming request. */
    private Authenticator $authenticator;

    /** Event loop on which the per-request subscription work is scheduled. */
    private LoopInterface $loop;

    /** Parser handed to query_string() when reading the request URI. */
    private QueryStringParser $queryStringParser;

    /** Value of Configuration::ALLOW_ANONYMOUS; when false, requests without a token are rejected. */
    private bool $allowAnonymous;
41 9
42
    /**
     * Wires the controller with its collaborators.
     *
     * @param array                $config        Hub configuration; Configuration::ALLOW_ANONYMOUS is read here.
     * @param Hub                  $hub           Hub used for subscriptions and missed messages.
     * @param Authenticator        $authenticator Resolves the token of incoming requests.
     * @param LoopInterface|null   $loop          Event loop; Factory::create() is used when omitted.
     * @param LoggerInterface|null $logger        Optional logger.
     */
    public function __construct(
        array $config,
        Hub $hub,
        Authenticator $authenticator,
        ?LoopInterface $loop = null,
        ?LoggerInterface $logger = null
    ) {
        // Configuration-derived state.
        $this->config = $config;
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];

        // Injected or locally created collaborators.
        $this->hub = $hub;
        $this->authenticator = $authenticator;
        $this->queryStringParser = new QueryStringParser();
        $this->loop = $loop ?? Factory::create();

        // The logger trait flags direct use of $logger as deprecated in favour of $this->logger().
        // This write is the initial assignment, so the analyser warning is silenced explicitly.
        /** @scrutinizer ignore-deprecated */ $this->logger = $logger;
    }
57 2
58
    /**
     * Handles a request on the subscription endpoint.
     *
     * OPTIONS requests are answered right away with an empty 200 response. Any other
     * request is authenticated and validated by withAttributes(), then answered with a
     * `text/event-stream` response whose body stream is fed asynchronously: subscriptions
     * are dispatched, missed messages are replayed, then live updates are forwarded.
     *
     * @throws AccessDeniedHttpException When authentication fails or anonymous access is refused.
     * @throws BadRequestHttpException   When no "topic" parameter is provided.
     */
    public function __invoke(Request $request): ResponseInterface
    {
        // CORS preflight requests carry neither credentials nor a meaningful query string.
        // They must be answered before authentication / validation, otherwise they fail.
        if ('OPTIONS' === $request->getMethod()) {
            return new Response(200);
        }

        $request = $this->withAttributes($request);
        $stream = new ThroughStream();

        $lastEventID = $request->getAttribute('lastEventId');
        $subscribedTopics = $request->getAttribute('subscribedTopics');

        // Scheduled on the loop so the response (holding $stream) is returned first.
        $this->loop->futureTick(
            fn() => $this->hub->dispatchSubscriptions($request->getAttribute('subscriptions'))
                ->then(fn() => $this->hub->fetchMissedMessages($lastEventID, $subscribedTopics))
                ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
                ->then(fn() => $this->subscribe($request, $stream))
                ->then(
                    null,
                    // Without a rejection handler a hub failure is silently dropped and the
                    // client keeps an open stream that will never receive anything.
                    // The reason is left untyped: rejection values are not guaranteed to be Throwable.
                    function ($reason) use ($stream): void {
                        $details = $reason instanceof \Throwable ? $reason->getMessage() : 'unknown reason';
                        $this->logger()->error("Unable to set up subscription: {$details}");
                        $stream->end();
                    }
                )
        );

        $headers = [
            'Content-Type' => 'text/event-stream',
            'Cache-Control' => 'no-cache',
        ];

        return new Response(200, $headers, $stream);
    }
85 2
86
    /**
     * Tells whether this controller should handle the given request.
     *
     * @return bool True for a GET or OPTIONS request whose path is exactly `/.well-known/mercure`.
     */
    public function matchRequest(RequestInterface $request): bool
    {
        $methodIsSupported = \in_array($request->getMethod(), ['GET', 'OPTIONS'], true);
        if (!$methodIsSupported) {
            return false;
        }

        return '/.well-known/mercure' === $request->getUri()->getPath();
    }
91 3
92 1
    /**
     * Authenticates the request and attaches the attributes the rest of the controller relies on.
     *
     * The returned request carries the parsed query parameters plus the attributes
     * `token`, `subscribedTopics`, `lastEventId` and `subscriptions`.
     *
     * @throws AccessDeniedHttpException When the authenticator fails, or when the request has no
     *                                   token and anonymous subscriptions are disabled.
     * @throws BadRequestHttpException   When no "topic" query parameter is present.
     */
    private function withAttributes(Request $request): Request
    {
        try {
            $token = $this->authenticator->authenticate($request);
        } catch (\RuntimeException $e) {
            throw new AccessDeniedHttpException($e->getMessage());
        }

        if (null === $token && false === $this->allowAnonymous) {
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
        }

        $qs = query_string($request->getUri(), $this->queryStringParser);
        $queryParams = $qs->getParams();

        // Normalise to an array first: should the parser hand back a lone `topic` as a
        // scalar, array_map() would otherwise raise a TypeError instead of a clean 400.
        $subscribedTopics = \array_map('\\urldecode', (array) ($qs->getParam('topic') ?? []));

        if ([] === $subscribedTopics) {
            throw new BadRequestHttpException('Missing "topic" parameter.');
        }

        // createSubscriptions() always returns an array, so no null fallback is needed below.
        $subscriptions = $this->createSubscriptions(
            $subscribedTopics,
            $request->getAttribute('clientId'),
            $token
        );

        return $request
            ->withQueryParams($queryParams)
            ->withAttribute('token', $token)
            ->withAttribute('subscribedTopics', $subscribedTopics)
            ->withAttribute('lastEventId', $this->getLastEventID($request, $queryParams))
            ->withAttribute('subscriptions', $subscriptions)
        ;
    }
127
128
    /**
     * Builds one Subscription per subscribed topic for the given client.
     *
     * @param string[]   $subscribedTopics Topic selectors requested by the client.
     * @param string     $clientId         Identifier of the subscribing client.
     * @param Token|null $token            Token of the request, or null for an anonymous subscriber.
     *
     * @return Subscription[] Empty when Configuration::SUBSCRIPTIONS is set to false.
     */
    private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array
    {
        if (false === $this->config[Configuration::SUBSCRIPTIONS]) {
            return [];
        }

        // Token::getClaim() throws when the claim is absent, and `??` cannot intercept an
        // exception: check for the claim first so a token without "mercure" is still accepted.
        $payload = null;
        if (null !== $token && $token->hasClaim('mercure')) {
            $payload = $token->getClaim('mercure')->payload ?? null;
        }

        $subscriptions = [];
        foreach ($subscribedTopics as $subscribedTopic) {
            $id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId);
            $subscriptions[] = new Subscription(
                $id,
                $clientId,
                $subscribedTopic,
                true, // NOTE(review): presumably the "active" flag; confirm against Subscription.
                $payload,
            );
        }

        return $subscriptions;
    }
152
153
    /**
     * Registers the client on the hub for each of its subscribed topics.
     *
     * Every update delivered by the hub goes through sendIfAllowed() before reaching the stream.
     *
     * @return PromiseInterface Resolves with true when there is nothing to subscribe to,
     *                          otherwise the combined promise of all hub subscriptions.
     */
    private function subscribe(Request $request, Stream $stream): PromiseInterface
    {
        $clientId = $request->getAttribute('clientId');
        $jwt = $request->getAttribute('token');
        $selectors = $request->getAttribute('subscribedTopics');

        // array_values() keeps the promise list sequentially indexed, whatever the selector keys are.
        $pending = \array_values(\array_map(
            fn($selector) => $this->hub->subscribe(
                $clientId,
                $selector,
                $jwt,
                fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream)
            ),
            $selectors
        ));

        return [] === $pending ? resolve(true) : all($pending);
    }
174
175
    /**
     * Replays previously missed messages to the client.
     *
     * @param iterable $messages Messages keyed by topic.
     *
     * @return PromiseInterface Resolves with true when there was nothing to replay,
     *                          otherwise the combined promise of every delivery attempt.
     */
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
    {
        $deliveries = [];

        foreach ($messages as $missedTopic => $missedMessage) {
            $deliveries[] = $this->sendIfAllowed($missedTopic, $missedMessage, $request, $stream);
        }

        return [] !== $deliveries ? all($deliveries) : resolve(true);
    }
188
189
    /**
     * Sends the message to the client only when TopicMatcher allows it.
     *
     * @return PromiseInterface Resolves with false when the update is withheld,
     *                          otherwise with the outcome of send().
     */
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $isAllowed = TopicMatcher::canReceiveUpdate(
            $topic,
            $message,
            $request->getAttribute('subscribedTopics'),
            $request->getAttribute('token'),
            $this->allowAnonymous
        );

        return $isAllowed
            ? resolve($this->send($topic, $message, $request, $stream))
            : resolve(false);
    }
199
200
    /**
     * Writes the message, cast to string, to the client's stream and logs the dispatch.
     *
     * @return PromiseInterface Always resolves with true.
     */
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $payload = (string) $message;
        $stream->write($payload);

        // Collected only for the debug trace below.
        $clientId = $request->getAttribute('clientId');
        $id = $message->getId();
        $trace = "Dispatched message {$id} to client {$clientId} on topic {$topic}";
        $this->logger()->debug($trace);

        return resolve(true);
    }
209
210
    /**
     * Looks up the last event id announced by the client.
     *
     * The `Last-Event-ID` header is checked first, then the query parameters
     * `Last-Event-ID`, `Last-Event-Id` and `last-event-id`, in that order.
     *
     * @param array $queryParams Parsed query string parameters.
     *
     * @return string|null The first candidate that nullify() does not turn into null, if any.
     */
    private function getLastEventID(Request $request, array $queryParams): ?string
    {
        $candidates = [
            $request->getHeaderLine('Last-Event-ID'),
            $queryParams['Last-Event-ID'] ?? null,
            $queryParams['Last-Event-Id'] ?? null,
            $queryParams['last-event-id'] ?? null,
        ];

        foreach ($candidates as $candidate) {
            $lastEventId = nullify($candidate);
            if (null !== $lastEventId) {
                return $lastEventId;
            }
        }

        return null;
    }
217
}
218