Passed
Pull Request — master (#17)
by BENOIT
02:04
created

SubscribeController::createSubscriptions()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 4
eloc 13
nc 5
nop 3
dl 0
loc 22
rs 9.8333
c 1
b 1
f 0
ccs 0
cts 10
cp 0
crap 20
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Hub\Hub;
10
use BenTools\MercurePHP\Model\Subscription;
11
use BenTools\MercurePHP\Security\Authenticator;
12
use BenTools\MercurePHP\Security\TopicMatcher;
13
use BenTools\MercurePHP\Model\Message;
14
use Lcobucci\JWT\Token;
15
use Psr\Http\Message\RequestInterface;
16
use Psr\Http\Message\ResponseInterface;
17
use Psr\Http\Message\ServerRequestInterface as Request;
18
use Psr\Log\LoggerInterface;
19
use React\Http\Message\Response;
20
use React\Promise\PromiseInterface;
21
use React\Stream\ThroughStream;
22
use React\Stream\WritableStreamInterface as Stream;
23
24
use function BenTools\MercurePHP\nullify;
25
use function BenTools\QueryString\query_string;
26
use function React\Promise\all;
27
use function React\Promise\resolve;
28
29
final class SubscribeController extends AbstractController
30
{
31
    /** Authenticates incoming requests; withAttributes() calls it to obtain the request's token. */
    private Authenticator $authenticator;
32
    /** Parser handed to query_string() when the request URI is read in withAttributes(). */
    private QueryStringParser $queryStringParser;
33
    /** Copied from the ALLOW_ANONYMOUS configuration entry; when false, requests without a token are rejected. */
    private bool $allowAnonymous;
34 9
    // Hub used to register the hook, dispatch subscriptions, fetch missed messages and subscribe to topics.
    /**
35
     * @var Hub
36 9
     */
37 9
    private Hub $hub;
38 9
39 9
    /**
     * Wires the controller with its configuration and collaborators.
     *
     * The ALLOW_ANONYMOUS entry of $config is copied into a dedicated property, and a fresh
     * QueryStringParser is created for later query-string parsing.
     *
     * @param array                $config        Hub configuration; must contain Configuration::ALLOW_ANONYMOUS.
     * @param Hub                  $hub           Hub the controller registers hooks and subscriptions on.
     * @param Authenticator        $authenticator Used to authenticate incoming requests.
     * @param LoggerInterface|null $logger        Optional logger.
     *
     * NOTE(review): $this->config and $this->logger are not declared in this class; presumably they
     * come from AbstractController / a logger-aware trait - confirm.
     */
    public function __construct(
40 9
        array $config,
41 9
        Hub $hub,
42
        Authenticator $authenticator,
43 7
        ?LoggerInterface $logger = null
44
    ) {
45 7
        $this->config = $config;
46
        $this->hub = $hub;
47 2
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];
48
        $this->authenticator = $authenticator;
49
        $this->queryStringParser = new QueryStringParser();
50
        // Direct write to the $logger property: the static analyser reports this property as deprecated.
        $this->logger = $logger;
0 ignored issues
show
Deprecated Code introduced by
The property BenTools\MercurePHP\Help...ggerAwareTrait::$logger has been deprecated: - Please call $this->logger() instead. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

50
        /** @scrutinizer ignore-deprecated */ $this->logger = $logger;

This property has been deprecated. The supplier of the class has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.

Loading history...
51 2
    }
52
53 2
    /**
     * Handles a subscription request and returns the event-stream response.
     *
     * OPTIONS requests are answered immediately with an empty 200 response. Any other request is
     * enriched by withAttributes(), then answered with a `text/event-stream` response whose body is
     * a ThroughStream. The actual work is registered on the hub as a hook.
     *
     * @param Request $request Incoming request.
     *
     * @return ResponseInterface The empty 200 response, or the streaming response.
     */
    public function __invoke(Request $request): ResponseInterface
    {
        if ('OPTIONS' === $request->getMethod()) {
            return new Response(200);
        }

        $request = $this->withAttributes($request);
        $body = new ThroughStream();

        $sinceId = $request->getAttribute('lastEventId');
        $topics = $request->getAttribute('subscribedTopics');

        // Steps are chained with then(): dispatch subscriptions, replay missed messages, then subscribe.
        $pipeline = function () use ($request, $body, $sinceId, $topics) {
            return $this->hub->dispatchSubscriptions($request->getAttribute('subscriptions'))
                ->then(fn() => $this->hub->fetchMissedMessages($sinceId, $topics))
                ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $body))
                ->then(fn() => $this->subscribe($request, $body));
        };
        $this->hub->hook($pipeline);

        return new Response(
            200,
            [
                'Content-Type' => 'text/event-stream',
                'Cache-Control' => 'no-cache',
            ],
            $body
        );
    }
