GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#3)
by Cees-Jan
01:52
created

AsyncClient   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 233
Duplicated Lines 0 %

Coupling/Cohesion

Components 3
Dependencies 11

Test Coverage

Coverage 25%

Importance

Changes 0
Metric Value
wmc 15
lcom 3
cbo 11
dl 0
loc 233
ccs 19
cts 76
cp 0.25
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A send() 0 10 2
A handleLowLevelError() 0 5 1
A subscribeOnChannel() 0 4 1
A setActivityTimeout() 0 16 3
A create() 0 20 2
A __construct() 0 63 3
B channel() 0 39 3
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\Dns\Resolver\Resolver;
6
use React\EventLoop\LoopInterface;
7
use Rx\Disposable\CallbackDisposable;
8
use Rx\Observable;
9
use Rx\ObserverInterface;
10
use Rx\Scheduler;
11
use Rx\Websocket\Client as WebsocketClient;
12
use Rx\Websocket\MessageSubject;
13
use Throwable;
14
15
final class AsyncClient
16
{
17
    const NO_ACTIVITY_TIMEOUT = 120;
18
    const NO_PING_RESPONSE_TIMEOUT = 30;
19
20
    protected $noActivityTimeout = self::NO_ACTIVITY_TIMEOUT;
21
22
    /**
23
     * @var LoopInterface
24
     */
25
    protected $loop;
26
27
    /**
28
     * @var Observable\RefCountObservable
29
     */
30
    protected $client;
31
32
    /**
33
     * @var Observable\AnonymousObservable
34
     */
35
    protected $messages;
36
37
    /**
38
     * @var MessageSubject
39
     */
40
    protected $sendSubject;
41
42
    /**
43
     * @var array
44
     */
45
    protected $channels = [];
46
47
    /**
48
     * @var int
49
     */
50
    protected $delay = 200;
51
52
    /**
53
     * @param LoopInterface $loop
54
     * @param string $app Application ID
55
     * @param Resolver $resolver Optional DNS resolver
56
     * @return AsyncClient
57
     */
58 3
    public static function create(LoopInterface $loop, string $app, Resolver $resolver = null): AsyncClient
59
    {
60
        try {
61
            Scheduler::setAsyncFactory(function () use ($loop) {
62
                return new Scheduler\EventLoopScheduler($loop);
0 ignored issues
show
Documentation introduced by
$loop is of type object<React\EventLoop\LoopInterface>, but the function expects a callable.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
63 3
            });
64
        } catch (Throwable $t) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
65
        }
66
67 3
        return new self(
68
            $loop,
69 3
            new WebsocketClient(
70 3
                ApiSettings::createUrl($app),
71 3
                false,
72 3
                [],
73
                $loop,
74
                $resolver
75
            )
76
        );
77
    }
78
79
    /**
80
     * @internal
81
     */
82 3
    public function __construct(LoopInterface $loop, WebsocketClient $client)
83
    {
84 3
        $this->loop = $loop;
85 3
        $this->messages = $client
86
            // Save this subject for sending stuff
87
            ->do(function (MessageSubject $ms) {
88
                $this->sendSubject = $ms;
89
90
                // Resubscribe to an channels we where subscribed to when disconnected
91
                foreach ($this->channels as $channel => $_) {
92
                    $this->subscribeOnChannel($channel);
93
                }
94 3
            })
95
96
            // Make sure if there is a disconnect or something
97
            // that we unset the sendSubject
98
            ->finally(function () {
99
                $this->sendSubject = null;
100 3
            })
101
102
            ->flatMap(function (MessageSubject $ms) {
103
                return $ms;
104 3
            })
105
106
            // This is the ping/timeout functionality
107
            ->flatMapLatest(function ($x) {
108
                // this Observable emits the current value immediately
109
                // if another value comes along, this all gets disposed (because we are using flatMapLatest)
110
                // before the timeouts start get triggered
111
                return Observable::never()
112
                    ->timeout($this->noActivityTimeout * 1000)
113
                    ->catch(function () use ($x) {
114
                        // ping (do something that causes incoming stream to get a message)
115
                        $this->send(['event' => 'pusher:ping']);
116
                        // this timeout will actually timeout with a TimeoutException - causing
117
                        //   everything above this to dispose
118
                        return Observable::never()->timeout(self::NO_PING_RESPONSE_TIMEOUT * 1000);
119
                    })
120
                    ->startWith($x);
121 3
            })
122
123
            // Handle connection level errors
124
            ->retryWhen(function (Observable $errors) {
125
                return $errors->flatMap(function (Throwable $throwable) {
126
                    return $this->handleLowLevelError($throwable);
127
                });
128 3
            })
129
130
            // Decode JSON
131 3
            ->_ApiClients_jsonDecode()
132
133
            // Deal with connection established messages
134
            ->map(function (array $message) {
135
                $event = Event::createFromMessage($message);
136
137
                if ($event->getEvent() === 'pusher:connection_established') {
138
                    $this->setActivityTimeout($event);
139
                }
140
141
                return $event;
142 3
            })
143 3
            ->share();
144 3
    }
