GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( f8794f...05924e )
by Cees-Jan
09:12
created

AsyncClient::send()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.2559

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 3
cts 5
cp 0.6
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 1
crap 2.2559
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\Dns\Resolver\Resolver;
6
use React\EventLoop\LoopInterface;
7
use RuntimeException;
8
use Rx\Disposable\CallbackDisposable;
9
use Rx\Observable;
10
use Rx\ObserverInterface;
11
use Rx\Scheduler;
12
use Rx\Websocket\Client as WebsocketClient;
13
use Rx\Websocket\MessageSubject;
14
use Rx\Websocket\WebsocketErrorException;
15
use Throwable;
16
use function React\Promise\reject;
17
use function React\Promise\resolve;
18
19
/**
 * Asynchronous Pusher client built on top of an Rx websocket message stream.
 *
 * The constructor turns the raw websocket observable into a shared stream of
 * Event objects (with ping/timeout handling and reconnect logic); channel()
 * derives per-channel streams from it and send() pushes JSON messages out over
 * the currently active connection.
 */
final class AsyncClient
{
    /** Initial reconnect delay in milliseconds; the delay is reset to this on every received message. */
    const DEFAULT_DELAY = 200;

    /** Upper bound, in milliseconds, for the exponentially growing reconnect delay. */
    const MAX_DELAY = 60000;

    /** Default number of seconds without incoming messages before a ping is sent. */
    const NO_ACTIVITY_TIMEOUT = 120;

    /** Number of seconds to wait for any message after a ping before timing out. */
    const NO_PING_RESPONSE_TIMEOUT = 30;

    /**
     * Seconds of inactivity before pinging; may be overridden by the
     * `activity_timeout` of the connection established event.
     *
     * @var int
     */
    protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;

    /**
     * Not assigned anywhere in this class.
     *
     * @var Observable\RefCountObservable
     */
    protected $client;

    /**
     * Shared stream of decoded events coming from the connection.
     *
     * @var Observable
     */
    protected $messages;

    /**
     * Subject of the currently active connection, null while disconnected.
     *
     * @var MessageSubject|null
     */
    protected $sendSubject;

    /**
     * Shared channel observables, keyed by channel name.
     *
     * @var array
     */
    protected $channels = [];

    /**
     * Current reconnect delay in milliseconds.
     *
     * @var int
     */
    protected $delay = self::DEFAULT_DELAY;

    /**
     * Build the shared event stream on top of the given websocket observable.
     *
     * @internal
     *
     * @param Observable $client Observable emitting a MessageSubject per established connection
     */
    public function __construct(Observable $client)
    {
        $this->messages = $client
            // Save this subject for sending stuff
            ->do(function (MessageSubject $ms) {
                $this->sendSubject = $ms;

                // Resubscribe to any channels we were subscribed to when disconnected
                foreach ($this->channels as $channel => $_) {
                    $this->subscribeOnChannel($channel);
                }
            })

            // Make sure if there is a disconnect or something
            // that we unset the sendSubject
            ->finally(function () {
                $this->sendSubject = null;
            })

            ->flatMap(function (MessageSubject $ms) {
                return $ms;
            })

            // This is the ping/timeout functionality
            ->flatMapLatest(function ($x) {
                // this Observable emits the current value immediately
                // if another value comes along, this all gets disposed (because we are using flatMapLatest)
                // before the timeouts start to get triggered
                return Observable::never()
                    ->timeout($this->noActivityTimeout * 1000)
                    ->catch(function () use ($x) {
                        // ping (do something that causes incoming stream to get a message)
                        $this->send(['event' => 'pusher:ping']);
                        // this timeout will actually timeout with a TimeoutException - causing
                        //   everything above this to dispose
                        return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
                    })
                    ->startWith($x);
            })

            // Decode JSON
            ->_ApiClients_jsonDecode()

            // Deal with connection established messages
            ->flatMap(function (array $message) {
                // Any decoded message means the connection works: reset the reconnect back-off
                $this->delay = self::DEFAULT_DELAY;

                $event = Event::createFromMessage($message);

                if ($event->getEvent() === 'pusher:error') {
                    // The error payload is not guaranteed to carry both fields (code may be
                    // absent or null), so fall back to neutral defaults instead of passing
                    // null into the exception under strict types.
                    $data = $event->getData();

                    return Observable::fromPromise(reject(
                        new PusherErrorException(
                            (string) ($data['message'] ?? ''),
                            (int) ($data['code'] ?? 0)
                        )
                    ));
                }

                // If this event represents the connection_established event set the timeout
                if ($event->getEvent() === 'pusher:connection_established') {
                    $this->setActivityTimeout($event);
                }

                return Observable::fromPromise(resolve($event));
            })

            // Handle connection level and Pusher protocol errors
            ->retryWhen(function (Observable $errors) {
                return $errors->flatMap(function (Throwable $throwable) {
                    return $this->handleLowLevelError($throwable);
                });
            })

            // Share client
            ->share();
    }

