Completed
Push — master ( e6634e...05453b )
by Cees-Jan
8s
created

AsyncClient::channel()   B

Complexity

Conditions 3
Paths 2

Size

Total Lines 35
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12
Metric  Value
dl      0
loc     35
ccs     0
cts     24
cp      0
rs      8.8571
cc      3
eloc    21
nc      2
nop     1
crap    12
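The CRAP score of 12 is consistent with the other metrics if the inspection uses the commonly cited Change Risk Anti-Patterns formula, complexity^2 * (1 - coverage)^3 + complexity. That formula is an assumption here, since the page does not state how the score is derived. The helper name `crapScore` is hypothetical and only serves as a worked check of the arithmetic.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: CRAP score from cyclomatic complexity and coverage.
 * Assumes the standard formula; $coverage is a fraction between 0.0 and 1.0.
 */
function crapScore(int $complexity, float $coverage): float
{
    return $complexity ** 2 * (1.0 - $coverage) ** 3 + $complexity;
}

// Values taken from the metrics table: cc = 3, cp = 0 (no tests).
echo crapScore(3, 0.0), PHP_EOL; // prints 12
```

With full coverage the cubic term vanishes and the score falls back to the bare complexity, so adding tests for `channel()` would be the direct way to lower this number.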
<?php
declare(strict_types=1);

namespace WyriHaximus\Pusher;

use React\EventLoop\LoopInterface;
use Rx\Disposable\CallbackDisposable;
use Rx\Observable;
use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
use Rx\React\Promise;
use Rx\Scheduler\EventLoopScheduler;
use Rx\SchedulerInterface;
use Rx\Websocket\Client;
use Rx\Websocket\MessageSubject;
use WyriHaximus\ApiClient\Transport\Client as Transport;
use WyriHaximus\ApiClient\Transport\Factory;
use function React\Promise\resolve;
use function EventLoop\getLoop;
use function EventLoop\setLoop;

class AsyncClient
{
    protected $transport;
    protected $app;
    protected $url;
    protected $client;
    protected $messages;
    protected $channels = [];

    // Scrutinizer issue (Unused Code): The parameter $transport is not used and could be removed.
    // This check looks for parameters that have been defined for a function or method,
    // but which are not used in the method body.
    public function __construct(LoopInterface $loop, string $app, Transport $transport = null)
    {
        setLoop($loop);
        // Scrutinizer issue (Unused Code, Comprehensibility): 51% of this comment could be
        // valid code. Did you maybe forget this after debugging?
        // Sometimes obsolete code just ends up commented out instead of removed. In this case
        // it is better to remove the code once you have checked you do not need it.
        // The code might also have been commented out for debugging purposes. In this case it
        // is vital that someone uncomments it again or your project may behave in very
        // unexpected ways in production.
        // This check looks for comments that seem to be mostly valid code and reports them.
        /*if (!($transport instanceof Transport)) {
            $transport = Factory::create($loop, [
                    'resource_namespace' => 'Async',
                ] + ApiSettings::TRANSPORT_OPTIONS);
        }
        $this->transport = $transport;*/

        $this->app = $app;

        $this->url = 'wss://ws.pusherapp.com/app/' .
            $this->app .
            '?client=wyrihaximus-php-pusher-client&version=0.0.1&protocol=7'
        ;
        // Only create one connection and share the most recent among all subscribers
        $this->client   = (new Client($this->url))->shareReplay(1);
        $this->messages = $this->client
            ->flatMap(function (MessageSubject $ms) {
                return $ms;
            })
            ->map('json_decode');
    }

    public function channel(string $channel): ObservableInterface
    {
        // Reuse the shared observable if this channel was requested before
        if (isset($this->channels[$channel])) {
            return $this->channels[$channel];
        }

        // Keep only the decoded messages addressed to this channel
        $channelMessages = $this->messages->filter(function ($event) use ($channel) {
            return isset($event->channel) && $event->channel === $channel;
        });

        $events = Observable::create(function (
            ObserverInterface $observer,
            SchedulerInterface $scheduler
        ) use (
            $channel,
            $channelMessages
        ) {
            // Forward everything except the subscription acknowledgement
            $subscription = $channelMessages
                ->filter(function ($msg) {
                    return $msg->event !== 'pusher_internal:subscription_succeeded';
                })
                ->subscribe($observer, $scheduler);

            $this->send(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);

            // On disposal, unsubscribe from the channel before dropping the inner subscription
            return new CallbackDisposable(function () use ($channel, $subscription) {
                $this->send(['event' => 'pusher:unsubscribe', 'data' => ['channel' => $channel]]);
                $subscription->dispose();
            });
        });

        $this->channels[$channel] = $events->share();
        return $this->channels[$channel];
    }

    public function send(array $message)
    {
        // Take the current connection once and write the JSON-encoded message to it
        $this->client
            ->take(1)
            ->subscribeCallback(function (MessageSubject $ms) use ($message) {
                $ms->send(json_encode($message));
            });
    }
}
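The message handling in `channel()` and `send()` can be tried without RxPHP or a live WebSocket connection. The following is a minimal sketch in plain PHP, assuming only the payload shapes visible in the class above. The helper names `subscribeFrame` and `isChannelEvent` are hypothetical, and the sample frames are made up for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: builds the JSON frame that send() would write
// when channel() subscribes to a channel.
function subscribeFrame(string $channel): string
{
    return json_encode(['event' => 'pusher:subscribe', 'data' => ['channel' => $channel]]);
}

// Hypothetical helper: combines the two filters applied in channel(),
// matching the channel name and dropping the subscription acknowledgement.
function isChannelEvent(\stdClass $event, string $channel): bool
{
    return isset($event->channel)
        && $event->channel === $channel
        && $event->event !== 'pusher_internal:subscription_succeeded';
}

// Made-up incoming frames, for illustration only.
$frames = [
    '{"event":"pusher_internal:subscription_succeeded","channel":"news","data":"{}"}',
    '{"event":"article","channel":"news","data":"{\"id\":1}"}',
    '{"event":"article","channel":"sports","data":"{\"id\":2}"}',
];

// Same decoding step as ->map('json_decode') in the constructor.
$events = array_map('json_decode', $frames);

// Keep only the events a subscriber of "news" would receive.
$matching = array_values(array_filter($events, function (\stdClass $event) {
    return isChannelEvent($event, 'news');
}));

echo subscribeFrame('news'), PHP_EOL;
echo count($matching), PHP_EOL;
```

In the class itself the same predicates run inside `filter()` operators on a shared observable, so each channel is filtered once regardless of how many observers subscribe to it.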