GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 515224...041874 )
by Cees-Jan
04:19
created

AsyncClient   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 93
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 47.37%

Importance

Changes 0
Metric Value
wmc 5
lcom 1
cbo 9
dl 0
loc 93
ccs 9
cts 19
cp 0.4737
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 16 1
B channel() 0 34 3
A send() 0 8 1
1
<?php declare(strict_types=1);
2
3
namespace ApiClients\Client\Pusher;
4
5
use React\EventLoop\LoopInterface;
6
use Rx\Disposable\CallbackDisposable;
7
use Rx\Observable;
8
use Rx\ObservableInterface;
9
use Rx\ObserverInterface;
10
use Rx\SchedulerInterface;
11
use Rx\Websocket\Client as WebsocketClient;
12
use Rx\Websocket\MessageSubject;
13
use function React\Promise\resolve;
14
use function EventLoop\getLoop;
15
use function EventLoop\setLoop;
16
17
/**
 * Asynchronous Pusher client built on a single, shared websocket connection.
 *
 * Incoming websocket messages are passed through the `_ApiClients_jsonDecode`
 * operator and converted into Event objects. `channel()` exposes the events
 * of one channel as an observable; `send()` writes a message to the connection.
 */
final class AsyncClient
{
    /**
     * Shared websocket connection; the most recent connection is replayed to
     * every new subscriber (shareReplay(1)).
     *
     * @var Observable\RefCountObservable
     */
    private $client;

    /**
     * Stream of incoming messages, each mapped to an Event.
     *
     * @var Observable\AnonymousObservable
     */
    private $messages;

    /**
     * Shared channel observables created by channel(), keyed by channel name.
     *
     * @var Observable[]
     */
    private $channels = [];

    /**
     * @param LoopInterface $loop Event loop, registered globally through setLoop()
     * @param string $app Application ID
     */
    public function __construct(LoopInterface $loop, string $app)
    {
        // Set loop into global loop accessor
        setLoop($loop);

        // Only create one connection and share the most recent among all subscribers
        $this->client = (new WebsocketClient(ApiSettings::createUrl($app)))->shareReplay(1);

        $this->messages = $this->client
            ->flatMap(function (MessageSubject $ms) {
                // The message subject is itself the stream of incoming messages
                return $ms;
            })
            ->_ApiClients_jsonDecode()
            ->map(function (array $message) {
                return Event::createFromMessage($message);
            });
    }

    /**
     * Listen on a channel
     *
     * The observable for a given channel is created once and cached, so
     * repeated calls with the same name return the same shared instance.
     * Subscribing sends `pusher:subscribe`; disposing the subscription sends
     * `pusher:unsubscribe`.
     *
     * @param string $channel Channel to listen on
     * @return Observable
     * @throws \InvalidArgumentException When the (un)subscribe message cannot be JSON encoded
     */
    public function channel(string $channel): Observable
    {
        if (isset($this->channels[$channel])) {
            return $this->channels[$channel];
        }

        // Only events that carry a channel name equal to the requested one
        $channelMessages = $this->messages->filter(function (Event $event) use ($channel) {
            return $event->getChannel() !== '' && $event->getChannel() === $channel;
        });

        $events = Observable::create(function (
            ObserverInterface $observer,
            SchedulerInterface $scheduler
        ) use (
            $channel,
            $channelMessages
        ) {
            // Start listening before asking the server to subscribe; the
            // subscription acknowledgement itself is not forwarded to the observer.
            $subscription = $channelMessages
                ->filter(function (Event $event) {
                    return $event->getEvent() !== 'pusher_internal:subscription_succeeded';
                })
                ->subscribe($observer, $scheduler);

            $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);

            return new CallbackDisposable(function () use ($channel, $subscription) {
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                $subscription->dispose();
            });
        });

        $this->channels[$channel] = $events->share();

        return $this->channels[$channel];
    }

    /**
     * Send a message through the client
     *
     * The message is encoded immediately, so an encoding failure is reported
     * to the caller instead of `false` being written to the connection later
     * from inside the subscription callback.
     *
     * @param array $message Message to send, will be json encoded
     * @throws \InvalidArgumentException When the message cannot be JSON encoded
     */
    public function send(array $message)
    {
        $payload = json_encode($message);
        if ($payload === false) {
            throw new \InvalidArgumentException(
                'Unable to JSON encode message: ' . json_last_error_msg()
            );
        }

        // Take the current (replayed) connection once and write the payload to it
        $this->client
            ->take(1)
            ->subscribeCallback(function (MessageSubject $ms) use ($payload) {
                $ms->send($payload);
            });
    }
}
110