GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 82f380...eb67f1 )
by Cees-Jan
05:10
created

AsyncClient::subscribeOnChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\Dns\Resolver\Resolver;
6
use React\EventLoop\LoopInterface;
7
use RuntimeException;
8
use Rx\Disposable\CallbackDisposable;
9
use Rx\Observable;
10
use Rx\ObserverInterface;
11
use Rx\Scheduler;
12
use Rx\Websocket\Client as WebsocketClient;
13
use Rx\Websocket\MessageSubject;
14
use Rx\Websocket\WebsocketErrorException;
15
use Throwable;
16
17
final class AsyncClient
18
{
19
    const DEFAULT_DELAY = 200;
20
    const NO_ACTIVITY_TIMEOUT = 120;
21
    const NO_PING_RESPONSE_TIMEOUT = 30;
22
23
    protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;
24
25
    /**
26
     * @var Observable\RefCountObservable
27
     */
28
    protected $client;
29
30
    /**
31
     * @var Observable\AnonymousObservable
32
     */
33
    protected $messages;
34
35
    /**
36
     * @var MessageSubject
37
     */
38
    protected $sendSubject;
39
40
    /**
41
     * @var array
42
     */
43
    protected $channels = [];
44
45
    /**
46
     * @var int
47
     */
48
    protected $delay = self::DEFAULT_DELAY;
49
50
    /**
     * Wire the message pipeline on top of the websocket connection observable.
     *
     * Every MessageSubject emitted by $client is remembered for sending and its
     * messages are flattened into a single stream. That stream is wrapped in
     * the ping/timeout logic, JSON-decoded, converted to Event objects and
     * retried according to handleLowLevelError(). The shared result is stored
     * in $this->messages.
     *
     * @param Observable $client Observable emitting MessageSubject instances
     *
     * @internal
     */
    public function __construct(Observable $client)
    {
        $this->messages = $client
            // Save this subject for sending stuff
            ->do(function (MessageSubject $ms) {
                $this->sendSubject = $ms;

                // Resubscribe to any channels we were subscribed to when disconnected
                foreach ($this->channels as $channel => $_) {
                    // PHP turns numeric-string array keys into ints; cast back so the
                    // string-typed subscribeOnChannel() accepts them under strict_types
                    $this->subscribeOnChannel((string) $channel);
                }
            })

            // Make sure if there is a disconnect or something
            // that we unset the sendSubject
            ->finally(function () {
                $this->sendSubject = null;
            })

            // Flatten the messages of each MessageSubject into one stream
            ->flatMap(function (MessageSubject $ms) {
                return $ms;
            })

            // This is the ping/timeout functionality
            ->flatMapLatest(function ($x) {
                // this Observable emits the current value immediately
                // if another value comes along, this all gets disposed (because we are using flatMapLatest)
                // before the timeouts start to get triggered
                return Observable::never()
                    ->timeout($this->noActivityTimeout * 1000)
                    ->catch(function () {
                        // ping (do something that causes incoming stream to get a message)
                        $this->send(['event' => 'pusher:ping']);
                        // this timeout will actually timeout with a TimeoutException - causing
                        //   everything above this to dispose
                        return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
                    })
                    ->startWith($x);
            })

            // Decode JSON
            ->_ApiClients_jsonDecode()

            // Deal with connection established messages
            ->flatMap(function (array $message) {
                // Any decoded message resets the reconnect back-off
                $this->delay = self::DEFAULT_DELAY;

                $event = Event::createFromMessage($message);

                if ($event->getEvent() === 'pusher:error') {
                    $data = $event->getData();

                    // NOTE(review): message/code may be absent or null in the error payload;
                    // normalise them so a PusherErrorException is raised instead of a TypeError.
                    // Assumes PusherErrorException keeps \Exception's (string, int) signature - confirm.
                    $throwable = new PusherErrorException(
                        (string) ($data['message'] ?? ''),
                        (int) ($data['code'] ?? 0)
                    );

                    return Observable::error($throwable);
                }

                // If this event represents the connection_established event set the timeout
                if ($event->getEvent() === 'pusher:connection_established') {
                    $this->setActivityTimeout($event);
                }

                return Observable::of($event);
            })

            // Handle connection level and Pusher protocol errors
            ->retryWhen(function (Observable $errors) {
                return $errors->flatMap(function (Throwable $throwable) {
                    return $this->handleLowLevelError($throwable);
                });
            })

            // Share client
            ->share();
    }
126
127
    /**
     * Build a client for the given Pusher application.
     *
     * Registers the Rx async and default scheduler factories (ignoring any
     * failure to do so) and wraps a new websocket client in an AsyncClient.
     *
     * @param  LoopInterface $loop     Event loop handed to the scheduler and the websocket client
     * @param  string        $app      Application ID
     * @param  Resolver      $resolver Optional DNS resolver
     * @return AsyncClient
     */
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
    {
        // Rather not do this, but have to until ReactPHP gets its own global loop
        $asyncFactory = function () use ($loop) {
            return new Scheduler\EventLoopScheduler($loop);
        };
        $defaultFactory = function () {
            return Scheduler::getImmediate();
        };

        try {
            Scheduler::setAsyncFactory($asyncFactory);
        } catch (Throwable $t) {
            // Intentionally ignored: registration is best effort.
            // Presumably this throws when a factory was already set - confirm against RxPHP.
        }

        try {
            Scheduler::setDefaultFactory($defaultFactory);
        } catch (Throwable $t) {
            // Intentionally ignored, same reasoning as for the async factory above.
        }

        $websocket = new WebsocketClient(
            ApiSettings::createUrl($app),
            false,
            [],
            $loop,
            $resolver
        );

        return new self($websocket);
    }
160
161
    /**
     * Listen on a channel.
     *
     * The observable for a channel is built once, shared between subscribers
     * and cached until it is disposed, so repeated calls with the same name
     * return the same instance.
     *
     * @param  string     $channel Channel to listen on
     * @return Observable
     */
    public function channel(string $channel): Observable
    {
        // Only join a channel once
        if (!isset($this->channels[$channel])) {
            // Keeps events that carry a non-empty channel name equal to ours
            $belongsToChannel = function (Event $event) use ($channel) {
                return $event->getChannel() !== '' && $event->getChannel() === $channel;
            };

            // Drops the internal subscription confirmation event
            $isNotInternal = function (Event $event) {
                return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
            };

            $channelMessages = $this->messages->filter($belongsToChannel);

            // Observable representing channel events
            $events = Observable::create(
                function (ObserverInterface $observer) use ($channel, $channelMessages, $isNotInternal) {
                    // Start listening first, then ask Pusher to subscribe us
                    $subscription = $channelMessages
                        ->filter($isNotInternal)
                        ->subscribe($observer);

                    $this->subscribeOnChannel($channel);

                    return new CallbackDisposable(function () use ($channel, $subscription) {
                        // Send unsubscribe event
                        $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                        // Dispose our own subscription to messages
                        $subscription->dispose();
                        // Forget the channel so we don't resubscribe in case we reconnect
                        unset($this->channels[$channel]);
                    });
                }
            );

            // Share the stream among subscribers to this channel
            $this->channels[$channel] = $events->share();
        }

        return $this->channels[$channel];
    }
210
211
    /**
     * Send a message through the client.
     *
     * @param array $message Message to send, will be json encoded
     *
     * @return bool True when the connection was active and the encoded message
     *              has been passed onto it; false when there is no active
     *              connection or the message could not be JSON encoded.
     */
    public function send(array $message): bool
    {
        // Don't send messages when we aren't connected
        if ($this->sendSubject === null) {
            return false;
        }

        // json_encode() returns false on failure; never push that onto the socket
        $payload = json_encode($message);
        if ($payload === false) {
            return false;
        }

        $this->sendSubject->onNext($payload);

        return true;
    }
230
231
    /**
     * Decide how the message stream reacts to an error, following
     * https://pusher.com/docs/pusher_protocol#error-codes.
     *
     * Returns an error observable when the failure must not be retried and a
     * timer observable (in milliseconds) when a retry should happen after a delay.
     *
     * @param Throwable $throwable Error raised by the message stream
     */
    private function handleLowLevelError(Throwable $throwable)
    {
        $isWebsocketError = $throwable instanceof WebsocketErrorException;
        $isPusherException = $throwable instanceof PusherErrorException;

        // Only allow certain, relevant, exceptions; anything else is passed on untouched
        $isRelevant = $isWebsocketError || $isPusherException || $throwable instanceof RuntimeException;
        if (!$isRelevant) {
            return Observable::error($throwable);
        }

        $code = $throwable->getCode();

        // The Pusher error code ranges only apply to websocket and Pusher errors
        if ($isWebsocketError || $isPusherException) {
            // Errors 4000-4099, don't retry connecting
            if ($code >= 4000 && $code <= 4099) {
                return Observable::error($throwable);
            }

            // Errors 4100-4199 reconnect after 1 or more seconds, we do it after 1.001 second
            if ($code >= 4100 && $code <= 4199) {
                return Observable::timer(1001);
            }

            // Errors 4200-4299 connection closed by Pusher, reconnect immediately, we wait 0.001 second
            if ($code >= 4200 && $code <= 4299) {
                return Observable::timer(1);
            }
        }

        // Everything else: double our delay each time we get here, then retry
        $this->delay *= 2;

        return Observable::timer($this->delay);
    }
267
268
    /**
     * Send a pusher:subscribe event for the given channel.
     *
     * The result of send() is discarded, so nothing is reported when there
     * is no active connection.
     *
     * @param string $channel
     */
    private function subscribeOnChannel(string $channel)
    {
        $subscribeEvent = [
            'event' => 'pusher:subscribe',
            'data' => ['channel' => $channel],
        ];

        $this->send($subscribeEvent);
    }
275
276
    /**
     * Get connection activity timeout from connection established event.
     *
     * The stored timeout is left untouched when the event carries no
     * activity_timeout or one that is zero or negative after integer conversion.
     *
     * @param Event $event
     */
    private function setActivityTimeout(Event $event)
    {
        $data = $event->getData();

        // A missing (or null) activity_timeout is treated like zero
        $timeout = (int) ($data['activity_timeout'] ?? 0);

        // Only accept positive values (we don't want to hammer Pusher)
        if ($timeout > 0) {
            $this->noActivityTimeout = $timeout;
        }
    }
297
}
298