Completed
Pull Request — master (#286)
by
unknown
03:20
created

Connection   C

Complexity

Total Complexity 51

Size/Duplication

Total Lines 450
Duplicated Lines 4.89 %

Coupling/Cohesion

Components 1
Dependencies 16

Importance

Changes 0
Metric Value
dl 22
loc 450
rs 6.0708
c 0
b 0
f 0
wmc 51
lcom 1
cbo 16

13 Methods

Rating   Name   Duplication   Size   Complexity  
A init() 0 15 1
A onReady() 0 10 2
D onRead() 0 175 31
A getChannel() 0 11 3
A openChannel() 0 4 1
B findChannelId() 22 30 5
A command() 0 8 2
A addChannel() 0 6 1
A getFeatures() 0 4 1
A getConnectionOptions() 0 4 1
A getMaximumChannelCount() 0 4 1
A getMaximumFrameSize() 0 4 1
A isHandshaked() 0 4 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like Connection often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Connection, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace PHPDaemon\Clients\AMQP;
4
5
use PHPDaemon\Clients\AMQP\Driver\ConnectionOptions;
6
use PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException;
7
use PHPDaemon\Clients\AMQP\Driver\Features;
8
use PHPDaemon\Clients\AMQP\Driver\PackageInfo;
9
use PHPDaemon\Clients\AMQP\Driver\Protocol\CommandInterface;
10
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser\Frame;
11
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser\Frame as FrameParser;
12
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Parser\Table as TableParser;
13
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionCloseFrame;
14
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionOpenFrame;
15
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionOpenOkFrame;
16
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionStartFrame;
17
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionStartOkFrame;
18
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionTuneFrame;
19
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\Connection\ConnectionTuneOkFrame;
20
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\HeartbeatFrame;
21
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Protocol\OutgoingFrame;
22
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Serializer\Frame as FrameSerializer;
23
use PHPDaemon\Clients\AMQP\Driver\Protocol\v091\Serializer\Table as TableSerializer;
24
use PHPDaemon\Network\ClientConnection;
25
use PHPDaemon\Utils\Binary;
26
27
/**
28
 * Class Connection
29
 * @author Aleksey I. Kuleshov YOU GLOBAL LIMITED
30
 * @package PHPDaemon\Clients\AMQP
31
 */
32
class Connection extends ClientConnection implements CommandInterface
33
{
34
35
    /**
36
     * The AMQP protocol header is sent by the client before any frame-based
37
     * communication takes place.  It is the only data transferred that is not
38
     * a frame.
39
     */
40
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";
41
42
    /**
43
     * The maximum number of channels.
44
     *
45
     * AMQP channel ID is 2 bytes, but zero is reserved for connection-level
46
     * communication.
47
     */
48
    const MAXIMUM_CHANNELS = 0xffff - 1;
49
50
    /**
51
     * The maximum frame size the client supports.
52
     *
53
     * Note: RabbitMQ's default is 0x20000 (128 KB), our limit is higher to
54
     * allow for some server-side configurability.
55
     */
56
    const MAXIMUM_FRAME_SIZE = 0x80000; // 512 KB
57
58
    /**
59
     * The broker sends channelMax of zero in the tune frame if it does not
60
     * impose a channel limit.
61
     */
62
    const UNLIMITED_CHANNELS = 0;
63
64
    /**
65
     * The broker sends frameMax of zero in the tune frame if it does not impose
66
     * a frame size limit.
67
     */
68
    const UNLIMITED_FRAME_SIZE = 0;
69
70
    /**
71
     * The broker sends a heartbeat of zero in the tune frame if it does not use
72
     * heartbeats.
73
     */
74
    const HEARTBEAT_DISABLED = 0;
75
76
    /**
77
     * Event raised when protocol handshake ready
78
     */
79
    const EVENT_ON_HANDSHAKE = 'event.amqp.connection.handshake';
80
81
    /**
82
     * Event raised when connection close frame incoming
83
     */
84
85
    const EVENT_ON_CONNECTION_CLOSE = 'event.amqp.connection.close';
86
87
    /**
88
     * @var FrameParser
89
     */
90
    protected $parser;
91
92
    /**
93
     * @var FrameSerializer
94
     */
95
    protected $serializer;
96
97
    /**
98
     * @var ConnectionOptions
99
     */
100
    protected $connectionOptions;
101
102
    /**
103
     * @var bool
104
     */
105
    protected $isHandshaked = false;
106
107
    /**
108
     * The broker's supported features.
109
     * @var Features
110
     */
111
    private $features;
112
113
    /**
114
     * @var int
115
     */
116
    private $maximumChannelCount;
117
118
    /**
119
     * @var int
120
     */
121
    private $maximumFrameSize;
122
123
    /**
124
     * @var array
125
     */
126
    private $channels = [];
127
128
    /**
129
     * @var int
130
     */
131
    private $nextChannelId = 1;
132
133
    /**
134
     * @var bool
135
     */
136
    private $debug = false;
137
138
    /**
139
     *
140
     */
141
    protected function init()
142
    {
143
        $this->parser = new FrameParser(new TableParser());
144
        $this->serializer = new FrameSerializer(new TableSerializer());
145
146
        $this->connectionOptions = new ConnectionOptions(
147
            $this->pool->config->host->value,
148
            $this->pool->config->port->value,
149
            $this->pool->config->username->value,
150
            $this->pool->config->password->value,
151
            $this->pool->config->vhost->value
152
        );
153
154
        $this->debug = isset($this->pool->config->debug);
155
    }
156
157
    /**
158
     *
159
     */
160
    public function onReady()
161
    {
162
        if ($this->isHandshaked) {
163
            parent::onReady();
164
        }
165
166
        $this->write(self::PROTOCOL_HEADER);
167
168
        parent::onReady();
169
    }
170
171
    /**
172
     *
173
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
174
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
175
     * @throws \InvalidArgumentException
176
     */
177
    protected function onRead()
178
    {
179
        if ($this->getInputLength() <= 0) {
180
            return;
181
        }
182
        // set busy for connection
183
        $this->busy = true;
184
185
        /**
186
         * 1) read Frame::HEADER_SIZE
187
         * 2) get payload length from header
188
         * 3) concatenate header and payload + 1 byte (for Constants::FRAME_END) into buffer
189
         * 4) parse buffer
190
         * 5) do stuff...
191
         * 6) if bev contains more data go to 1
192
         */
193
        frame:
194
        $header = $this->readExact(Frame::HEADER_SIZE);
195
        $framePayloadSize = Binary::b2i(substr($header, Frame::HEADER_TYPE_SIZE + Frame::HEADER_CHANNEL_SIZE, Frame::HEADER_PAYLOAD_LENGTH_SIZE));
0 ignored issues
show
Coding Style introduced by
This line exceeds maximum limit of 120 characters; contains 146 characters

Overly long lines are hard to read on any screen. Most code styles therefor impose a maximum limit on the number of characters in a line.

Loading history...
196
197
        $buffer = $header . $this->readExact($framePayloadSize + 1);
198
199
        $frame = $this->parser->feed($buffer);
200
        if ($frame === null) {
201
            return;
202
        }
203
        if ($this->debug) {
204
            $this->log(sprintf('[AMQP] %s packet received', get_class($frame)));
205
        }
206
207
        if (!$this->isHandshaked) {
208
            switch (true) {
209
                case $frame instanceof ConnectionStartFrame:
210
                    if ($frame->versionMajor !== 0 || $frame->versionMinor !== 9) {
211
                        throw AMQPConnectionException::handshakeFailed(
212
                            $this->connectionOptions,
213
                            sprintf(
214
                                'the broker reported an unexpected AMQP version (v%d.%d)',
215
                                $frame->versionMajor,
216
                                $frame->versionMinor
217
                            )
218
                        );
219
                    }
220
                    if (!preg_match('/\bAMQPLAIN\b/', $frame->mechanisms)) {
221
                        throw AMQPConnectionException::handshakeFailed(
222
                            $this->connectionOptions,
223
                            'the AMQPLAIN authentication mechanism is not supported'
224
                        );
225
                    }
226
227
                    $this->features = new Features();
228
                    $properties = $frame->serverProperties;
229
                    if (isset($properties['product']) && 'RabbitMQ' === $properties['product']) {
230
                        $this->features->qosSizeLimits = false;
231
                    }
232
                    if (array_key_exists('capabilities', $properties)) {
233
                        if (array_key_exists('per_consumer_qos', $properties['capabilities'])) {
234
                            $this->features->perConsumerQos = (bool)$properties['capabilities']['per_consumer_qos'];
235
                        }
236
                        if (array_key_exists('exchange_exchange_bindings', $properties['capabilities'])) {
237
                            $this->features->exchangeToExchangeBindings = (bool)$properties['capabilities']['exchange_exchange_bindings'];
0 ignored issues
show
Coding Style introduced by
This line exceeds maximum limit of 120 characters; contains 138 characters

Overly long lines are hard to read on any screen. Most code styles therefor impose a maximum limit on the number of characters in a line.

Loading history...
238
                        }
239
                    }
240
241
                    // Serialize credentials in "AMQPLAIN" format, which is essentially an
242
                    // AMQP table without the 4-byte size header ...
243
                    $user = $this->connectionOptions->getUsername();
244
                    $pass = $this->connectionOptions->getPassword();
245
246
                    $credentials = "\x05LOGINS" . pack('N', strlen($user)) . $user
247
                        . "\x08PASSWORDS" . pack('N', strlen($pass)) . $pass;
248
249
                    $this->command(ConnectionStartOkFrame::create(
250
                        [
251
                            'product' => $this->connectionOptions->getProductName(),
252
                            'version' => $this->connectionOptions->getProductVersion(),
253
                            'platform' => PackageInfo::AMQP_PLATFORM,
254
                            'copyright' => PackageInfo::AMQP_COPYRIGHT,
255
                            'information' => PackageInfo::AMQP_INFORMATION,
256
257
                        ],
258
                        'AMQPLAIN',
259
                        $credentials
260
                    ));
261
                    break;
262
                case $frame instanceof ConnectionTuneFrame:
263
                    $this->maximumChannelCount = self::MAXIMUM_CHANNELS;
264
                    if ($frame->channelMax === self::UNLIMITED_CHANNELS) {
265
                        $this->maximumChannelCount = self::MAXIMUM_CHANNELS;
266
                    } elseif ($frame->channelMax < self::MAXIMUM_CHANNELS) {
267
                        $this->maximumChannelCount = $frame->channelMax;
268
                    }
269
270
                    $this->maximumFrameSize = self::MAXIMUM_FRAME_SIZE;
271
                    if ($frame->frameMax === self::UNLIMITED_FRAME_SIZE) {
272
                        $this->maximumFrameSize = self::MAXIMUM_FRAME_SIZE;
273
                    } elseif ($frame->frameMax < self::MAXIMUM_FRAME_SIZE) {
274
                        $this->maximumFrameSize = $frame->frameMax;
275
                    }
276
277
                    $heartbeatInterval = 0;
278
                    if (!self::HEARTBEAT_DISABLED) {
279
                        $heartbeatInterval = $this->connectionOptions->getHeartbeatInterval();
280
                        if (null === $heartbeatInterval) {
281
                            $heartbeatInterval = $frame->heartbeat;
282
                        } elseif ($frame->heartbeat < $heartbeatInterval) {
283
                            $heartbeatInterval = $frame->heartbeat;
284
                        }
285
                    }
286
287
                    $outputFrame = ConnectionTuneOkFrame::create(
288
                        $this->maximumChannelCount,
289
                        $this->maximumFrameSize,
290
                        $heartbeatInterval
291
                    );
292
293
                    if ($outputFrame->heartbeat > 0) {
294
                        /**
295
                         * We need to set timeout value = ConnectionTuneFrame::heartbeat + 5 sec
296
                         */
297
                        $timeout = $outputFrame->heartbeat + 5;
298
                        $this->setTimeout($timeout);
299
                        $this->connectionOptions->setHeartbeatInterval($outputFrame->heartbeat);
300
                        $this->connectionOptions->setConnectionTimeout($timeout);
301
                    }
302
303
                    $this->command($outputFrame);
304
305
                    $outputFrame = ConnectionOpenFrame::create(
306
                        $this->connectionOptions->getVhost()
307
                    );
308
                    $this->command($outputFrame);
309
                    break;
310
311
                case $frame instanceof ConnectionOpenOkFrame:
312
                    $this->isHandshaked = true;
313
                    $this->openChannel(function ($channel) {
314
                        $this->trigger(self::EVENT_ON_HANDSHAKE, $channel);
315
                    });
316
                    break;
317
            }
318
319
            $this->checkFree();
320
            return;
321
        }
322
323
        switch (true) {
324
            case $frame instanceof HeartbeatFrame:
325
                $this->command($frame);
326
                break;
327
            case $frame instanceof ConnectionCloseFrame:
328
                $this->trigger(self::EVENT_ON_CONNECTION_CLOSE, $frame->replyCode, $frame->replyText);
329
                $this->close();
330
                return;
331
                break;
0 ignored issues
show
Unused Code introduced by
break is not strictly necessary here and could be removed.

The break statement is not necessary if it is preceded for example by a return statement:

switch ($x) {
    case 1:
        return 'foo';
        break; // This break is not necessary and can be left off.
}

If you would like to keep this construct to be consistent with other case statements, you can safely mark this issue as a false-positive.

Loading history...
332
            default:
333
                if (isset($frame->frameChannelId)
334
                    && $frame->frameChannelId > 0
335
                    && array_key_exists($frame->frameChannelId, $this->channels)) {
336
                    /** @var Channel $channel */
337
                    $channel = $this->channels[$frame->frameChannelId];
338
                    $channel->trigger(get_class($frame), $frame);
339
                    break; // exit
340
                }
341
342
                $this->trigger(get_class($frame), $frame);
343
                break;
344
        }
345
346
        if ($this->bev && $this->getInputLength() > 0) {
347
            goto frame;
348
        }
349
350
        $this->checkFree();
351
    }
352
353
    /**
354
     * @param int $id
355
     * @param callable $callback
356
     * @throws \InvalidArgumentException
357
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
358
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
359
     */
360
    public function getChannel(callable $callback, $id = 1)
361
    {
362
        if (count($this->channels) === 0) {
363
            $this->openChannel($callback);
364
            return;
365
        }
366
367
        if (is_callable($callback)) {
368
            $callback($this->channels[$id]);
369
        }
370
    }
371
372
    /**
373
     * @param callable|null $callback
374
     * @throws \InvalidArgumentException
375
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
376
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
377
     */
378
    public function openChannel(callable $callback = null)
379
    {
380
        new Channel($this, $callback);
381
    }
382
383
    /**
384
     * @return int
385
     * @throws \PHPDaemon\Clients\AMQP\Driver\Exception\AMQPConnectionException
386
     */
387
    public function findChannelId()
388
    {
389
        // first check in range [next, max] ...
390 View Code Duplication
        for (
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
391
            $channelId = $this->nextChannelId;
392
            $channelId <= $this->maximumChannelCount;
393
            ++$channelId
394
        ) {
395
            if (!isset($this->channels[$channelId])) {
396
                $this->nextChannelId = $channelId + 1;
397
398
                return $channelId;
399
            }
400
        }
401
402
        // then check in range [min, next) ...
403 View Code Duplication
        for (
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
404
            $channelId = 1;
405
            $channelId < $this->nextChannelId;
406
            ++$channelId
407
        ) {
408
            if (!isset($this->channels[$channelId])) {
409
                $this->nextChannelId = $channelId + 1;
410
411
                return $channelId;
412
            }
413
        }
414
415
        throw new AMQPConnectionException('No available channels');
416
    }
417
418
    /**
419
     * @param OutgoingFrame $frame
420
     * @param callable|null $callback
421
     * @return bool
422
     * @throws \PHPDaemon\Clients\AMQP\Driver\Protocol\Exception\AMQPProtocolException
423
     * @throws \InvalidArgumentException
424
     */
425
    public function command(OutgoingFrame $frame, callable $callback = null)
426
    {
427
        if ($callback) {
428
            $this->onResponse($callback);
429
        }
430
        $serializedFrame = $this->serializer->serialize($frame);
431
        return $this->write($serializedFrame);
432
    }
433
434
435
    public function addChannel($id, Channel $channel)
436
    {
437
        $this->channels[$id] = $channel;
438
        $this->nextChannelId = max(array_keys($this->channels)) + 1;
439
        return $this;
440
    }
441
442
    /**
443
     * @return Features
444
     */
445
    public function getFeatures()
446
    {
447
        return $this->features;
448
    }
449
450
    /**
451
     * @return ConnectionOptions
452
     */
453
    public function getConnectionOptions()
454
    {
455
        return $this->connectionOptions;
456
    }
457
458
    /**
459
     * @return int
460
     */
461
    public function getMaximumChannelCount()
462
    {
463
        return $this->maximumChannelCount;
464
    }
465
466
    /**
467
     * @return int
468
     */
469
    public function getMaximumFrameSize()
470
    {
471
        return $this->maximumFrameSize;
472
    }
473
474
    /**
475
     * @return bool
476
     */
477
    public function isHandshaked()
478
    {
479
        return $this->isHandshaked;
480
    }
481
}
482