GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — develop ( 081f57...404db9 )
by Baptiste
12s queued 11s
created

Connection::start()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 35
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 3.0009

Importance

Changes 0
Metric Value
cc 3
eloc 21
nc 3
nop 0
dl 0
loc 35
ccs 20
cts 21
cp 0.9524
crap 3.0009
rs 9.584
c 0
b 0
f 0
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport\Connection;
5
6
use Innmind\AMQP\{
7
    Transport\Connection as ConnectionInterface,
8
    Transport\Frame,
9
    Transport\Protocol,
10
    Transport\Protocol\Version,
11
    Transport\Frame\Type,
12
    Transport\Frame\Method,
13
    Transport\Frame\Value\UnsignedOctet,
14
    Model\Connection\StartOk,
15
    Model\Connection\SecureOk,
16
    Model\Connection\TuneOk,
17
    Model\Connection\Open,
18
    Model\Connection\Close,
19
    Model\Connection\MaxChannels,
20
    Model\Connection\MaxFrameSize,
21
    Exception\FrameChannelExceedAllowedChannelNumber,
22
    Exception\FrameExceedAllowedSize,
23
    Exception\UnexpectedFrame,
24
    Exception\NoFrameDetected,
25
    Exception\ConnectionClosed,
26
    Exception\ExpectedMethodFrame
27
};
28
use Innmind\Socket\{
29
    Internet\Transport,
30
    Client\Internet as Socket
31
};
32
use Innmind\Stream\Select;
33
use Innmind\Url\{
34
    UrlInterface,
35
    Authority\NullUserInformation
36
};
37
use Innmind\TimeContinuum\{
38
    ElapsedPeriod,
39
    TimeContinuumInterface
40
};
41
use Innmind\Immutable\Str;
42
43
final class Connection implements ConnectionInterface
44
{
45
    private $transport;
46
    private $authority;
47
    private $vhost;
48
    private $protocol;
49
    private $socket;
50
    private $timeout;
51
    private $select;
52
    private $read;
53
    private $closed = true;
54
    private $opening = true;
55
    private $maxChannels;
56
    private $maxFrameSize;
57
    private $heartbeat;
58
    private $clock;
59
    private $lastReceivedData;
60
61 49
    public function __construct(
62
        Transport $transport,
63
        UrlInterface $server,
64
        Protocol $protocol,
65
        ElapsedPeriod $timeout,
66
        TimeContinuumInterface $clock
67
    ) {
68 49
        $this->transport = $transport;
69 49
        $this->authority = $server->authority();
70 49
        $this->vhost = $server->path();
71 49
        $this->protocol = $protocol;
72 49
        $this->timeout = $timeout;
73 49
        $this->buildSocket();
74 45
        $this->read = new FrameReader;
75 45
        $this->maxChannels = new MaxChannels(0);
76 45
        $this->maxFrameSize = new MaxFrameSize(0);
77 45
        $this->heartbeat = $timeout;
78 45
        $this->clock = $clock;
79 45
        $this->lastReceivedData = $clock->now();
80
81 45
        $this->open();
82 45
    }
83
84 41
    public function protocol(): Protocol
85
    {
86 41
        return $this->protocol;
87
    }
88
89 45
    public function send(Frame $frame): ConnectionInterface
90
    {
91 45
        if (!$this->maxChannels->allows($frame->channel()->toInt())) {
92
            throw new FrameChannelExceedAllowedChannelNumber(
93
                $frame->channel(),
94
                $this->maxChannels
95
            );
96
        }
97
98 45
        $frame = (new Str((string) $frame))->toEncoding('ASCII');
99
100 45
        if (!$this->maxFrameSize->allows($frame->length())) {
101
            throw new FrameExceedAllowedSize(
102
                $frame->length(),
103
                $this->maxFrameSize
104
            );
105
        }
106
107 45
        $this->socket->write($frame);
108
109 45
        return $this;
110
    }
111
112
    /**
113
     * {@inheritdoc}
114
     */
115 45
    public function wait(string ...$names): Frame
116
    {
117
        do {
118 45
            if (!$this->opening && $this->closed()) {
119
                throw new ConnectionClosed;
120
            }
121
122 45
            $now = $this->clock->now();
123 45
            $elapsedPeriod = $now->elapsedSince($this->lastReceivedData);
124
125 45
            if ($elapsedPeriod->longerThan($this->heartbeat)) {
126 1
                $this->send(Frame::heartbeat());
127
            }
128
129 45
            $streams = ($this->select)();
130 45
        } while (!$streams->get('read')->contains($this->socket));
131
132 45
        $frame = ($this->read)($this->socket, $this->protocol);
133 45
        $this->lastReceivedData = $this->clock->now();
134
135 45
        if ($frame->type() === Type::heartbeat()) {
136
            return $this->wait(...$names);
137
        }
138
139 45
        if (count($names) === 0) {
140 16
            return $frame;
141
        }
142
143 45
        if ($frame->type() !== Type::method()) {
144
            //someone must have forgot a wait() call
145
            throw new ExpectedMethodFrame($frame->type());
146
        }
147
148 45
        foreach ($names as $name) {
149 45
            if ($frame->is($this->protocol->method($name))) {
150 45
                return $frame;
151
            }
152
        }
153
154 2
        if ($frame->is($this->protocol->method('connection.close'))) {
155 1
            $this->send($this->protocol->connection()->closeOk());
156 1
            $this->closed = true;
157
158 1
            throw ConnectionClosed::byServer(
159 1
                (string) $frame->values()->get(1)->original(),
160 1
                $frame->values()->get(0)->original()->value(),
161 1
                new Method(
162 1
                    $frame->values()->get(2)->original()->value(),
163 1
                    $frame->values()->get(3)->original()->value()
164
                )
165
            );
166
        }
167
168 1
        throw new UnexpectedFrame($frame, ...$names);
169
    }
170
171 18
    public function maxFrameSize(): MaxFrameSize
172
    {
173 18
        return $this->maxFrameSize;
174
    }
175
176 38
    public function close(): void
177
    {
178 38
        if ($this->closed()) {
179
            return;
180
        }
181
182
        $this
183 38
            ->send($this->protocol->connection()->close(new Close))
184 38
            ->wait('connection.close-ok');
185 38
        $this->socket->close();
186 38
        $this->closed = true;
187 38
    }
188
189 45
    public function closed(): bool
190
    {
191 45
        return $this->closed || $this->socket->closed();
192
    }
193
194 49
    private function buildSocket(): void
195
    {
196 49
        $this->socket = new Socket(
197 49
            $this->transport,
198 49
            $this->authority->withUserInformation(new NullUserInformation)
199
        );
200 45
        $this->select = (new Select($this->timeout))->forRead($this->socket);
201 45
    }
202
203 45
    private function open(): void
204
    {
205 45
        if (!$this->closed()) {
206
            return;
207
        }
208
209 45
        $this->start();
210 45
        $this->handshake();
211 45
        $this->openVHost();
212
213 45
        $this->closed = false;
214 45
        $this->opening = false;
215 45
    }
216
217 45
    private function start(): void
218
    {
219 45
        $this->socket->write(
220 45
            new Str((string) $this->protocol->version())
221
        );
222
223
        try {
224 45
            $frame = $this->wait('connection.start');
0 ignored issues
show
Unused Code introduced by
The assignment to $frame is dead and can be removed.
Loading history...
225 1
        } catch (NoFrameDetected $e) {
226 1
            $content = $e->content();
227
228 1
            if ((string) $content->read(4) !== 'AMQP') {
229
                throw $e;
230
            }
231
232 1
            $content->read(1); // there is a zero between AMQP and version number
233
234 1
            $this->protocol->use(
235 1
                new Version(
236 1
                    UnsignedOctet::fromStream($content)->original()->value(),
237 1
                    UnsignedOctet::fromStream($content)->original()->value(),
238 1
                    UnsignedOctet::fromStream($content)->original()->value()
239
                )
240
            );
241
            //socket rebuilt as the server close the connection on version mismatch
242 1
            $this->buildSocket();
243 1
            $this->start();
244
245 1
            return;
246
        }
247
248 45
        $this->send($this->protocol->connection()->startOk(
249 45
            new StartOk(
250 45
                $this->authority->userInformation()->user(),
251 45
                $this->authority->userInformation()->password()
252
            )
253
        ));
254 45
    }
255
256 45
    private function handshake(): void
257
    {
258 45
        $frame = $this->wait('connection.secure', 'connection.tune');
259
260 45
        if ($frame->is($this->protocol->method('connection.secure'))) {
261
            $this->send($this->protocol->connection()->secureOk(
262
                new SecureOk(
263
                    $this->authority->userInformation()->user(),
264
                    $this->authority->userInformation()->password()
265
                )
266
            ));
267
            $frame = $this->wait('connection.tune');
268
        }
269
270 45
        $this->maxChannels = new MaxChannels(
271 45
            $frame->values()->get(0)->original()->value()
272
        );
273 45
        $this->maxFrameSize = new MaxFrameSize(
274 45
            $frame->values()->get(1)->original()->value()
275
        );
276 45
        $this->heartbeat = new ElapsedPeriod(
277 45
            $frame->values()->get(2)->original()->value()
278
        );
279 45
        $this->select = (new Select($this->heartbeat))->forRead($this->socket);
280 45
        $this->send($this->protocol->connection()->tuneOk(
281 45
            new TuneOk(
282 45
                $this->maxChannels,
283 45
                $this->maxFrameSize,
284 45
                $this->heartbeat
285
            )
286
        ));
287 45
    }
288
289 45
    private function openVHost(): void
290
    {
291
        $this
292 45
            ->send($this->protocol->connection()->open(
293 45
                new Open($this->vhost)
294
            ))
295 45
            ->wait('connection.open-ok');
296 45
    }
297
}
298