GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — develop ( 8cc0dc...800d8a )
by Baptiste
03:38
created

Connection   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 256
Duplicated Lines 0 %

Test Coverage

Coverage 87.31%

Importance

Changes 0
Metric Value
eloc 137
dl 0
loc 256
ccs 117
cts 134
cp 0.8731
rs 10
c 0
b 0
f 0
wmc 30

12 Methods

Rating   Name   Duplication   Size   Complexity  
A handshake() 0 29 2
A openVHost() 0 7 1
A close() 0 11 2
A closed() 0 3 2
A __construct() 0 23 1
B wait() 0 54 11
A open() 0 12 2
A maxFrameSize() 0 3 1
A protocol() 0 3 1
A start() 0 35 3
A buildSocket() 0 7 1
A send() 0 21 3
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport\Connection;
5
6
use Innmind\AMQP\{
7
    Transport\Connection as ConnectionInterface,
8
    Transport\Frame,
9
    Transport\Protocol,
10
    Transport\Protocol\Version,
11
    Transport\Frame\Type,
12
    Transport\Frame\Method,
13
    Transport\Frame\Value\UnsignedOctet,
14
    Model\Connection\StartOk,
15
    Model\Connection\SecureOk,
16
    Model\Connection\TuneOk,
17
    Model\Connection\Open,
18
    Model\Connection\Close,
19
    Model\Connection\MaxChannels,
20
    Model\Connection\MaxFrameSize,
21
    Exception\FrameChannelExceedAllowedChannelNumber,
22
    Exception\FrameExceedAllowedSize,
23
    Exception\UnexpectedFrame,
24
    Exception\NoFrameDetected,
25
    Exception\ConnectionClosed,
26
    Exception\ExpectedMethodFrame,
27
};
28
use Innmind\Socket\{
29
    Internet\Transport,
30
    Client\Internet as Socket,
31
};
32
use Innmind\Stream\Select;
33
use Innmind\Url\{
34
    UrlInterface,
35
    Authority\NullUserInformation,
36
};
37
use Innmind\TimeContinuum\{
38
    ElapsedPeriod,
39
    TimeContinuumInterface,
40
};
41
use Innmind\OperatingSystem\Remote;
42
use Innmind\Immutable\Str;
43
44
final class Connection implements ConnectionInterface
45
{
46
    private $transport;
47
    private $authority;
48
    private $vhost;
49
    private $protocol;
50
    private $socket;
51
    private $timeout;
52
    private $remote;
53
    private $select;
54
    private $read;
55
    private $closed = true;
56
    private $opening = true;
57
    private $maxChannels;
58
    private $maxFrameSize;
59
    private $heartbeat;
60
    private $clock;
61
    private $lastReceivedData;
62
63 49
    public function __construct(
64
        Transport $transport,
65
        UrlInterface $server,
66
        Protocol $protocol,
67
        ElapsedPeriod $timeout,
68
        TimeContinuumInterface $clock,
69
        Remote $remote
70
    ) {
71 49
        $this->transport = $transport;
72 49
        $this->authority = $server->authority();
73 49
        $this->vhost = $server->path();
74 49
        $this->protocol = $protocol;
75 49
        $this->timeout = $timeout;
76 49
        $this->remote = $remote;
77 49
        $this->buildSocket();
78 45
        $this->read = new FrameReader;
79 45
        $this->maxChannels = new MaxChannels(0);
80 45
        $this->maxFrameSize = new MaxFrameSize(0);
81 45
        $this->heartbeat = $timeout;
82 45
        $this->clock = $clock;
83 45
        $this->lastReceivedData = $clock->now();
84
85 45
        $this->open();
86 45
    }
87
88 41
    public function protocol(): Protocol
89
    {
90 41
        return $this->protocol;
91
    }
92
93 45
    public function send(Frame $frame): ConnectionInterface
94
    {
95 45
        if (!$this->maxChannels->allows($frame->channel()->toInt())) {
96
            throw new FrameChannelExceedAllowedChannelNumber(
97
                $frame->channel(),
98
                $this->maxChannels
99
            );
100
        }
101
102 45
        $frame = Str::of((string) $frame)->toEncoding('ASCII');
103
104 45
        if (!$this->maxFrameSize->allows($frame->length())) {
105
            throw new FrameExceedAllowedSize(
106
                $frame->length(),
107
                $this->maxFrameSize
108
            );
109
        }
110
111 45
        $this->socket->write($frame);
112
113 45
        return $this;
114
    }
115
116
    /**
117
     * {@inheritdoc}
118
     */
119 45
    public function wait(string ...$names): Frame
120
    {
121
        do {
122 45
            if (!$this->opening && $this->closed()) {
123
                throw new ConnectionClosed;
124
            }
125
126 45
            $now = $this->clock->now();
127 45
            $elapsedPeriod = $now->elapsedSince($this->lastReceivedData);
128
129 45
            if ($elapsedPeriod->longerThan($this->heartbeat)) {
130 1
                $this->send(Frame::heartbeat());
131
            }
132
133 45
            $streams = ($this->select)();
134 45
        } while (!$streams->get('read')->contains($this->socket));
135
136 45
        $frame = ($this->read)($this->socket, $this->protocol);
137 45
        $this->lastReceivedData = $this->clock->now();
138
139 45
        if ($frame->type() === Type::heartbeat()) {
140
            return $this->wait(...$names);
141
        }
142
143 45
        if (\count($names) === 0) {
144 16
            return $frame;
145
        }
146
147 45
        if ($frame->type() !== Type::method()) {
148
            //someone must have forgot a wait() call
149
            throw new ExpectedMethodFrame($frame->type());
150
        }
151
152 45
        foreach ($names as $name) {
153 45
            if ($frame->is($this->protocol->method($name))) {
154 45
                return $frame;
155
            }
156
        }
157
158 2
        if ($frame->is($this->protocol->method('connection.close'))) {
159 1
            $this->send($this->protocol->connection()->closeOk());
160 1
            $this->closed = true;
161
162 1
            throw ConnectionClosed::byServer(
163 1
                (string) $frame->values()->get(1)->original(),
164 1
                $frame->values()->get(0)->original()->value(),
165 1
                new Method(
166 1
                    $frame->values()->get(2)->original()->value(),
167 1
                    $frame->values()->get(3)->original()->value()
168
                )
169
            );
170
        }
171
172 1
        throw new UnexpectedFrame($frame, ...$names);
173
    }
174
175 18
    public function maxFrameSize(): MaxFrameSize
176
    {
177 18
        return $this->maxFrameSize;
178
    }
179
180 38
    public function close(): void
181
    {
182 38
        if ($this->closed()) {
183
            return;
184
        }
185
186
        $this
187 38
            ->send($this->protocol->connection()->close(new Close))
188 38
            ->wait('connection.close-ok');
189 38
        $this->socket->close();
190 38
        $this->closed = true;
191 38
    }
192
193 45
    public function closed(): bool
194
    {
195 45
        return $this->closed || $this->socket->closed();
196
    }
197
198 49
    private function buildSocket(): void
199
    {
200 49
        $this->socket = $this->remote->socket(
201 49
            $this->transport,
202 49
            $this->authority->withUserInformation(new NullUserInformation)
203
        );
204 45
        $this->select = (new Select($this->timeout))->forRead($this->socket);
205 45
    }
206
207 45
    private function open(): void
208
    {
209 45
        if (!$this->closed()) {
210
            return;
211
        }
212
213 45
        $this->start();
214 45
        $this->handshake();
215 45
        $this->openVHost();
216
217 45
        $this->closed = false;
218 45
        $this->opening = false;
219 45
    }
220
221 45
    private function start(): void
222
    {
223 45
        $this->socket->write(
224 45
            new Str((string) $this->protocol->version())
225
        );
226
227
        try {
228 45
            $frame = $this->wait('connection.start');
0 ignored issues
show
Unused Code introduced by
The assignment to $frame is dead and can be removed.
Loading history...
229 1
        } catch (NoFrameDetected $e) {
230 1
            $content = $e->content();
231
232 1
            if ((string) $content->read(4) !== 'AMQP') {
233
                throw $e;
234
            }
235
236 1
            $content->read(1); // there is a zero between AMQP and version number
237
238 1
            $this->protocol->use(
239 1
                new Version(
240 1
                    UnsignedOctet::fromStream($content)->original()->value(),
241 1
                    UnsignedOctet::fromStream($content)->original()->value(),
242 1
                    UnsignedOctet::fromStream($content)->original()->value()
243
                )
244
            );
245
            //socket rebuilt as the server close the connection on version mismatch
246 1
            $this->buildSocket();
247 1
            $this->start();
248
249 1
            return;
250
        }
251
252 45
        $this->send($this->protocol->connection()->startOk(
253 45
            new StartOk(
254 45
                $this->authority->userInformation()->user(),
255 45
                $this->authority->userInformation()->password()
256
            )
257
        ));
258 45
    }
259
260 45
    private function handshake(): void
261
    {
262 45
        $frame = $this->wait('connection.secure', 'connection.tune');
263
264 45
        if ($frame->is($this->protocol->method('connection.secure'))) {
265
            $this->send($this->protocol->connection()->secureOk(
266
                new SecureOk(
267
                    $this->authority->userInformation()->user(),
268
                    $this->authority->userInformation()->password()
269
                )
270
            ));
271
            $frame = $this->wait('connection.tune');
272
        }
273
274 45
        $this->maxChannels = new MaxChannels(
275 45
            $frame->values()->get(0)->original()->value()
276
        );
277 45
        $this->maxFrameSize = new MaxFrameSize(
278 45
            $frame->values()->get(1)->original()->value()
279
        );
280 45
        $this->heartbeat = new ElapsedPeriod(
281 45
            $frame->values()->get(2)->original()->value()
282
        );
283 45
        $this->select = (new Select($this->heartbeat))->forRead($this->socket);
284 45
        $this->send($this->protocol->connection()->tuneOk(
285 45
            new TuneOk(
286 45
                $this->maxChannels,
287 45
                $this->maxFrameSize,
288 45
                $this->heartbeat
289
            )
290
        ));
291 45
    }
292
293 45
    private function openVHost(): void
294
    {
295
        $this
296 45
            ->send($this->protocol->connection()->open(
297 45
                new Open($this->vhost)
298
            ))
299 45
            ->wait('connection.open-ok');
300 45
    }
301
}
302