    /**
     * Create a client connected to the given Pusher application.
     *
     * @param  LoopInterface $loop
     * @param  string        $app      Application ID
     * @param  Resolver|null $resolver Optional DNS resolver
     * @return AsyncClient
     */
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
    {
        // Rather not do this, but have to until ReactPHP gets its own global loop
        try {
            Scheduler::setAsyncFactory(function () use ($loop) {
                return new Scheduler\EventLoopScheduler($loop);
            });
        } catch (Throwable $t) {
            // Deliberately ignored: registering the factory is best-effort.
            // NOTE(review): presumably this throws when a scheduler/factory has already
            // been set up (e.g. create() called more than once) - confirm against RxPHP.
        }

        try {
            Scheduler::setDefaultFactory(function () {
                return Scheduler::getImmediate();
            });
        } catch (Throwable $t) {
            // Deliberately ignored for the same reason as the async factory above.
        }

        return new self(
            new WebsocketClient(
                ApiSettings::createUrl($app),
                false,
                [],
                $loop,
                $resolver
            )
        );
    }

    /**
     * Listen on a channel.
     *
     * The returned observable is shared: the first subscriber triggers the
     * `pusher:subscribe` message, and disposing it sends `pusher:unsubscribe`
     * and forgets the channel.
     *
     * @param  string     $channel Channel to listen on
     * @return Observable
     */
    public function channel(string $channel): Observable
    {
        // Only join a channel once
        if (isset($this->channels[$channel])) {
            return $this->channels[$channel];
        }

        // Ensure we only get messages for the given channel
        $channelMessages = $this->messages->filter(function (Event $event) use ($channel) {
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
        });

        // Observable representing channel events
        $events = Observable::create(function (
            ObserverInterface $observer
        ) use (
            $channel,
            $channelMessages
        ) {
            // Subscribe to channel messages but filter out internal events
            $subscription = $channelMessages
                ->filter(function (Event $event) {
                    return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
                })
                ->subscribe($observer);

            $this->subscribeOnChannel($channel);

            return new CallbackDisposable(function () use ($channel, $subscription) {
                // Send unsubscribe event
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                // Dispose our own subscription to messages
                $subscription->dispose();
                // Remove our channel from the channel list so we don't resubscribe in case we reconnect
                unset($this->channels[$channel]);
            });
        });

        // Share stream among subscribers to this channel
        $this->channels[$channel] = $events->share();

        return $this->channels[$channel];
    }

    /**
     * Send a message through the client.
     *
     * @param array $message Message to send, will be json encoded
     *
     * @return bool True when the connection was active and the encoded message
     *              has been passed onto it; false when there is no active
     *              connection or the message could not be JSON encoded.
     */
    public function send(array $message): bool
    {
        // Don't send messages when we aren't connected
        if ($this->sendSubject === null) {
            return false;
        }

        $payload = json_encode($message);

        // json_encode() returns false on failure (e.g. invalid UTF-8); never push that onto the socket
        if ($payload === false) {
            return false;
        }

        $this->sendSubject->onNext($payload);

        return true;
    }

    /**
     * Handle errors as described at https://pusher.com/docs/pusher_protocol#error-codes.
     *
     * @param  Throwable  $throwable Error that terminated the message stream
     * @return Observable Either an erroring observable (give up) or a timer
     *                    whose emission triggers the retry
     */
    private function handleLowLevelError(Throwable $throwable): Observable
    {
        // Only allow certain, relevant, exceptions
        if (!($throwable instanceof WebsocketErrorException) &&
            !($throwable instanceof RuntimeException) &&
            !($throwable instanceof PusherErrorException)
        ) {
            return Observable::fromPromise(reject($throwable));
        }

        $code = $throwable->getCode();
        $pusherError = ($throwable instanceof WebsocketErrorException || $throwable instanceof PusherErrorException);

        // Errors 4000-4099, don't retry connecting
        if ($pusherError && $code >= 4000 && $code <= 4099) {
            return Observable::fromPromise(reject($throwable));
        }

        // Errors 4100-4199 reconnect after 1 or more seconds, we do it after 1.001 second
        if ($pusherError && $code >= 4100 && $code <= 4199) {
            return Observable::timer(1001);
        }

        // Errors 4200-4299 connection closed by Pusher, reconnect immediately, we wait 0.001 second
        if ($pusherError && $code >= 4200 && $code <= 4299) {
            return Observable::timer(1);
        }

        // Double our delay each time we get here, but cap it so a long outage
        // cannot grow the wait between reconnect attempts without bound
        $this->delay = min($this->delay * 2, self::MAX_DELAY);

        return Observable::timer($this->delay);
    }

    /**
     * Ask Pusher to subscribe the current connection to a channel.
     *
     * Nothing is sent while disconnected (send() returns false in that case).
     *
     * @param string $channel
     */
    private function subscribeOnChannel(string $channel)
    {
        $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
    }

    /**
     * Get connection activity timeout from connection established event.
     *
     * The current timeout is kept when the event carries no `activity_timeout`
     * or a value that is not a positive integer.
     *
     * @param Event $event
     */
    private function setActivityTimeout(Event $event)
    {
        $data = $event->getData();

        // No activity_timeout found on event
        if (!isset($data['activity_timeout'])) {
            return;
        }

        // activity_timeout holds zero or invalid value (we don't want to hammer Pusher)
        if ((int)$data['activity_timeout'] <= 0) {
            return;
        }

        $this->noActivityTimeout = (int)$data['activity_timeout'];
    }
}
300