GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — develop ( 14a217...c3bf76 )
by Baptiste
03:06
created

Connection   C

Complexity

Total Complexity 20

Size/Duplication

Total Lines 182
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 26

Test Coverage

Coverage 86.73%

Importance

Changes 0
Metric Value
wmc 20
lcom 1
cbo 26
dl 0
loc 182
ccs 85
cts 98
cp 0.8673
rs 5
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 22 1
A protocol() 0 4 1
A send() 0 22 3
A wait() 0 16 4
A __destruct() 0 4 1
A open() 0 12 2
A close() 0 12 2
A opened() 0 4 2
A start() 0 21 1
B handshake() 0 32 2
A openVHost() 0 8 1
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport;
5
6
use Innmind\AMQP\{
7
    Transport\Connection\FrameReader,
8
    Transport\Protocol\Version,
9
    Model\Connection\StartOk,
10
    Model\Connection\SecureOk,
11
    Model\Connection\TuneOk,
12
    Model\Connection\Open,
13
    Model\Connection\Close,
14
    Model\Connection\MaxChannels,
15
    Model\Connection\MaxFrameSize,
16
    Exception\FrameChannelExceedAllowedChannelNumber,
17
    Exception\FrameExceedAllowedSize,
18
    Exception\UnexpectedFrame
19
};
20
use Innmind\Socket\{
21
    Internet\Transport,
22
    Client\Internet as Socket
23
};
24
use Innmind\Stream\Select;
25
use Innmind\Url\{
26
    UrlInterface,
27
    Authority\NullUserInformation
28
};
29
use Innmind\TimeContinuum\ElapsedPeriod;
30
use Innmind\Immutable\Str;
31
32
final class Connection
33
{
34
    private $transport;
35
    private $authority;
36
    private $vhost;
37
    private $protocol;
38
    private $socket;
39
    private $select;
40
    private $read;
41
    private $opened = false;
42
    private $maxChannels;
43
    private $maxFrameSize;
44
    private $heartbeat;
45
46 2
    public function __construct(
47
        Transport $transport,
48
        UrlInterface $server,
49
        Protocol $protocol,
50
        ElapsedPeriod $timeout
51
    ) {
52 2
        $this->transport = $transport;
53 2
        $this->authority = $server->authority();
54 2
        $this->vhost = $server->path();
55 2
        $this->protocol = $protocol;
56 2
        $this->socket = new Socket(
57 2
            $transport,
58 2
            $this->authority->withUserInformation(new NullUserInformation)
59
        );
60 2
        $this->select = (new Select($timeout))->forRead($this->socket);
61 2
        $this->read = new FrameReader;
62 2
        $this->maxChannels = new MaxChannels(0);
63 2
        $this->maxFrameSize = new MaxFrameSize(0);
64 2
        $this->heartbeat = $timeout;
65
66 2
        $this->open();
67 2
    }
68
69 1
    public function protocol(): Protocol
70
    {
71 1
        return $this->protocol;
72
    }
73
74 2
    public function send(Frame $frame): self
75
    {
76 2
        if (!$this->maxChannels->allows($frame->channel()->toInt())) {
77
            throw new FrameChannelExceedAllowedChannelNumber(
78
                $frame->channel(),
79
                $this->maxChannels
80
            );
81
        }
82
83 2
        $frame = (new Str((string) $frame))->toEncoding('ASCII');
84
85 2
        if (!$this->maxFrameSize->allows($frame->length())) {
86
            throw new FrameExceedAllowedSize(
87
                $frame->length(),
88
                $this->maxFrameSize
89
            );
90
        }
91
92 2
        $this->socket->write($frame);
93
94 2
        return $this;
95
    }
96
97 2
    public function wait(string ...$names): Frame
98
    {
99
        do {
100 2
            $streams = ($this->select)();
101 2
        } while (!$streams->get('read')->contains($this->socket));
102
103 2
        $frame = ($this->read)($this->socket, $this->protocol);
104
105 2
        foreach ($names as $name) {
106 2
            if ($this->protocol->method($name)->equals($frame->method())) {
107 2
                return $frame;
108
            }
109
        }
110
111 1
        throw new UnexpectedFrame($frame->method(), ...$names);
112
    }
113
114 2
    public function __destruct()
115
    {
116 2
        $this->close();
117 2
    }
118
119 2
    private function open(): void
120
    {
121 2
        if ($this->opened()) {
122
            return;
123
        }
124
125 2
        $this->start();
126 2
        $this->handshake();
127 2
        $this->openVHost();
128
129 2
        $this->opened = true;
130 2
    }
131
132 2
    private function close(): void
133
    {
134 2
        if (!$this->opened()) {
135
            return;
136
        }
137
138
        $this
139 2
            ->send($this->protocol->connection()->close(new Close))
140 2
            ->wait('connection.close-ok');
141 2
        $this->socket->close();
142 2
        $this->opened = false;
143 2
    }
144
145 2
    private function opened(): bool
146
    {
147 2
        return $this->opened && !$this->socket->closed();
148
    }
149
150 2
    private function start(): void
151
    {
152 2
        $this->socket->write(
153 2
            new Str((string) $this->protocol->version())
154
        );
155
156 2
        $frame = $this->wait('connection.start');
157 2
        $this->protocol->use(
158 2
            new Version(
159 2
                $frame->values()->get(0)->original()->value(),
160 2
                $frame->values()->get(1)->original()->value(),
161 2
                0 //server doesn't provide bugfix version
162
            )
163
        );
164 2
        $this->send($this->protocol->connection()->startOk(
165 2
            new StartOk(
166 2
                $this->authority->userInformation()->user(),
167 2
                $this->authority->userInformation()->password()
168
            )
169
        ));
170 2
    }
171
172 2
    private function handshake(): void
173
    {
174 2
        $frame = $this->wait('connection.secure', 'connection.tune');
175
176 2
        if ($this->protocol->method('connection.secure')->equals($frame->method())) {
177
            $this->send($this->protocol->connection()->secureOk(
178
                new SecureOk(
179
                    $this->authority->userInformation()->user(),
180
                    $this->authority->userInformation()->password()
181
                )
182
            ));
183
            $frame = $this->wait('connection.tune');
184
        }
185
186 2
        $this->maxChannels = new MaxChannels(
187 2
            $frame->values()->get(0)->original()->value()
188
        );
189 2
        $this->maxFrameSize = new MaxFrameSize(
190 2
            $frame->values()->get(1)->original()->value()
191
        );
192 2
        $this->heartbeat = new ElapsedPeriod(
193 2
            $frame->values()->get(2)->original()->value()
194
        );
195 2
        $this->select = (new Select($this->heartbeat))->forRead($this->socket);
196 2
        $this->send($this->protocol->connection()->tuneOk(
197 2
            new TuneOk(
198 2
                $this->maxChannels,
199 2
                $this->maxFrameSize,
200 2
                $this->heartbeat
201
            )
202
        ));
203 2
    }
204
205 2
    private function openVHost(): void
206
    {
207
        $this
208 2
            ->send($this->protocol->connection()->open(
209 2
                new Open($this->vhost)
210
            ))
211 2
            ->wait('connection.open-ok');
212 2
    }
213
}
214