Passed
Push — master ( a3b46f...abd2be )
by BENOIT
03:03 queued 48s
created

SubscribeController   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 156
Duplicated Lines 0 %

Test Coverage

Coverage 47.73%

Importance

Changes 0
Metric Value
eloc 82
dl 0
loc 156
ccs 42
cts 88
cp 0.4773
rs 10
c 0
b 0
f 0
wmc 23

10 Methods

Rating   Name   Duplication   Size   Complexity  
A fetchMissedMessages() 0 7 2
A matchRequest() 0 4 2
A send() 0 8 1
A subscribe() 0 27 4
A __invoke() 0 25 2
A sendIfAllowed() 0 9 2
A getLastEventID() 0 6 1
A withAttributes() 0 27 5
A sendMissedMessages() 0 12 3
A __construct() 0 7 1
1
<?php
2
3
namespace BenTools\MercurePHP\Controller;
4
5
use BenTools\MercurePHP\Configuration\Configuration;
6
use BenTools\MercurePHP\Exception\Http\AccessDeniedHttpException;
7
use BenTools\MercurePHP\Exception\Http\BadRequestHttpException;
8
use BenTools\MercurePHP\Helpers\QueryStringParser;
9
use BenTools\MercurePHP\Security\Authenticator;
10
use BenTools\MercurePHP\Security\TopicMatcher;
11
use BenTools\MercurePHP\Transport\Message;
12
use Psr\Http\Message\RequestInterface;
13
use Psr\Http\Message\ResponseInterface;
14
use Psr\Http\Message\ServerRequestInterface as Request;
15
use React\EventLoop\Factory;
16
use React\Http;
17
use React\Promise\PromiseInterface;
18
use React\Stream\ThroughStream;
19
use React\Stream\WritableStreamInterface as Stream;
20
21
use function BenTools\MercurePHP\nullify;
22
use function BenTools\QueryString\query_string;
23
use function React\Promise\all;
24
use function React\Promise\resolve;
25
26
final class SubscribeController extends AbstractController
27
{
28
    private Authenticator $authenticator;
29
    private QueryStringParser $queryStringParser;
30
    private bool $allowAnonymous;
31
32 9
    public function __construct(array $config, Authenticator $authenticator)
33
    {
34 9
        $this->config = $config;
35 9
        $this->allowAnonymous = $config[Configuration::ALLOW_ANONYMOUS];
36 9
        $this->authenticator = $authenticator;
37 9
        $this->queryStringParser = new QueryStringParser();
38 9
        $this->loop = Factory::create();
39 9
    }
40
41 7
    public function __invoke(Request $request): ResponseInterface
42
    {
43 7
        $request = $this->withAttributes($request);
44
45 2
        if ('OPTIONS' === $request->getMethod()) {
46
            return new Http\Response(200);
47
        }
48
49 2
        $stream = new ThroughStream();
50
51 2
        $lastEventID = $request->getAttribute('lastEventId');
52 2
        $subscribedTopics = $request->getAttribute('subscribedTopics');
53 2
        $this->loop
54 2
            ->futureTick(
55 2
                fn() => $this->fetchMissedMessages($lastEventID, $subscribedTopics)
56
                        ->then(fn(iterable $messages) => $this->sendMissedMessages($messages, $request, $stream))
57
                        ->then(fn() => $this->subscribe($request, $stream))
58 2
            );
59
60
        $headers = [
61 2
            'Content-Type' => 'text/event-stream',
62
            'Cache-Control' => 'no-cache',
63
        ];
64
65 2
        return new Http\Response(200, $headers, $stream);
66
    }
67
68 5
    public function matchRequest(RequestInterface $request): bool
69
    {
70 5
        return \in_array($request->getMethod(), ['GET', 'OPTIONS'], true)
71 5
            && '/.well-known/mercure' === $request->getUri()->getPath();
72
    }
73
74 7
    private function withAttributes(Request $request): Request
75
    {
76
        try {
77 7
            $token = $this->authenticator->authenticate($request);
78 2
        } catch (\RuntimeException $e) {
79 2
            throw new AccessDeniedHttpException($e->getMessage());
80
        }
81
82 5
        if (null === $token && false === $this->allowAnonymous) {
83 2
            throw new AccessDeniedHttpException('Anonymous subscriptions are not allowed on this hub.', 401);
84
        }
85
86 3
        $qs = query_string($request->getUri(), $this->queryStringParser);
87 3
        $subscribedTopics = \array_map('\\urldecode', $qs->getParam('topic') ?? []);
88
89 3
        if ([] === $subscribedTopics) {
90 1
            throw new BadRequestHttpException('Missing "topic" parameter.');
91
        }
92
93
        $request = $request
94 2
            ->withQueryParams($qs->getParams())
95 2
            ->withAttribute('token', $token)
96 2
            ->withAttribute('subscribedTopics', $subscribedTopics)
97 2
            ->withAttribute('lastEventId', $this->getLastEventID($request, $qs->getParams()))
98
        ;
99
100 2
        return  $request;
101
    }
102
103
    private function subscribe(Request $request, Stream $stream): PromiseInterface
104
    {
105
        $subscribedTopics = $request->getAttribute('subscribedTopics');
106
        $token = $request->getAttribute('token');
107
        $promises = [];
108
        foreach ($subscribedTopics as $topicSelector) {
109
            if (!TopicMatcher::canSubscribeToTopic($topicSelector, $token, $this->allowAnonymous)) {
110
                $clientId = $request->getAttribute('clientId');
111
                $this->logger()->debug("Client {$clientId} cannot subscribe to {$topicSelector}");
112
                continue;
113
            }
114
            $promises[] = $this->transport
115
                ->subscribe(
116
                    $topicSelector,
117
                    fn(string $topic, Message $message) => $this->sendIfAllowed($topic, $message, $request, $stream)
118
                )
119
                ->then(function (string $topic) use ($request) {
120
                    $clientId = $request->getAttribute('clientId');
121
                    $this->logger()->debug("Client {$clientId} subscribed to {$topic}");
122
                });
123
        }
124
125
        if ([] === $promises) {
126
            return resolve(true);
127
        }
128
129
        return all($promises);
130
    }
131
132
    private function fetchMissedMessages(?string $lastEventID, array $subscribedTopics): PromiseInterface
133
    {
134
        if (null === $lastEventID) {
135
            return resolve([]);
136
        }
137
138
        return $this->storage->retrieveMessagesAfterId($lastEventID, $subscribedTopics);
139
    }
140
141
    private function sendMissedMessages(iterable $messages, Request $request, Stream $stream): PromiseInterface
142
    {
143
        $promises = [];
144
        foreach ($messages as $topic => $message) {
145
            $promises[] = $this->sendIfAllowed($topic, $message, $request, $stream);
146
        }
147
148
        if ([] === $promises) {
149
            return resolve(true);
150
        }
151
152
        return all($promises);
153
    }
154
155
    private function sendIfAllowed(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
156
    {
157
        $subscribedTopics = $request->getAttribute('subscribedTopics');
158
        $token = $request->getAttribute('token');
159
        if (!TopicMatcher::canReceiveUpdate($topic, $message, $subscribedTopics, $token, $this->allowAnonymous)) {
160
            return resolve(false);
161
        }
162
163
        return resolve($this->send($topic, $message, $request, $stream));
164
    }
165
166
    private function send(string $topic, Message $message, Request $request, Stream $stream): PromiseInterface
167
    {
168
        $stream->write((string) $message);
169
        $clientId = $request->getAttribute('clientId');
170
        $id = $message->getId();
171
        $this->logger()->debug("Dispatched message {$id} to client {$clientId} on topic {$topic}");
172
173
        return resolve(true);
174
    }
175
176 2
    private function getLastEventID(Request $request, array $queryParams): ?string
177
    {
178 2
        return nullify($request->getHeaderLine('Last-Event-ID'))
179 2
            ?? nullify($queryParams['Last-Event-ID'] ?? null)
180 2
            ?? nullify($queryParams['Last-Event-Id'] ?? null)
181 2
            ?? nullify($queryParams['last-event-id'] ?? null);
182
    }
183
}
184