80 2
81 2
    /**
     * Tells whether this controller is responsible for the given request.
     *
     * @param RequestInterface $request Request to inspect.
     *
     * @return bool True for GET or OPTIONS requests whose path is exactly `/.well-known/mercure`.
     */
    public function matchRequest(RequestInterface $request): bool
    {
        $methodIsSupported = \in_array($request->getMethod(), ['GET', 'OPTIONS'], true);
        if (!$methodIsSupported) {
            // The URI is not inspected when the method already rules the request out.
            return false;
        }

        return '/.well-known/mercure' === $request->getUri()->getPath();
    }
86
87
    /**
     * Authenticates the request and returns a copy carrying everything the subscription flow needs.
     *
     * The returned request has its query params replaced by the parsed ones and gains the
     * `token`, `subscribedTopics`, `lastEventId` and `subscriptions` attributes.
     *
     * @param Request $request Incoming request; its `clientId` attribute is read here.
     *
     * @return Request The enriched request.
     *
     * @throws AccessDeniedHttpException When authentication fails with a RuntimeException, or when there
     *                                   is no token and anonymous subscriptions are disabled.
     * @throws BadRequestHttpException   When no `topic` query parameter was provided.
     */
    private function withAttributes(Request $request): Request
    {
        try {
            $token = $this->authenticator->authenticate($request);
        } catch (\RuntimeException $authenticationError) {
            throw new AccessDeniedHttpException($authenticationError->getMessage());
        }

        $isAnonymous = null === $token;
        if ($isAnonymous && !$this->allowAnonymous) {
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
        }

        $qs = query_string($request->getUri(), $this->queryStringParser);

        $rawTopics = $qs->getParam('topic') ?? [];
        $subscribedTopics = \array_map('\\urldecode', $rawTopics);
        if ([] === $subscribedTopics) {
            throw new BadRequestHttpException('Missing "topic" parameter.');
        }

        $clientId = $request->getAttribute('clientId');
        $subscriptions = $this->createSubscriptions($subscribedTopics, $clientId, $token);

        $queryParams = $qs->getParams();
        $lastEventId = $this->getLastEventID($request, $queryParams);

        return $request
            ->withQueryParams($queryParams)
            ->withAttribute('token', $token)
            ->withAttribute('subscribedTopics', $subscribedTopics)
            ->withAttribute('lastEventId', $lastEventId)
            ->withAttribute('subscriptions', $subscriptions);
    }
122
123
    /**
     * Builds one Subscription per subscribed topic selector for the given client.
     *
     * Nothing is created when the SUBSCRIPTIONS configuration entry is strictly false.
     * When a token is present, the `payload` member of its `mercure` claim (if any) is attached
     * to every subscription; otherwise the payload is null.
     *
     * @param array      $subscribedTopics Topic selectors requested by the client.
     * @param string     $clientId         Identifier of the subscribing client.
     * @param Token|null $token            Token of the request, or null for anonymous subscribers.
     *
     * @return array List of Subscription objects, in the order of $subscribedTopics (possibly empty).
     */
    private function createSubscriptions(array $subscribedTopics, string $clientId, ?Token $token): array
    {
        if (false === $this->config[Configuration::SUBSCRIPTIONS]) {
            return [];
        }

        // getClaim() throws when the claim is missing and no default is given, and "??" does not
        // catch exceptions. An explicit null default makes a token without a "mercure" claim yield
        // a null payload instead of aborting the request.
        $payload = null;
        if (null !== $token) {
            $payload = $token->getClaim('mercure', null)->payload ?? null;
        }

        $subscriptions = [];
        foreach ($subscribedTopics as $subscribedTopic) {
            $id = \sprintf('/.well-known/mercure/subscriptions/%s/%s', \urlencode($subscribedTopic), $clientId);
            $subscriptions[] = new Subscription($id, $clientId, $subscribedTopic, $payload);
        }

        return $subscriptions;
    }
