Completed
Pull Request — master (#3)
by Cees-Jan
02:00
created

AsyncClient::subscribeOnChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 2
<?php declare(strict_types=1);

namespace ApiClients\Client\Pusher;

use React\Dns\Resolver\Resolver;
use React\EventLoop\LoopInterface;
use Rx\Disposable\CallbackDisposable;
use Rx\Observable;
use Rx\ObserverInterface;
use Rx\Scheduler;
use Rx\Websocket\Client as WebsocketClient;
use Rx\Websocket\MessageSubject;
use Throwable;

final class AsyncClient
{
    const NO_ACTIVITY_TIMEOUT = 120;
    const NO_PING_RESPONSE_TIMEOUT = 30;

    protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;

    /**
     * @var Observable\RefCountObservable
     */
    protected $client;

    /**
     * @var Observable\AnonymousObservable
     */
    protected $messages;

    /**
     * @var MessageSubject
     */
    protected $sendSubject;

    /**
     * @var array
     */
    protected $channels = [];

    /**
     * @var int
     */
    protected $delay = 200;

    /**
     * @internal
     */
    public function __construct(Observable $client)
    {
        $this->messages = $client
            // Save this subject for sending stuff
            ->do(function (MessageSubject $ms) {
                $this->sendSubject = $ms;

                // Resubscribe to any channels we were subscribed to when disconnected
                foreach ($this->channels as $channel => $_) {
                    $this->subscribeOnChannel($channel);
                }
            })

            // Make sure that we unset the sendSubject
            // if there is a disconnect or something
            ->finally(function () {
                $this->sendSubject = null;
            })

            ->flatMap(function (MessageSubject $ms) {
                return $ms;
            })

            // This is the ping/timeout functionality
            ->flatMapLatest(function ($x) {
                // this Observable emits the current value immediately
                // if another value comes along, this all gets disposed (because we are using flatMapLatest)
                // before the timeouts get triggered
                return Observable::never()
                    ->timeout($this->noActivityTimeout * 1000)
                    ->catch(function () use ($x) {
                        // ping (do something that causes incoming stream to get a message)
                        $this->send(['event' => 'pusher:ping']);
                        // this timeout will actually time out with a TimeoutException, causing
                        //   everything above this to dispose
                        return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
                    })
                    ->startWith($x);
            })

            // Handle connection level errors
            ->retryWhen(function (Observable $errors) {
                return $errors->flatMap(function (Throwable $throwable) {
                    return $this->handleLowLevelError($throwable);
                });
            })

            // Decode JSON
            ->_ApiClients_jsonDecode()

            // Deal with connection established messages
            ->map(function (array $message) {
                $event = Event::createFromMessage($message);

                if ($event->getEvent() === 'pusher:connection_established') {
                    $this->setActivityTimeout($event);
                }

                return $event;
            })
            ->share();
    }

    /**
     * @param  LoopInterface $loop
     * @param  string        $app      Application ID
     * @param  Resolver|null $resolver Optional DNS resolver
     * @return AsyncClient
     */
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
    {
        try {
            Scheduler::setAsyncFactory(function () use ($loop) {
                // Inspection note (Documentation): $loop is of type object<React\EventLoop\LoopInterface>,
                // but the function expects a callable.
                return new Scheduler\EventLoopScheduler($loop);
            });
        } catch (Throwable $t) {
            // Intentionally empty, as flagged by the inspection (Coding Style). Presumably
            // setAsyncFactory() throws when a scheduler was already configured, which is safe to ignore here.
        }

        return new self(
            new WebsocketClient(
                ApiSettings::createUrl($app),
                false,
                [],
                $loop,
                $resolver
            )
        );
    }

    /**
     * Listen on a channel.
     *
     * @param  string     $channel Channel to listen on
     * @return Observable
     */
    public function channel(string $channel): Observable
    {
        if (isset($this->channels[$channel])) {
            return $this->channels[$channel];
        }

        // Ensure we only get messages for the given channel
        $channelMessages = $this->messages->filter(function (Event $event) use ($channel) {
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
        });

        $events = Observable::create(function (
            ObserverInterface $observer
        ) use (
            $channel,
            $channelMessages
        ) {
            // Inspection note (Documentation): $observer is of type object<Rx\ObserverInterface>,
            // but the function expects a callable|null.
            $subscription = $channelMessages
                ->filter(function (Event $event) {
                    return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
                })
                ->subscribe($observer);

            $this->subscribeOnChannel($channel);

            return new CallbackDisposable(function () use ($channel, $subscription) {
                // Send unsubscribe event
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                // Dispose our own subscription to messages
                $subscription->dispose();
                // Remove our channel from the channel list so we don't resubscribe in case we reconnect
                unset($this->channels[$channel]);
            });
        });

        // Share the stream among subscribers to this channel
        $this->channels[$channel] = $events->share();

        return $this->channels[$channel];
    }

    /**
     * Send a message through the client.
     *
     * @param array $message Message to send, will be json encoded
     *
     * @return bool Whether the connection was active
     *              and the given message has been passed on to the connection.
     */
    public function send(array $message): bool
    {
        // Don't send messages when we aren't connected
        if ($this->sendSubject === null) {
            return false;
        }

        $this->sendSubject->onNext(json_encode($message));

        return true;
    }

    // Inspection note (Unused Code): the parameter $throwable is not used and could be removed.
    private function handleLowLevelError(Throwable $throwable)
    {
        // Double the delay on every connection-level error before retrying
        $this->delay *= 2;

        return Observable::timer($this->delay);
    }

    /**
     * @param string $channel
     */
    private function subscribeOnChannel(string $channel)
    {
        $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
    }

    /**
     * Get connection activity timeout from connection established event.
     *
     * @param Event $event
     */
    private function setActivityTimeout(Event $event)
    {
        $data = $event->getData();

        // No activity_timeout found on event
        if (!isset($data['activity_timeout'])) {
            return;
        }

        // activity_timeout holds zero or invalid value (we don't want to hammer Pusher)
        if ((int)$data['activity_timeout'] <= 0) {
            return;
        }

        $this->noActivityTimeout = (int)$data['activity_timeout'];
    }
}
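The reconnect handling in handleLowLevelError() doubles $delay before each retry. As a rough illustration only, the standalone sketch below lists the delays that this doubling produces from the initial value of 200. The function backoffDelays() is a hypothetical helper and not part of AsyncClient, and the unit is assumed to be milliseconds because the listing multiplies its second-based timeouts by 1000 before passing them to timeout().

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of AsyncClient): lists the delays that
 * result from doubling an initial delay before every retry.
 *
 * @param  int   $initialDelay Starting delay, assumed to be in milliseconds
 * @param  int   $attempts     Number of consecutive retries to simulate
 * @return int[] One delay per retry attempt
 */
function backoffDelays(int $initialDelay, int $attempts): array
{
    $delay = $initialDelay;
    $delays = [];

    for ($i = 0; $i < $attempts; $i++) {
        $delay *= 2;        // same doubling step as handleLowLevelError()
        $delays[] = $delay; // the value that would be handed to Observable::timer()
    }

    return $delays;
}

print_r(backoffDelays(200, 4));
```

With an initial delay of 200, four consecutive errors yield delays of 400, 800, 1600 and 3200. Nothing in the listing resets $delay, so under this reading the wait keeps growing for the lifetime of the client.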