GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — develop ( 44f5b4...dc3756 )
by Baptiste
03:14
created

Connection   D

Complexity

Total Complexity 32

Size/Duplication

Total Lines 264
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 32

Test Coverage

Coverage 88.49%

Importance

Changes 0
Metric Value
wmc 32
lcom 1
cbo 32
dl 0
loc 264
ccs 123
cts 139
cp 0.8849
rs 4.8
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 22 1
A protocol() 0 4 1
A send() 0 22 3
C wait() 0 55 11
A maxFrameSize() 0 4 1
A close() 0 12 2
A closed() 0 4 2
A __destruct() 0 4 1
A buildSocket() 0 8 1
A open() 0 13 2
B start() 0 42 4
B handshake() 0 32 2
A openVHost() 0 8 1
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQP\Transport\Connection;
5
6
use Innmind\AMQP\{
7
    Transport\Connection as ConnectionInterface,
8
    Transport\Frame,
9
    Transport\Protocol,
10
    Transport\Protocol\Version,
11
    Transport\Frame\Type,
12
    Transport\Frame\Method,
13
    Transport\Frame\Value\UnsignedOctet,
14
    Model\Connection\StartOk,
15
    Model\Connection\SecureOk,
16
    Model\Connection\TuneOk,
17
    Model\Connection\Open,
18
    Model\Connection\Close,
19
    Model\Connection\MaxChannels,
20
    Model\Connection\MaxFrameSize,
21
    Exception\FrameChannelExceedAllowedChannelNumber,
22
    Exception\FrameExceedAllowedSize,
23
    Exception\UnexpectedFrame,
24
    Exception\NoFrameDetected,
25
    Exception\ConnectionClosed
26
};
27
use Innmind\Socket\{
28
    Internet\Transport,
29
    Client\Internet as Socket
30
};
31
use Innmind\Stream\Select;
32
use Innmind\Url\{
33
    UrlInterface,
34
    Authority\NullUserInformation
35
};
36
use Innmind\TimeContinuum\{
37
    ElapsedPeriod,
38
    TimeContinuumInterface
39
};
40
use Innmind\Immutable\Str;
41
42
final class Connection implements ConnectionInterface
43
{
44
    // Transport handed to the socket client in buildSocket()
    private $transport;
45
    // Authority of the server url: gives the credentials and, once stripped
    // of the user information, the address the socket is built with
    private $authority;
46
    // Path of the server url, sent in connection.open
    private $vhost;
47
    // Protocol used to build outgoing frames and to recognise incoming ones
    private $protocol;
48
    // Socket (re)created by buildSocket()
    private $socket;
49
    // Period used for the initial select and as the initial heartbeat
    private $timeout;
50
    // Select watching the socket for readability, invoked in wait()
    private $select;
51
    // FrameReader invoked in wait() to read a frame from the socket
    private $read;
52
    // Stays true until open() has completed; set back to true on close
    private $closed = true;
53
    // True until open() has completed, lets wait() run while $closed is still true
    private $opening = true;
54
    // Starts at 0 and is replaced by the value received in connection.tune
    private $maxChannels;
55
    // Starts at 0 and is replaced by the value received in connection.tune
    private $maxFrameSize;
56
    // Period after which wait() sends a heartbeat frame; the constructor
    // timeout until connection.tune provides a value
    private $heartbeat;
57
    // Clock used to measure the time elapsed since the last received frame
    private $clock;
58
    // Point in time at which the last frame was read (construction time at first)
    private $lastReceivedData;
59
60 44
    /**
     * Keep the collaborators, build the socket and open the connection
     * right away.
     *
     * @param Transport $transport Handed to the socket client
     * @param UrlInterface $server Its authority and path are kept for the socket, the credentials and connection.open
     * @param Protocol $protocol Used to build and recognise frames
     * @param ElapsedPeriod $timeout Used for the initial select and as the heartbeat until connection.tune is received
     * @param TimeContinuumInterface $clock Used to track when data was last received
     */
    public function __construct(
        Transport $transport,
        UrlInterface $server,
        Protocol $protocol,
        ElapsedPeriod $timeout,
        TimeContinuumInterface $clock
    ) {
        // plain collaborators
        $this->clock = $clock;
        $this->protocol = $protocol;
        $this->transport = $transport;
        $this->timeout = $timeout;
        $this->heartbeat = $timeout;

        // server coordinates, needed before the socket can be built
        $this->authority = $server->authority();
        $this->vhost = $server->path();

        $this->buildSocket();

        // provisional limits, replaced during the handshake
        $this->read = new FrameReader;
        $this->maxChannels = new MaxChannels(0);
        $this->maxFrameSize = new MaxFrameSize(0);
        $this->lastReceivedData = $clock->now();

        $this->open();
    }
82
83 39
    /**
     * Give access to the protocol this connection was built with.
     */
    public function protocol(): Protocol
    {
        return $this->protocol;
    }
87
88 44
    /**
     * Write a frame to the socket once it has been checked against the
     * current channel and frame size limits.
     *
     * @throws FrameChannelExceedAllowedChannelNumber When the channel limit refuses the frame channel
     * @throws FrameExceedAllowedSize When the size limit refuses the encoded frame
     */
    public function send(Frame $frame): ConnectionInterface
    {
        $channelAllowed = $this->maxChannels->allows(
            $frame->channel()->toInt()
        );

        if (!$channelAllowed) {
            throw new FrameChannelExceedAllowedChannelNumber(
                $frame->channel(),
                $this->maxChannels
            );
        }

        // the size check is done on the ASCII encoded string form of the frame
        $payload = (new Str((string) $frame))->toEncoding('ASCII');

        if ($this->maxFrameSize->allows($payload->length())) {
            $this->socket->write($payload);

            return $this;
        }

        throw new FrameExceedAllowedSize(
            $payload->length(),
            $this->maxFrameSize
        );
    }
110
111
    /**
     * {@inheritdoc}
     *
     * Block until a frame can be read from the socket, sending a heartbeat
     * each time the period elapsed since the last received frame is longer
     * than the heartbeat period. Heartbeat frames coming from the server are
     * skipped. When no name is given the frame is returned whatever its type,
     * otherwise it must be one of the named methods.
     *
     * @param string ...$names Names of the methods accepted as a result
     *
     * @throws ConnectionClosed When the connection is closed once opened, or when the server sends connection.close instead of an expected method
     * @throws \Innmind\AMQP\Exception\ExpectedMethodFrame When names are given but the frame is not a method one
     * @throws UnexpectedFrame When the method frame is none of the expected ones
     */
    public function wait(string ...$names): Frame
    {
        do {
            // while opening, the connection is still flagged as closed so the
            // check must only apply once open() has finished
            if (!$this->opening && $this->closed()) {
                throw new ConnectionClosed;
            }

            $now = $this->clock->now();
            $elapsedPeriod = $now->elapsedSince($this->lastReceivedData);

            if ($elapsedPeriod->longerThan($this->heartbeat)) {
                $this->send(Frame::heartbeat());
            }

            $streams = ($this->select)();
        } while (!$streams->get('read')->contains($this->socket));

        $frame = ($this->read)($this->socket, $this->protocol);
        $this->lastReceivedData = $this->clock->now();

        if ($frame->type() === Type::heartbeat()) {
            // heartbeats carry nothing for the caller, wait for the next frame
            return $this->wait(...$names);
        }

        if (count($names) === 0) {
            return $frame;
        }

        if ($frame->type() !== Type::method()) {
            //someone must have forgot a wait() call
            // fully qualified as the class is not part of the imports of this
            // file, the short name would resolve in the current namespace
            throw new \Innmind\AMQP\Exception\ExpectedMethodFrame($frame->type());
        }

        foreach ($names as $name) {
            if ($frame->is($this->protocol->method($name))) {
                return $frame;
            }
        }

        if ($frame->is($this->protocol->method('connection.close'))) {
            // acknowledge the close requested by the server before reporting it
            $this->send($this->protocol->connection()->closeOk());
            $this->closed = true;

            throw ConnectionClosed::byServer(
                (string) $frame->values()->get(1)->original(),
                $frame->values()->get(0)->original()->value(),
                new Method(
                    $frame->values()->get(2)->original()->value(),
                    $frame->values()->get(3)->original()->value()
                )
            );
        }

        throw new UnexpectedFrame($frame, ...$names);
    }
169
170 18
    /**
     * Give the frame size limit currently applied by send().
     */
    public function maxFrameSize(): MaxFrameSize
    {
        return $this->maxFrameSize;
    }
174
175 40
    /**
     * Send connection.close, wait for connection.close-ok then close the
     * socket. Does nothing when the connection is already closed.
     */
    public function close(): void
    {
        if (!$this->closed()) {
            $close = $this->protocol->connection()->close(new Close);

            $this->send($close)->wait('connection.close-ok');
            $this->socket->close();
            $this->closed = true;
        }
    }
187
188 44
    /**
     * Tell whether the connection is flagged as closed or, failing that,
     * whether the underlying socket reports itself as closed.
     */
    public function closed(): bool
    {
        if ($this->closed) {
            return true;
        }

        return $this->socket->closed();
    }
192
193 5
    /**
     * Close the connection when the object is destroyed.
     */
    public function __destruct()
    {
        $this->close();
    }
197
198 44
    /**
     * Create the socket from the transport and the authority stripped of its
     * user information, then watch it for readability with the constructor
     * timeout.
     */
    private function buildSocket(): void
    {
        $address = $this->authority->withUserInformation(new NullUserInformation);
        $this->socket = new Socket($this->transport, $address);

        $watcher = new Select($this->timeout);
        $this->select = $watcher->forRead($this->socket);
    }
206
207 44
    /**
     * Run the whole opening sequence (start, handshake, vhost) and flag the
     * connection as opened. Does nothing when it is not closed.
     */
    private function open(): void
    {
        if ($this->closed()) {
            $this->start();
            $this->handshake();
            $this->openVHost();

            // from now on wait() refuses to run on a closed connection
            $this->opening = false;
            $this->closed = false;
        }
    }
220
221 44
    /**
     * Write the protocol version to the socket, wait for connection.start
     * and answer with connection.start-ok carrying the user and password
     * found in the authority.
     *
     * When no frame is detected but the received content is 8 characters long
     * and starts with "AMQP", the characters at offsets 5 to 7 are used as the
     * version the protocol must switch to (presumably the one the server
     * supports), then the socket is rebuilt and the sequence restarted.
     *
     * @throws NoFrameDetected When the received content is not such a header
     */
    private function start(): void
    {
        $this->socket->write(
            new Str((string) $this->protocol->version())
        );

        try {
            // only the arrival of the frame matters, its content is not used
            $this->wait('connection.start');
        } catch (NoFrameDetected $e) {
            $content = $e->content()->toEncoding('ASCII');

            if (
                $content->length() !== 8 ||
                !$content->matches('/^AMQP/')
            ) {
                throw $e;
            }

            $version = $content
                ->substring(5, 8)
                ->chunk();
            $this->protocol->use(
                new Version(
                    UnsignedOctet::fromString($version->get(0))->original()->value(),
                    UnsignedOctet::fromString($version->get(1))->original()->value(),
                    UnsignedOctet::fromString($version->get(2))->original()->value()
                )
            );
            //socket rebuilt as the server close the connection on version mismatch
            $this->buildSocket();
            $this->start();

            // the nested call already sent connection.start-ok
            return;
        }

        $this->send($this->protocol->connection()->startOk(
            new StartOk(
                $this->authority->userInformation()->user(),
                $this->authority->userInformation()->password()
            )
        ));
    }
263
264 44
    /**
     * Answer the optional connection.secure challenge, then apply the limits
     * received in connection.tune and confirm them with connection.tune-ok.
     */
    private function handshake(): void
    {
        $frame = $this->wait('connection.secure', 'connection.tune');
        $challenged = $frame->is($this->protocol->method('connection.secure'));

        if ($challenged) {
            $secureOk = new SecureOk(
                $this->authority->userInformation()->user(),
                $this->authority->userInformation()->password()
            );
            $this->send($this->protocol->connection()->secureOk($secureOk));
            $frame = $this->wait('connection.tune');
        }

        // reads the raw value found at the given position of the tune frame
        $tuned = static function(int $position) use ($frame) {
            return $frame->values()->get($position)->original()->value();
        };

        $this->maxChannels = new MaxChannels($tuned(0));
        $this->maxFrameSize = new MaxFrameSize($tuned(1));
        $this->heartbeat = new ElapsedPeriod($tuned(2));
        // the socket is now watched with the heartbeat period instead of the
        // constructor timeout
        $this->select = (new Select($this->heartbeat))->forRead($this->socket);

        $tuneOk = new TuneOk(
            $this->maxChannels,
            $this->maxFrameSize,
            $this->heartbeat
        );
        $this->send($this->protocol->connection()->tuneOk($tuneOk));
    }
296
297 44
    /**
     * Send connection.open for the vhost and wait for connection.open-ok.
     */
    private function openVHost(): void
    {
        $open = $this->protocol->connection()->open(new Open($this->vhost));

        $this->send($open)->wait('connection.open-ok');
    }
305
}
306