146
147
    /**
     * Subscribes the client to every topic selector carried by the request.
     *
     * Each message delivered by the hub is passed to sendIfAllowed() with the request and stream.
     *
     * @param Request $request Request carrying the `subscribedTopics`, `token` and `clientId` attributes.
     * @param Stream  $stream  Stream the messages are written to.
     *
     * @return PromiseInterface resolve(true) when there is nothing to subscribe to, otherwise
     *                          all() of the promises returned by the hub.
     */
    private function subscribe(Request $request, Stream $stream): PromiseInterface
    {
        $clientId = $request->getAttribute('clientId');
        $token = $request->getAttribute('token');

        $pending = [];
        foreach ($request->getAttribute('subscribedTopics') as $selector) {
            // A fresh callback is created for every selector.
            $onMessage = fn(string $topic, Message $message) => $this->sendIfAllowed(
                $topic,
                $message,
                $request,
                $stream
            );
            $pending[] = $this->hub->subscribe($clientId, $selector, $token, $onMessage);
        }

        return [] === $pending ? resolve(true) : all($pending);
    }
168
169
    /**
     * Replays missed messages to the client, subject to the checks done by sendIfAllowed().
     *
     * @param iterable $messages Messages to replay; each key is passed on as the message's topic.
     * @param Request  $request  Request of the subscribing client.
     * @param Stream   $stream   Stream the messages are written to.
     *
     * @return PromiseInterface resolve(true) when $messages yields nothing, otherwise all() of the
     *                          per-message promises.
     */
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
    {
        $pending = [];
        foreach ($messages as $topic => $message) {
            $pending[] = $this->sendIfAllowed($topic, $message, $request, $stream);
        }

        return [] === $pending ? resolve(true) : all($pending);
    }
182 2
183 2
    /**
     * Sends a message to the client only if TopicMatcher::canReceiveUpdate() allows it.
     *
     * @param string  $topic   Topic the message was published on.
     * @param Message $message Message to deliver.
     * @param Request $request Request carrying the `subscribedTopics` and `token` attributes.
     * @param Stream  $stream  Stream the message is written to.
     *
     * @return PromiseInterface resolve(false) when the update is refused, otherwise the resolved
     *                          result of send().
     */
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $mayReceive = TopicMatcher::canReceiveUpdate(
            $topic,
            $message,
            $request->getAttribute('subscribedTopics'),
            $request->getAttribute('token'),
            $this->allowAnonymous
        );

        return $mayReceive
            ? resolve($this->send($topic, $message, $request, $stream))
            : resolve(false);
    }
193
194
    /**
     * Writes the string form of a message to the client's stream and logs the dispatch at debug level.
     *
     * @param string  $topic   Topic the message was published on (used for logging only).
     * @param Message $message Message to write; it is cast to string before being written.
     * @param Request $request Request whose `clientId` attribute is reported in the log entry.
     * @param Stream  $stream  Destination stream.
     *
     * @return PromiseInterface Always resolve(true); the return value of write() is not inspected.
     */
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
    {
        $stream->write((string) $message);

        $id = $message->getId();
        $clientId = $request->getAttribute('clientId');
        $log = $this->logger();
        $log->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");

        return resolve(true);
    }
203
204
    /**
     * Finds the last event id announced by the client.
     *
     * The `Last-Event-ID` header is checked first, then the `Last-Event-ID`, `Last-Event-Id` and
     * `last-event-id` query parameters, in that order. Every candidate goes through nullify()
     * and the first non-null result is returned.
     *
     * NOTE(review): nullify() is defined elsewhere; presumably it maps empty values to null - confirm.
     *
     * @param Request $request     Request whose header is inspected.
     * @param array   $queryParams Parsed query parameters.
     *
     * @return string|null The first usable id, or null when none was provided.
     */
    private function getLastEventID(Request $request, array $queryParams): ?string
    {
        $fromHeader = nullify($request->getHeaderLine('Last-Event-ID'));
        if (null !== $fromHeader) {
            return $fromHeader;
        }

        foreach (['Last-Event-ID', 'Last-Event-Id', 'last-event-id'] as $key) {
            $candidate = nullify($queryParams[$key] ?? null);
            if (null !== $candidate) {
                return $candidate;
            }
        }

        return null;
    }
211
}
212