GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( d06bac...2213a8 )
by Cees-Jan
02:19
created

AsyncClient::channel()   B

Complexity

Conditions 3
Paths 2

Size

Total Lines 39
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 39
ccs 24
cts 24
cp 1
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 20
nc 2
nop 1
crap 3
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\Dns\Resolver\Resolver;
6
use React\EventLoop\LoopInterface;
7
use RuntimeException;
8
use Rx\Observable;
9
use Rx\Scheduler;
10
use Rx\Subject\Subject;
11
use Rx\Websocket\WebsocketErrorException;
12
use Throwable;
13
use function Rx\p;
14
15
final class AsyncClient
16
{
17
    /**
     * Initial reconnect delay in milliseconds; the starting value of $delay.
     */
    const DEFAULT_DELAY = 200;

    /**
     * Fallback activity timeout in seconds, used when the connection event
     * carries no 'activity_timeout' value.
     */
    const NO_ACTIVITY_TIMEOUT = 120;

    /**
     * Timeout for a ping response.
     * NOTE(review): presumably expressed in seconds like NO_ACTIVITY_TIMEOUT - confirm.
     */
    const NO_PING_RESPONSE_TIMEOUT = 30;

    /**
     * Stream of decoded events merged with the timeout and Pusher error streams.
     *
     * @var Observable
     */
    private $messages;

    /**
     * Channel observables that have already been created, keyed by channel name.
     *
     * @var array
     */
    private $channels = [];

    /**
     * Current back-off delay in milliseconds; doubled before each generic retry.
     *
     * @var int
     */
    private $delay = self::DEFAULT_DELAY;

    /**
     * Underlying subject: incoming messages are read from it and outgoing
     * messages are pushed into it.
     *
     * @var Subject
     */
    private $client;

    /**
     * Stream of events that pass the Event::connectionEstablished filter.
     *
     * @var Observable
     */
    private $connected;
45
46
    /**
     * Wire up the event streams on top of the given websocket subject.
     *
     * Incoming messages are JSON-decoded into Event objects; from that shared
     * stream the constructor derives the "connected" stream and the combined
     * message stream (events + inactivity timeout + Pusher errors).
     *
     * @internal
     * @param  Subject                   $client
     * @throws \InvalidArgumentException
     */
    public function __construct(Subject $client)
    {
        $this->client = $client;

        // Raw message -> decoded array -> Event, shared between all consumers below.
        $decoded = $client->map(p('json_decode', true));
        /** @var Observable $events */
        $events = $decoded
            ->map([Event::class, 'createFromMessage'])
            ->singleInstance();

        // Every Pusher error event is turned into a failing observable.
        $errorEvents = $events->filter([Event::class, 'isError']);
        $pusherErrors = $errorEvents->flatMap(function (Event $event) {
            $message = $event->getData()['message'];
            $code = (int)$event->getData()['code'];

            return Observable::error(new PusherErrorException($message, $code));
        });

        // Must be assigned before timeout() is called, which reads $this->connected.
        $connectionEvents = $events->filter([Event::class, 'connectionEstablished']);
        $this->connected = $connectionEvents->singleInstance();

        $eventsWithTimeout = $events->merge($this->timeout($events));
        $this->messages = $eventsWithTimeout
            ->merge($pusherErrors)
            ->singleInstance();
    }
78
79
    /**
     * Create a client for the given Pusher application.
     *
     * Tries to register an event-loop backed scheduler as the Rx default, then
     * wraps the websocket built for the application's URL in a new client.
     *
     * @param  LoopInterface             $loop     Event loop driving the scheduler and the websocket
     * @param  string                    $app      Application ID
     * @param  Resolver                  $resolver Optional DNS resolver
     * @throws \InvalidArgumentException
     * @return AsyncClient
     */
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
    {
        $schedulerFactory = function () use ($loop) {
            return new Scheduler\EventLoopScheduler($loop);
        };

        try {
            Scheduler::setDefaultFactory($schedulerFactory);
        } catch (Throwable $ignored) {
            // Deliberately ignored: failing to register the default scheduler factory
            // must not prevent the client from being created.
            // NOTE(review): presumably thrown when a default factory was already set - confirm.
        }

        $url = ApiSettings::createUrl($app);
        $webSocket = WebSocket::createFactory($url, false, [], $loop, $resolver);

        return new self($webSocket);
    }
99
100
    /**
     * Listen on a channel.
     *
     * The observable for a channel is created once and cached; asking for the
     * same channel again returns the cached instance. When the observable is
     * torn down an unsubscribe event is sent and the cache entry is removed.
     *
     * @param  string                    $channel Channel to listen on
     * @throws \InvalidArgumentException
     * @return Observable
     */
    public function channel(string $channel): Observable
    {
        // Reuse the existing observable so a channel is only joined once
        if (isset($this->channels[$channel])) {
            return $this->channels[$channel];
        }

        // Keep only the events addressed to this channel
        $belongsToChannel = function (Event $event) use ($channel) {
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
        };
        $channelMessages = $this->messages->filter($belongsToChannel);

        // Once connected, send the subscribe event; this stream itself emits nothing
        $sendSubscribe = function () use ($channel) {
            $this->send(Event::subscribeOn($channel));
        };
        $subscribe = $this->connected
            ->do($sendSubscribe)
            ->flatMapTo(Observable::empty());

        // Each error is mapped to the observable returned by handleLowLevelError()
        $retryPolicy = function (Observable $errors) {
            return $errors->flatMap(function (Throwable $throwable) {
                return $this->handleLowLevelError($throwable);
            });
        };

        $cleanup = function () use ($channel) {
            // Send unsubscribe event
            $this->send(Event::unsubscribeOn($channel));

            // Drop the channel from the list so we don't resubscribe in case we reconnect
            unset($this->channels[$channel]);
        };

        // Observable representing channel events
        $stream = $channelMessages
            ->merge($subscribe)
            ->filter([Event::class, 'subscriptionSucceeded'])
            ->retryWhen($retryPolicy)
            ->finally($cleanup)
            ->singleInstance();

        $this->channels[$channel] = $stream;

        return $stream;
    }
146
147
    /**
     * Send a message through the client.
     *
     * The message is JSON encoded and pushed into the underlying subject.
     *
     * @param array $message Message to send, will be json encoded
     */
    public function send(array $message)
    {
        $payload = json_encode($message);

        $this->client->onNext($payload);
    }
157
158
    /**
     * Returns an observable of TimeoutException.
     * The timeout observable will get cancelled every time a new event is received.
     *
     * Flow: after the activity timeout elapses without any event a ping is sent;
     * if nothing arrives within NO_PING_RESPONSE_TIMEOUT seconds after that, the
     * returned observable errors with the timeout exception.
     *
     * @param  Observable $events Stream of incoming events; every emission restarts the timer
     * @return Observable
     */
    private function timeout(Observable $events): Observable
    {
        // Activity timeout in milliseconds, taken from the connection event when present
        $timeoutDuration = $this->connected->map(function (Event $event) {
            return ($event->getData()['activity_timeout'] ?? self::NO_ACTIVITY_TIMEOUT) * 1000;
        });

        return $timeoutDuration
            ->combineLatest([$events])
            ->pluck(0)
            // -1 is a sentinel emitted once the combined stream completes
            ->concat(Observable::of(-1))
            ->flatMapLatest(function (int $time) {

                // If the events observable ends, return an empty observable so we don't keep the stream alive
                if ($time === -1) {
                    return Observable::empty();
                }

                return Observable::never()
                    ->timeout($time)
                    ->catch(function () {
                        // ping (do something that causes incoming stream to get a message)
                        $this->send(Event::ping());

                        // Wait for the ping response using the dedicated, shorter timeout rather
                        // than a second full activity timeout. This timeout will actually time out
                        // with a TimeoutException - causing everything above this to dispose.
                        return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
                    });
            });
    }
193
194
    /**
     * Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
     *
     * The returned observable either errors (giving up) or is a timer after which
     * a retry is attempted.
     *
     * @param  Throwable  $throwable Error raised by the channel stream
     * @return Observable
     */
    private function handleLowLevelError(Throwable $throwable): Observable
    {
        $isPusherError = $throwable instanceof WebsocketErrorException
            || $throwable instanceof PusherErrorException;

        // Anything that is neither a Pusher/websocket error nor a RuntimeException is passed on as-is
        if (!$isPusherError && !($throwable instanceof RuntimeException)) {
            return Observable::error($throwable);
        }

        $code = $throwable->getCode();

        if ($isPusherError) {
            // Errors 4000-4099, don't retry connecting
            if ($code >= 4000 && $code <= 4099) {
                return Observable::error($throwable);
            }

            // Errors 4100-4199 reconnect after 1 or more seconds, we do it after 1.001 second
            if ($code >= 4100 && $code <= 4199) {
                return Observable::timer(1001);
            }

            // Errors 4200-4299 connection closed by Pusher, reconnect immediately, we wait 0.001 second
            if ($code >= 4200 && $code <= 4299) {
                return Observable::timer(1);
            }
        }

        // Everything else: double our delay each time we get here, then wait that long
        $this->delay *= 2;

        return Observable::timer($this->delay);
    }
232
}
233