GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — develop ( e4addc...44f5b4 )
by Baptiste
23:22
created

Connection::start()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 42
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 4.0011

Importance

Changes 0
Metric Value
dl 0
loc 42
ccs 23
cts 24
cp 0.9583
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 26
nc 3
nop 0
crap 4.0011
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport\Connection;
5
6
use Innmind\AMQP\{
7
    Transport\Connection as ConnectionInterface,
8
    Transport\Frame,
9
    Transport\Protocol,
10
    Transport\Protocol\Version,
11
    Transport\Frame\Type,
12
    Transport\Frame\Method,
13
    Transport\Frame\Value\UnsignedOctet,
14
    Model\Connection\StartOk,
15
    Model\Connection\SecureOk,
16
    Model\Connection\TuneOk,
17
    Model\Connection\Open,
18
    Model\Connection\Close,
19
    Model\Connection\MaxChannels,
20
    Model\Connection\MaxFrameSize,
21
    Exception\FrameChannelExceedAllowedChannelNumber,
22
    Exception\FrameExceedAllowedSize,
23
    Exception\UnexpectedFrame,
24
    Exception\NoFrameDetected,
25
    Exception\ConnectionClosed
26
};
27
use Innmind\Socket\{
28
    Internet\Transport,
29
    Client\Internet as Socket
30
};
31
use Innmind\Stream\Select;
32
use Innmind\Url\{
33
    UrlInterface,
34
    Authority\NullUserInformation
35
};
36
use Innmind\TimeContinuum\{
37
    ElapsedPeriod,
38
    TimeContinuumInterface
39
};
40
use Innmind\Immutable\Str;
41
42
final class Connection implements ConnectionInterface
43
{
44
    private $transport;
45
    private $authority;
46
    private $vhost;
47
    private $protocol;
48
    private $socket;
49
    private $timeout;
50
    private $select;
51
    private $read;
52
    private $closed = true;
53
    private $maxChannels;
54
    private $maxFrameSize;
55
    private $heartbeat;
56
    private $clock;
57
    private $lastReceivedData;
58
59 44
    public function __construct(
60
        Transport $transport,
61
        UrlInterface $server,
62
        Protocol $protocol,
63
        ElapsedPeriod $timeout,
64
        TimeContinuumInterface $clock
65
    ) {
66 44
        $this->transport = $transport;
67 44
        $this->authority = $server->authority();
68 44
        $this->vhost = $server->path();
69 44
        $this->protocol = $protocol;
70 44
        $this->timeout = $timeout;
71 44
        $this->buildSocket();
72 44
        $this->read = new FrameReader;
73 44
        $this->maxChannels = new MaxChannels(0);
74 44
        $this->maxFrameSize = new MaxFrameSize(0);
75 44
        $this->heartbeat = $timeout;
76 44
        $this->clock = $clock;
77 44
        $this->lastReceivedData = $clock->now();
78
79 44
        $this->open();
80 44
    }
81
82 39
    public function protocol(): Protocol
83
    {
84 39
        return $this->protocol;
85
    }
86
87 44
    public function send(Frame $frame): ConnectionInterface
88
    {
89 44
        if (!$this->maxChannels->allows($frame->channel()->toInt())) {
90
            throw new FrameChannelExceedAllowedChannelNumber(
91
                $frame->channel(),
92
                $this->maxChannels
93
            );
94
        }
95
96 44
        $frame = (new Str((string) $frame))->toEncoding('ASCII');
97
98 44
        if (!$this->maxFrameSize->allows($frame->length())) {
99
            throw new FrameExceedAllowedSize(
100
                $frame->length(),
101
                $this->maxFrameSize
102
            );
103
        }
104
105 44
        $this->socket->write($frame);
106
107 44
        return $this;
108
    }
109
110
    /**
111
     * {@inheritdoc}
112
     */
113 44
    public function wait(string ...$names): Frame
114
    {
115
        do {
116 44
            $now = $this->clock->now();
117 44
            $elapsedPeriod = $now->elapsedSince($this->lastReceivedData);
0 ignored issues
show
Documentation introduced by
$this->lastReceivedData is of type object<Innmind\TimeConti...m\PointInTimeInterface>, but the function expects a object<self>.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
118
119 44
            if ($elapsedPeriod->longerThan($this->heartbeat)) {
120 10
                $this->send(Frame::heartbeat());
121
            }
122
123 44
            $streams = ($this->select)();
124 44
        } while (!$streams->get('read')->contains($this->socket));
125
126 44
        $frame = ($this->read)($this->socket, $this->protocol);
127 44
        $this->lastReceivedData = $this->clock->now();
128
129 44
        if ($frame->type() === Type::heartbeat()) {
130
            return $this->wait(...$names);
131
        }
132
133 44
        if (count($names) === 0) {
134 16
            return $frame;
135
        }
136
137 44
        if ($frame->type() !== Type::method()) {
138
            //someone must have forgot a wait() call
139
            throw new ExpectedMethodFrame($frame->type());
140
        }
141
142 44
        foreach ($names as $name) {
143 44
            if ($this->protocol->method($name)->equals($frame->method())) {
144 44
                return $frame;
145
            }
146
        }
147
148 2
        if ($this->protocol->method('connection.close')->equals($frame->method())) {
149 1
            $this->send($this->protocol->connection()->closeOk());
150 1
            $this->closed = true;
151
152 1
            throw new ConnectionClosed(
153 1
                (string) $frame->values()->get(1)->original(),
154 1
                $frame->values()->get(0)->original()->value(),
155 1
                new Method(
156 1
                    $frame->values()->get(2)->original()->value(),
157 1
                    $frame->values()->get(3)->original()->value()
158
                )
159
            );
160
        }
161
162 1
        throw new UnexpectedFrame($frame->method(), ...$names);
163
    }
164
165 18
    public function maxFrameSize(): MaxFrameSize
166
    {
167 18
        return $this->maxFrameSize;
168
    }
169
170 40
    public function close(): void
171
    {
172 40
        if ($this->closed()) {
173 2
            return;
174
        }
175
176
        $this
177 39
            ->send($this->protocol->connection()->close(new Close))
178 39
            ->wait('connection.close-ok');
179 39
        $this->socket->close();
180 39
        $this->closed = true;
181 39
    }
182
183 44
    public function closed(): bool
184
    {
185 44
        return $this->closed || $this->socket->closed();
186
    }
187
188 5
    public function __destruct()
189
    {
190 5
        $this->close();
191 5
    }
192
193 44
    private function buildSocket(): void
194
    {
195 44
        $this->socket = new Socket(
196 44
            $this->transport,
197 44
            $this->authority->withUserInformation(new NullUserInformation)
198
        );
199 44
        $this->select = (new Select($this->timeout))->forRead($this->socket);
200 44
    }
201
202 44
    private function open(): void
203
    {
204 44
        if (!$this->closed()) {
205
            return;
206
        }
207
208 44
        $this->start();
209 44
        $this->handshake();
210 44
        $this->openVHost();
211
212 44
        $this->closed = false;
213 44
    }
214
215 44
    private function start(): void
216
    {
217 44
        $this->socket->write(
218 44
            new Str((string) $this->protocol->version())
219
        );
220
221
        try {
222 44
            $frame = $this->wait('connection.start');
0 ignored issues
show
Unused Code introduced by
$frame is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
223 1
        } catch (NoFrameDetected $e) {
224 1
            $content = $e->content()->toEncoding('ASCII');
225
226
            if (
227 1
                $content->length() !== 8 ||
228 1
                !$content->matches('/^AMQP/')
229
            ) {
230
                throw $e;
231
            }
232
233
            $version = $content
234 1
                ->substring(5, 8)
235 1
                ->chunk();
236 1
            $this->protocol->use(
237 1
                new Version(
238 1
                    UnsignedOctet::fromString($version->get(0))->original()->value(),
239 1
                    UnsignedOctet::fromString($version->get(1))->original()->value(),
240 1
                    UnsignedOctet::fromString($version->get(2))->original()->value()
241
                )
242
            );
243
            //socket rebuilt as the server close the connection on version mismatch
244 1
            $this->buildSocket();
245 1
            $this->start();
246
247 1
            return;
248
        }
249
250 44
        $this->send($this->protocol->connection()->startOk(
251 44
            new StartOk(
252 44
                $this->authority->userInformation()->user(),
253 44
                $this->authority->userInformation()->password()
254
            )
255
        ));
256 44
    }
257
258 44
    private function handshake(): void
259
    {
260 44
        $frame = $this->wait('connection.secure', 'connection.tune');
261
262 44
        if ($this->protocol->method('connection.secure')->equals($frame->method())) {
263
            $this->send($this->protocol->connection()->secureOk(
264
                new SecureOk(
265
                    $this->authority->userInformation()->user(),
266
                    $this->authority->userInformation()->password()
267
                )
268
            ));
269
            $frame = $this->wait('connection.tune');
270
        }
271
272 44
        $this->maxChannels = new MaxChannels(
273 44
            $frame->values()->get(0)->original()->value()
274
        );
275 44
        $this->maxFrameSize = new MaxFrameSize(
276 44
            $frame->values()->get(1)->original()->value()
277
        );
278 44
        $this->heartbeat = new ElapsedPeriod(
279 44
            $frame->values()->get(2)->original()->value()
280
        );
281 44
        $this->select = (new Select($this->heartbeat))->forRead($this->socket);
282 44
        $this->send($this->protocol->connection()->tuneOk(
283 44
            new TuneOk(
284 44
                $this->maxChannels,
285 44
                $this->maxFrameSize,
286 44
                $this->heartbeat
287
            )
288
        ));
289 44
    }
290
291 44
    private function openVHost(): void
292
    {
293
        $this
294 44
            ->send($this->protocol->connection()->open(
295 44
                new Open($this->vhost)
296
            ))
297 44
            ->wait('connection.open-ok');
298 44
    }
299
}
300