GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( eb67f1...bf97e9 )
by Cees-Jan
11s
created

AsyncClient::subscribeOnChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\Dns\Resolver\Resolver;
6
use React\EventLoop\LoopInterface;
7
use RuntimeException;
8
use Rx\Observable;
9
use Rx\Scheduler;
10
use Rx\Subject\Subject;
11
use Rx\Websocket\WebsocketErrorException;
12
use Throwable;
13
14
final class AsyncClient
15
{
16
    const DEFAULT_DELAY = 200;
17
    const NO_ACTIVITY_TIMEOUT = 120;
18
    const NO_PING_RESPONSE_TIMEOUT = 30;
19
20
    /**
21
     * @var Observable
22
     */
23
    private $messages;
24
25
    /**
26
     * @var array
27
     */
28
    private $channels = [];
29
30
    /**
31
     * @var int
32
     */
33
    private $delay = self::DEFAULT_DELAY;
34
35
    /**
36
     * @var Subject
37
     */
38
    private $client;
39
40
    /**
41
     * @var Observable
42
     */
43
    private $connected;
44
45
    /**
46
     * @internal
47
     * @param  Subject                   $client
48
     * @throws \InvalidArgumentException
49
     */
50 19
    public function __construct(Subject $client)
51
    {
52 19
        $this->client = $client;
53
54
        /** @var Observable $events */
55
        $events = $client
56 19
            ->_ApiClients_jsonDecode()
57 19
            ->map([Event::class, 'createFromMessage'])
58 19
            ->singleInstance();
59
60
        $pusherErrors = $events
61 19
            ->filter([Event::class, 'isError'])
62 19
            ->flatMap(function (Event $event) {
63
                $throwable = new PusherErrorException($event->getData()['message'], (int)$event->getData()['code']);
64
65
                return Observable::error($throwable);
66 19
            });
67
68 19
        $this->connected = $events
69 19
            ->filter([Event::class, 'connectionEstablished'])
70 19
            ->singleInstance();
71
72 19
        $this->messages = $events
73 19
            ->merge($this->timeout($events))
74 19
            ->merge($pusherErrors)
75 19
            ->singleInstance();
76 19
    }
77
78
    /**
79
     * @param  LoopInterface             $loop
80
     * @param  string                    $app      Application ID
81
     * @param  Resolver                  $resolver Optional DNS resolver
82
     * @throws \InvalidArgumentException
83
     * @return AsyncClient
84
     */
85
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
86
    {
87
        try {
88 4
            Scheduler::setDefaultFactory(function () use ($loop) {
89 3
                return new Scheduler\EventLoopScheduler($loop);
0 ignored issues
show
Documentation introduced by
$loop is of type object<React\EventLoop\LoopInterface>, but the function expects a callable.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
90 4
            });
91 2
        } catch (Throwable $t) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
92
        }
93
94 4
        return new self(
95 4
            new WebSocket(ApiSettings::createUrl($app), false, [], $loop, $resolver)
96
        );
97
    }
98
99
    /**
100
     * Listen on a channel.
101
     *
102
     * @param  string                    $channel Channel to listen on
103
     * @throws \InvalidArgumentException
104
     * @return Observable
105
     */
106 16
    public function channel(string $channel): Observable
