GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Connection::handshake()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 29
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 2.0625

Importance

Changes 0
Metric Value
cc 2
eloc 19
nc 2
nop 0
dl 0
loc 29
ccs 15
cts 20
cp 0.75
crap 2.0625
rs 9.6333
c 0
b 0
f 0
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport\Connection;
5
6
use Innmind\AMQP\{
7
    Transport\Connection as ConnectionInterface,
8
    Transport\Frame,
9
    Transport\Protocol,
10
    Transport\Protocol\Version,
11
    Transport\Frame\Type,
12
    Transport\Frame\Method,
13
    Transport\Frame\Value\UnsignedOctet,
14
    Model\Connection\StartOk,
15
    Model\Connection\SecureOk,
16
    Model\Connection\TuneOk,
17
    Model\Connection\Open,
18
    Model\Connection\Close,
19
    Model\Connection\MaxChannels,
20
    Model\Connection\MaxFrameSize,
21
    Exception\FrameChannelExceedAllowedChannelNumber,
22
    Exception\FrameExceedAllowedSize,
23
    Exception\UnexpectedFrame,
24
    Exception\NoFrameDetected,
25
    Exception\ConnectionClosed,
26
    Exception\ExpectedMethodFrame,
27
};
28
use Innmind\Socket\{
29
    Internet\Transport,
30
    Client\Internet as Socket,
31
};
32
use Innmind\Stream\Select;
33
use Innmind\Url\{
34
    UrlInterface,
35
    Authority\NullUserInformation,
36
};
37
use Innmind\TimeContinuum\{
38
    ElapsedPeriod,
39
    TimeContinuumInterface,
40
};
41
use Innmind\OperatingSystem\Remote;
42
use Innmind\Immutable\Str;
43
44
final class Connection implements ConnectionInterface
45
{
46
    // Transport handed to the remote when the socket is built.
    private $transport;
47
    // Authority of the server URL; used without its user information to
    // open the socket and with it to authenticate.
    private $authority;
48
    // Path of the server URL, sent as the virtual host in connection.open.
    private $vhost;
49
    // Protocol used to build outgoing frames and resolve method names.
    private $protocol;
50
    // Socket returned by the remote; (re)created by buildSocket().
    private $socket;
51
    // Timeout given at construction; used for the initial Select and as the
    // initial heartbeat.
    private $timeout;
52
    // Remote used to create the socket.
    private $remote;
53
    // Select watching the socket for readability.
    private $select;
54
    // FrameReader invoked with the socket and the protocol to read a frame.
    private $read;
55
    // Stays true until open() completes; set back to true when the
    // connection is closed by either side.
    private $closed = true;
56
    // True until open() completes; lets wait() run while $closed is still true.
    private $opening = true;
57
    // MaxChannels(0) until replaced by the value from connection.tune.
    private $maxChannels;
58
    // MaxFrameSize(0) until replaced by the value from connection.tune.
    private $maxFrameSize;
59
    // Starts as the timeout, then replaced by the value from connection.tune.
    private $heartbeat;
60
    // Clock used to measure the time elapsed since data was last received.
    private $clock;
61
    // Point in time at which the last frame was read (initialised at construction).
    private $lastReceivedData;
62
63 98
    /**
     * Builds the socket to the server and opens the AMQP connection.
     *
     * @param Transport $transport Transport handed to the remote to build the socket
     * @param UrlInterface $server Its authority addresses the server, its path is the vhost
     * @param Protocol $protocol Protocol used to build and recognise frames
     * @param ElapsedPeriod $timeout Initial select timeout and initial heartbeat
     * @param TimeContinuumInterface $clock Clock used to track when data was last received
     * @param Remote $remote Factory used to create the socket
     */
    public function __construct(
        Transport $transport,
        UrlInterface $server,
        Protocol $protocol,
        ElapsedPeriod $timeout,
        TimeContinuumInterface $clock,
        Remote $remote
    ) {
        // Collaborators first: buildSocket() needs transport, authority,
        // remote and timeout to be in place.
        $this->transport = $transport;
        $this->protocol = $protocol;
        $this->timeout = $timeout;
        $this->clock = $clock;
        $this->remote = $remote;
        $this->authority = $server->authority();
        $this->vhost = $server->path();

        $this->buildSocket();

        // Placeholder limits, replaced once the server sends connection.tune.
        $this->read = new FrameReader;
        $this->maxChannels = new MaxChannels(0);
        $this->maxFrameSize = new MaxFrameSize(0);
        $this->heartbeat = $timeout;
        $this->lastReceivedData = $clock->now();

        $this->open();
    }
87
88 82
    /**
     * Returns the protocol instance this connection was constructed with.
     */
    public function protocol(): Protocol
    {
        return $this->protocol;
    }
92
93 90
    /**
     * Writes a frame to the socket after checking it against the current
     * channel and frame size limits.
     *
     * @throws FrameChannelExceedAllowedChannelNumber When the frame channel is not allowed by the max channels
     * @throws FrameExceedAllowedSize When the encoded frame length is not allowed by the max frame size
     *
     * @return ConnectionInterface This connection, to allow chaining
     */
    public function send(Frame $frame): ConnectionInterface
    {
        $channel = $frame->channel();

        if (!$this->maxChannels->allows($channel->toInt())) {
            throw new FrameChannelExceedAllowedChannelNumber(
                $channel,
                $this->maxChannels
            );
        }

        // The size limit is checked against the ASCII-encoded string form
        // of the frame, which is also what gets written.
        $payload = Str::of((string) $frame)->toEncoding('ASCII');
        $size = $payload->length();

        if (!$this->maxFrameSize->allows($size)) {
            throw new FrameExceedAllowedSize($size, $this->maxFrameSize);
        }

        $this->socket->write($payload);

        return $this;
    }
115
116
    /**
     * {@inheritdoc}
     *
     * Blocks until a frame can be read from the socket, sending a heartbeat
     * whenever the time since data was last received exceeds the heartbeat
     * period. Heartbeat frames received are skipped. With no names given any
     * other frame is returned; otherwise the frame must be a method frame
     * matching one of the names.
     *
     * @throws ConnectionClosed When the connection is closed, or the server sent connection.close
     * @throws ExpectedMethodFrame When names are given but the frame is not a method frame
     * @throws UnexpectedFrame When the method frame matches none of the names
     */
    public function wait(string ...$names): Frame
    {
        while (true) {
            // While the connection is being opened the closed flag is still
            // true, hence the extra check on the opening flag.
            if (!$this->opening && $this->closed()) {
                throw new ConnectionClosed;
            }

            $sinceLastData = $this
                ->clock
                ->now()
                ->elapsedSince($this->lastReceivedData);

            if ($sinceLastData->longerThan($this->heartbeat)) {
                $this->send(Frame::heartbeat());
            }

            $ready = ($this->select)();

            if ($ready->get('read')->contains($this->socket)) {
                break;
            }
        }

        $received = ($this->read)($this->socket, $this->protocol);
        $this->lastReceivedData = $this->clock->now();

        if ($received->type() === Type::heartbeat()) {
            // Heartbeats carry nothing for the caller; keep waiting.
            return $this->wait(...$names);
        }

        if ($names === []) {
            return $received;
        }

        if ($received->type() !== Type::method()) {
            // A non-method frame here means an earlier wait() call was missed.
            throw new ExpectedMethodFrame($received->type());
        }

        foreach ($names as $expected) {
            if (!$received->is($this->protocol->method($expected))) {
                continue;
            }

            return $received;
        }

        if ($received->is($this->protocol->method('connection.close'))) {
            // Acknowledge the server-initiated close before reporting it.
            $this->send($this->protocol->connection()->closeOk());
            $this->closed = true;

            $values = $received->values();

            throw ConnectionClosed::byServer(
                (string) $values->get(1)->original(),
                $values->get(0)->original()->value(),
                new Method(
                    $values->get(2)->original()->value(),
                    $values->get(3)->original()->value()
                )
            );
        }

        throw new UnexpectedFrame($received, ...$names);
    }
174
175 36
    /**
     * Returns the current maximum frame size: MaxFrameSize(0) until the
     * value from connection.tune has replaced it.
     */
    public function maxFrameSize(): MaxFrameSize
    {
        return $this->maxFrameSize;
    }
179
180 76
    /**
     * Closes the connection: sends connection.close, waits for
     * connection.close-ok, then closes the socket. Does nothing when the
     * connection is already closed.
     */
    public function close(): void
    {
        if (!$this->closed()) {
            $closing = $this->protocol->connection()->close(new Close);

            $this->send($closing);
            $this->wait('connection.close-ok');

            $this->socket->close();
            $this->closed = true;
        }
    }
192
193 90
    /**
     * Tells whether the connection is closed, either because it was flagged
     * as closed or because the underlying socket reports being closed.
     */
    public function closed(): bool
    {
        if ($this->closed) {
            return true;
        }

        return (bool) $this->socket->closed();
    }
197
198 98
    /**
     * Creates the socket through the remote and a Select, using the
     * construction timeout, that watches this socket for reads.
     */
    private function buildSocket(): void
    {
        // The user information is replaced with a null one before the
        // authority is handed to the remote.
        $address = $this->authority->withUserInformation(new NullUserInformation);

        $this->socket = $this->remote->socket($this->transport, $address);

        $select = new Select($this->timeout);
        $this->select = $select->forRead($this->socket);
    }
206
207 90
    /**
     * Runs the opening sequence (start, handshake, vhost opening) and then
     * clears the closed and opening flags. Does nothing when the connection
     * is not closed.
     */
    private function open(): void
    {
        if ($this->closed()) {
            $this->start();
            $this->handshake();
            $this->openVHost();

            $this->closed = false;
            $this->opening = false;
        }
    }
220
221 90
    /**
     * Sends the protocol version, waits for connection.start and answers
     * with connection.start-ok carrying the user and password of the
     * authority.
     *
     * When no frame is detected and the received content starts with "AMQP",
     * the three version octets that follow are read and given to the
     * protocol, the socket is rebuilt and the start is attempted again.
     *
     * @throws NoFrameDetected When the received content does not start with "AMQP"
     */
    private function start(): void
    {
        $this->socket->write(
            new Str((string) $this->protocol->version())
        );

        try {
            // Only the arrival of connection.start matters here; the frame
            // itself is not used, so it is not kept in a local variable.
            $this->wait('connection.start');
        } catch (NoFrameDetected $e) {
            $content = $e->content();

            if ((string) $content->read(4) !== 'AMQP') {
                throw $e;
            }

            $content->read(1); // there is a zero between AMQP and version number

            $this->protocol->use(
                new Version(
                    UnsignedOctet::fromStream($content)->original()->value(),
                    UnsignedOctet::fromStream($content)->original()->value(),
                    UnsignedOctet::fromStream($content)->original()->value()
                )
            );
            // socket rebuilt as the server closes the connection on version mismatch
            $this->buildSocket();
            $this->start();

            return;
        }

        $this->send($this->protocol->connection()->startOk(
            new StartOk(
                $this->authority->userInformation()->user(),
                $this->authority->userInformation()->password()
            )
        ));
    }
259
260 90
    /**
     * Handles the optional connection.secure exchange, then applies the
     * limits received in connection.tune and confirms them with
     * connection.tune-ok.
     */
    private function handshake(): void
    {
        $reply = $this->wait('connection.secure', 'connection.tune');

        if ($reply->is($this->protocol->method('connection.secure'))) {
            $credentials = $this->authority->userInformation();

            $this->send($this->protocol->connection()->secureOk(
                new SecureOk(
                    $credentials->user(),
                    $credentials->password()
                )
            ));
            $reply = $this->wait('connection.tune');
        }

        // Tune values are read by position: 0 max channels, 1 max frame
        // size, 2 heartbeat.
        $tuned = $reply->values();

        $this->maxChannels = new MaxChannels(
            $tuned->get(0)->original()->value()
        );
        $this->maxFrameSize = new MaxFrameSize(
            $tuned->get(1)->original()->value()
        );
        // NOTE(review): the tuned heartbeat value is passed to ElapsedPeriod
        // unconverted; confirm both use the same time unit.
        $this->heartbeat = new ElapsedPeriod(
            $tuned->get(2)->original()->value()
        );

        // From now on the select timeout follows the tuned heartbeat.
        $select = new Select($this->heartbeat);
        $this->select = $select->forRead($this->socket);

        $this->send($this->protocol->connection()->tuneOk(
            new TuneOk(
                $this->maxChannels,
                $this->maxFrameSize,
                $this->heartbeat
            )
        ));
    }
292
293 90
    /**
     * Sends connection.open for the configured vhost and waits for
     * connection.open-ok.
     */
    private function openVHost(): void
    {
        $opening = $this->protocol->connection()->open(
            new Open($this->vhost)
        );

        $this->send($opening);
        $this->wait('connection.open-ok');
    }
301
}
302