GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#3)
by Cees-Jan
01:58
created

AsyncClient::handleLowLevelError()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 0
cts 3
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 2
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\Dns\Resolver\Resolver;
6
use React\EventLoop\LoopInterface;
7
use Rx\Disposable\CallbackDisposable;
8
use Rx\Observable;
9
use Rx\ObserverInterface;
10
use Rx\Scheduler;
11
use Rx\Websocket\Client as WebsocketClient;
12
use Rx\Websocket\MessageSubject;
13
use Throwable;
14
15
final class AsyncClient
16
{
17
    const DEFAULT_DELAY = 200;
18
    const NO_ACTIVITY_TIMEOUT = 120;
19
    const NO_PING_RESPONSE_TIMEOUT = 30;
20
21
    protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;
22
23
    /**
24
     * @var Observable\RefCountObservable
25
     */
26
    protected $client;
27
28
    /**
29
     * @var Observable\AnonymousObservable
30
     */
31
    protected $messages;
32
33
    /**
34
     * @var MessageSubject
35
     */
36
    protected $sendSubject;
37
38
    /**
39
     * @var array
40
     */
41
    protected $channels = [];
42
43
    /**
44
     * @var int
45
     */
46
    protected $delay = self::DEFAULT_DELAY;
47
48
    /**
49
     * @internal
50
     */
51 3
    public function __construct(Observable $client)
52
    {
53 3
        $this->messages = $client
54
            // Save this subject for sending stuff
55
            ->do(function (MessageSubject $ms) {
56
                $this->sendSubject = $ms;
57
58
                // Resubscribe to an channels we where subscribed to when disconnected
59
                foreach ($this->channels as $channel => $_) {
60
                    $this->subscribeOnChannel($channel);
61
                }
62 3
            })
63
64
            // Make sure if there is a disconnect or something
65
            // that we unset the sendSubject
66
            ->finally(function () {
67
                $this->sendSubject = null;
68 3
            })
69
70
            ->flatMap(function (MessageSubject $ms) {
71
                return $ms;
72 3
            })
73
74
            // This is the ping/timeout functionality
75
            ->flatMapLatest(function ($x) {
76
                // this Observable emits the current value immediately
77
                // if another value comes along, this all gets disposed (because we are using flatMapLatest)
78
                // before the timeouts start get triggered
79
                return Observable::never()
80
                    ->timeout($this->noActivityTimeout * 1000)
81
                    ->catch(function () use ($x) {
82
                        // ping (do something that causes incoming stream to get a message)
83
                        $this->send(['event' => 'pusher:ping']);
84
                        // this timeout will actually timeout with a TimeoutException - causing
85
                        //   everything above this to dispose
86
                        return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
87
                    })
88
                    ->startWith($x);
89 3
            })
90
91
            // Handle connection level errors
92
            ->retryWhen(function (Observable $errors) {
93
                return $errors->flatMap(function (Throwable $throwable) {
94
                    return $this->handleLowLevelError($throwable);
95
                });
96 3
            })
97
98
            // Decode JSON
99 3
            ->_ApiClients_jsonDecode()
100
101
            // Deal with connection established messages
102
            ->map(function (array $message) {
103
                $this->delay = self::DEFAULT_DELAY;
104
105
                $event = Event::createFromMessage($message);
106
107
                if ($event->getEvent() === 'pusher:connection_established') {
108
                    $this->setActivityTimeout($event);
109
                }
110
111
                return $event;
112 3
            })
113 3
            ->share();
114 3
    }
115
116
    /**
117
     * @param  LoopInterface $loop
118
     * @param  string        $app      Application ID
119
     * @param  Resolver      $resolver Optional DNS resolver
120
     * @return AsyncClient
121
     */
122 3
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
123
    {
124
        try {
125
            Scheduler::setAsyncFactory(function () use ($loop) {
126
                return new Scheduler\EventLoopScheduler($loop);
0 ignored issues
show
Documentation introduced by
$loop is of type object<React\EventLoop\LoopInterface>, but the function expects a callable.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
127 3
            });
128
        } catch (Throwable $t) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
129
        }
130
131 3
        return new self(
132 3
            new WebsocketClient(
133 3
                ApiSettings::createUrl($app),
134 3
                false,
135 3
                [],
136
                $loop,
137 3
                $resolver
138
            )
139
        );
140
    }
141
142
    /**
143
     * Listen on a channel.
144
     *
145
     * @param  string     $channel Channel to listen on
146
     * @return Observable
147
     */
148
    public function channel(string $channel): Observable
149
    {
150
        if (isset($this->channels[$channel])) {
151
            return $this->channels[$channel];
152
        }
153
154
        // Ensure we only get messages for the given channel
155
        $channelMessages = $this->messages->filter(function (Event $event) use ($channel) {
156
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
157
        });
158
159
        $events = Observable::create(function (
160
            ObserverInterface $observer
161
        ) use (
162
            $channel,
163
            $channelMessages
164
        ) {
165
            $subscription = $channelMessages
166
                ->filter(function (Event $event) {
167
                    return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
168
                })
169
                ->subscribe($observer);
0 ignored issues
show
Documentation introduced by
$observer is of type object<Rx\ObserverInterface>, but the function expects a callable|null.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
170
171
            $this->subscribeOnChannel($channel);
172
173
            return new CallbackDisposable(function () use ($channel, $subscription) {
174
                // Send unsubscribe event
175
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
176
                // Dispose our own subscription to messages
177
                $subscription->dispose();
178
                // Remove our channel from the channel list so we don't resubscribe in case we reconnect
179
                unset($this->channels[$channel]);
180
            });
181
        });
182
183
        // Share stream amount subscribers to this channel
184
        $this->channels[$channel] = $events->share();
185
186
        return $this->channels[$channel];
187
    }
188
189
    /**
190
     * Send a message through the client.
191
     *
192
     * @param array $message Message to send, will be json encoded
193
     *
194
     * @return A bool indicating whether or not the connection was active
195
     *           and the given message has been pass onto the connection.
196
     */
197
    public function send(array $message): bool
198
    {
199
        // Don't send messages when we aren't connected
200
        if ($this->sendSubject ===  null) {
201
            return false;
202
        }
203
204
        $this->sendSubject->onNext(json_encode($message));
205
206
        return true;
207
    }
208
209
    private function handleLowLevelError(Throwable $throwable)
0 ignored issues
show
Unused Code introduced by
The parameter $throwable is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
210
    {
211
        $this->delay *= 2;
212
213
        return Observable::timer($this->delay);
214
    }
215
216
    /**
217
     * @param string $channel
218
     */
219
    private function subscribeOnChannel(string $channel)
220
    {
221
        $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
222
    }
223
224
    /**
225
     * Get connection activity timeout from connection established event.
226
     *
227
     * @param Event $event
228
     */
229
    private function setActivityTimeout(Event $event)
230
    {
231
        $data = $event->getData();
232
233
        // No activity_timeout found on event
234
        if (!isset($data['activity_timeout'])) {
235
            return;
236
        }
237
238
        // activity_timeout holds zero or invalid value (we don't want to hammer Pusher)
239
        if ((int)$data['activity_timeout'] <= 0) {
240
            return;
241
        }
242
243
        $this->noActivityTimeout = (int)$data['activity_timeout'];
244
    }
245
}
246