GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 1f83af...994fb5 )
by Cees-Jan
04:40
created

AsyncClient::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 0
cts 8
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 2
crap 2
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\EventLoop\LoopInterface;
6
use Rx\Disposable\CallbackDisposable;
7
use Rx\Observable;
8
use Rx\ObservableInterface;
9
use Rx\ObserverInterface;
10
use Rx\SchedulerInterface;
11
use Rx\Websocket\Client as WebsocketClient;
12
use Rx\Websocket\MessageSubject;
13
use function React\Promise\resolve;
14
use function EventLoop\getLoop;
15
use function EventLoop\setLoop;
16
17
final class AsyncClient
18
{
19
    /**
20
     * @var Observable\RefCountObservable
21
     */
22
    protected $client;
23
24
    /**
25
     * @var Observable\AnonymousObservable
26
     */
27
    protected $messages;
28
29
    /**
30
     * @var array
31
     */
32
    protected $channels = [];
33
34
    /**
35
     * @param LoopInterface $loop
36
     * @param string $app Application ID
37
     */
38
    public function __construct(LoopInterface $loop, string $app)
39
    {
40
        // Set loop into global look accessor
41
        setLoop($loop);
42
43
        //Only create one connection and share the most recent among all subscriber
44
        $this->client   = (new WebsocketClient(ApiSettings::createUrl($app)))->shareReplay(1);
45
        $this->messages = $this->client
46
            ->flatMap(function (MessageSubject $ms) {
47
                return $ms;
48
            })
49
            ->map('json_decode');
50
    }
51
52
    /**
53
     * Listen on a channel
54
     *
55
     * @param string $channel Channel to listen on
56
     * @return ObservableInterface
57
     */
58
    public function channel(string $channel): ObservableInterface
59
    {
60
        if (isset($this->channels[$channel])) {
61
            return $this->channels[$channel];
62
        }
63
64
        $channelMessages = $this->messages->filter(function ($event) use ($channel) {
65
            return isset($event->channel) && $event->channel == $channel;
66
        });
67
68
        $events = Observable::create(function (
69
            ObserverInterface $observer,
70
            SchedulerInterface $scheduler
71
        ) use (
72
            $channel,
73
            $channelMessages
74
        ) {
75
            $subscription = $channelMessages
76
                ->filter(function ($msg) {
77
                    return $msg->event !== 'pusher_internal:subscription_succeeded';
78
                })
79
                ->subscribe($observer, $scheduler);
80
81
            $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
82
83
            return new CallbackDisposable(function () use ($channel, $subscription) {
84
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
85
                $subscription->dispose();
86
            });
87
        });
88
89
        $this->channels[$channel] = $events->share();
90
        return $this->channels[$channel];
91
    }
92
93
    /**
94
     * Send a message through the client
95
     *
96
     * @param array $message Message to send, will be json encoded
97
     */
98
    public function send(array $message)
99
    {
100
        $this->client
101
            ->take(1)
102
            ->subscribeCallback(function (MessageSubject $ms) use ($message) {
103
                $ms->send(json_encode($message));
104
            });
105
    }
106
}
107