145
146
    /**
147
     * Listen on a channel
148
     *
149
     * @param string $channel Channel to listen on
150
     * @return Observable
151
     */
152
    public function channel(string $channel): Observable
153
    {
154
        if (isset($this->channels[$channel])) {
155
            return $this->channels[$channel];
156
        }
157
158
        // Ensure we only get messages for the given channel
159
        $channelMessages = $this->messages->filter(function (Event $event) use ($channel) {
160
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
161
        });
162
163
        $events = Observable::create(function (
164
            ObserverInterface $observer
165
        ) use (
166
            $channel,
167
            $channelMessages
168
        ) {
169
            $subscription = $channelMessages
170
                ->filter(function (Event $event) {
171
                    return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
172
                })
173
                ->subscribe($observer);
0 ignored issues
show
Documentation introduced by
$observer is of type object<Rx\ObserverInterface>, but the function expects a callable|null.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
174
175
            $this->subscribeOnChannel($channel);
176
177
            return new CallbackDisposable(function () use ($channel, $subscription) {
178
                // Send unsubscribe event
179
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
180
                // Dispose our own subscription to messages
181
                $subscription->dispose();
182
                // Remove our channel from the channel list so we don't resubscribe in case we reconnect
183
                unset($this->channels[$channel]);
184
            });
185
        });
186
187
        // Share stream amount subscribers to this channel
188
        $this->channels[$channel] = $events->share();
189
        return $this->channels[$channel];
190
    }
191
192
    /**
193
     * Send a message through the client
194
     *
195
     * @param array $message Message to send, will be json encoded
196
     *
197
     * @return A bool indicating whether or not the connection was active
198
     *         and the given message has been pass onto the connection.
199
     */
200
    public function send(array $message): bool
201
    {
202
        // Don't send messages when we aren't connected
203
        if ($this->sendSubject ===  null) {
204
            return false;
205
        }
206
207
        $this->sendSubject->onNext(json_encode($message));
208
        return true;
209
    }
210
211
    private function handleLowLevelError(Throwable $throwable)
0 ignored issues
show
Unused Code introduced by
The parameter $throwable is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
212
    {
213
        $this->delay *= 2;
214
        return Observable::timer($this->delay);
215
    }
216
217
    /**
218
     * @param string $channel
219
     */
220
    private function subscribeOnChannel(string $channel)
221
    {
222
        $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
223
    }
224
225
226
    /**
227
     * Get connection activity timeout from connection established event
228
     *
229
     * @param Event $event
230
     */
231
    private function setActivityTimeout(Event $event)
232
    {
233
        $data = $event->getData();
234
235
        // No activity_timeout found on event
236
        if (!isset($data['activity_timeout'])) {
237
            return;
238
        }
239
240
        // activity_timeout holds zero or invalid value (we don't want to hammer Pusher)
241
        if ((int)$data['activity_timeout'] <= 0) {
242
            return;
243
        }
244
245
        $this->noActivityTimeout = (int)$data['activity_timeout'];
246
    }
247
}
248