107
    {
108
        // Only join a channel once
109 16
        if (isset($this->channels[$channel])) {
110 1
            return $this->channels[$channel];
111
        }
112
113
        // Ensure we only get messages for the given channel
114 16
        $channelMessages = $this->messages->filter(function (Event $event) use ($channel) {
115 11
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
116 16
        });
117
118 16
        $subscribe = $this->connected
119 16
            ->do(function () use ($channel) {
120
                // Subscribe to pusher channel after connected
121 11
                $this->send(Event::subscribeOn($channel));
122 16
            })
123 16
            ->flatMapTo(Observable::empty());
124
125
        // Observable representing channel events
126 16
        $this->channels[$channel] = $channelMessages
127 16
            ->merge($subscribe)
128 16
            ->filter([Event::class, 'subscriptionSucceeded'])
129
            ->retryWhen(function (Observable $errors) {
130 16
                return $errors->flatMap(function (Throwable $throwable) {
131 8
                    return $this->handleLowLevelError($throwable);
132 16
                });
133 16
            })
134 16
            ->finally(function () use ($channel) {
135
                // Send unsubscribe event
136 14
                $this->send(Event::unsubscribeOn($channel));
137
138
                // Remove our channel from the channel list so we don't resubscribe in case we reconnect
139 14
                unset($this->channels[$channel]);
140 16
            })
141 16
            ->singleInstance();
142
143 16
        return $this->channels[$channel];
144
    }
145
146
    /**
147
     * Send a message through the client.
148
     *
149
     * @param array $message Message to send, will be json encoded
150
     *
151
     */
152 14
    public function send(array $message)
153
    {
154 14
        $this->client->onNext(json_encode($message));
155 14
    }
156
157
    /**
158
     * Returns an observable of TimeoutException.
159
     * The timeout observable will get cancelled every time a new event is received.
160
     *
161
     * @param  Observable $events
162
     * @return Observable
163
     */
164
    private function timeout(Observable $events): Observable
165
    {
166 19
        $timeoutDuration = $this->connected->map(function (Event $event) {
167 11
            return ($event->getData()['activity_timeout'] ?? self::NO_ACTIVITY_TIMEOUT) * 1000;
168 19
        });
169
170
        return $timeoutDuration
171 19
            ->combineLatest([$events])
172 19
            ->pluck(0)
173 19
            ->concat(Observable::of(-1))
174 19
            ->flatMapLatest(function (int $time) {
175
176
                // If the events observable ends, return an empty observable so we don't keep the stream alive
177 12
                if ($time === -1) {
178 5
                    return Observable::empty();
179
                }
180
181 11
                return Observable::never()
182 11
                    ->timeout($time)
183 11
                    ->catch(function () use ($time) {
184
                        // ping (do something that causes incoming stream to get a message)
185 2
                        $this->send(Event::ping());
186
                        // this timeout will actually timeout with a TimeoutException - causing
187
                        // everything above this to dispose
188 2
                        return Observable::never()->timeout($time);
189 11
                    });
190 19
            });
191
    }
192
193
    /**
194
     * Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
195
     * @param  Throwable  $throwable
196
     * @return Observable
197
     */
198 8
    private function handleLowLevelError(Throwable $throwable): Observable
199
    {
200
        // Only allow certain, relevant, exceptions
201 8
        if (!($throwable instanceof WebsocketErrorException) &&
202 8
            !($throwable instanceof RuntimeException) &&
203 8
            !($throwable instanceof PusherErrorException)
204
        ) {
205 3
            return Observable::error($throwable);
206
        }
207
208 5
        $code = $throwable->getCode();
209 5
        $pusherError = ($throwable instanceof WebsocketErrorException || $throwable instanceof PusherErrorException);
210
211
        // Errors 4000-4099, don't retry connecting
212 5
        if ($pusherError && $code >= 4000 && $code <= 4099) {
213 1
            return Observable::error($throwable);
214
        }
215
216
        // Errors 4100-4199 reconnect after 1 or more seconds, we do it after 1.001 second
217 4
        if ($pusherError && $code >= 4100 && $code <= 4199) {
218 1
            return Observable::timer(1001);
219
        }
220
221
        // Errors 4200-4299 connection closed by Pusher, reconnect immediately, we wait 0.001 second
222 3
        if ($pusherError && $code >= 4200 && $code <= 4299) {
223 1
            return Observable::timer(1);
224
        }
225
226
        // Double our delay each time we get here
227 2
        $this->delay *= 2;
228
229 2
        return Observable::timer($this->delay);
230
    }
231
}
232