AbstractConnection   F
last analyzed

Complexity

Total Complexity 131

Size/Duplication

Total Lines 1184
Duplicated Lines 0 %

Test Coverage

Coverage 74.37%

Importance

Changes 11
Bugs 2 Features 1
Metric Value
eloc 429
dl 0
loc 1184
ccs 293
cts 394
cp 0.7437
rs 2
c 11
b 2
f 1
wmc 131

52 Methods

Rating   Name   Duplication   Size   Complexity  
A connect() 0 57 5
A close_socket() 0 4 2
A select() 0 10 3
A set_close_on_destruct() 0 3 1
A __destruct() 0 4 2
A safeClose() 0 7 3
A __clone() 0 6 2
A close_input() 0 7 3
A reconnect() 0 9 1
A setIsConnected() 0 3 1
A set_connection_unblock_handler() 0 4 1
A connection_secure() 0 3 1
A connection_start() 0 14 1
A connection_close() 0 11 1
A isWriting() 0 3 1
A connection_open_ok() 0 4 1
A connection_redirect() 0 11 1
A x_tune_ok() 0 8 1
A isBlocked() 0 3 1
A close() 0 30 4
A connection_blocked() 0 5 1
A getIO() 0 3 1
A x_secure_ok() 0 5 1
A connection_close_ok() 0 3 1
A checkHeartBeat() 0 3 1
A isConnected() 0 3 1
A connection_unblocked() 0 5 1
A set_connection_block_handler() 0 4 1
A x_open() 0 17 2
A getLastActivity() 0 3 1
A x_close_ok() 0 6 1
A getReadTimeout() 0 3 1
A x_start_ok() 0 8 1
A closeChannels() 0 10 4
A write() 0 15 3
A get_free_channel_id() 0 9 3
C wait_channel() 0 54 14
A prepare_channel_method_frame() 0 22 3
A prepare_content() 0 54 5
B __construct() 0 59 9
A send_channel_method_frame() 0 5 1
C wait_frame() 0 51 10
A do_close() 0 7 1
A send_content() 0 4 1
A create_connection() 0 23 6
A getServerProperties() 0 3 1
A connectOnConstruct() 0 7 2
A channel() 0 14 4
A getLibraryProperties() 0 10 3
A getHeartbeat() 0 3 1
A validate_host() 0 13 5
A connection_tune() 0 25 5

How to fix   Complexity   

Complex Class

Complex classes like AbstractConnection often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use AbstractConnection, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace PhpAmqpLib\Connection;
4
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Channel\Frame;
8
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
9
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
10
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
11
use PhpAmqpLib\Exception\AMQPIOException;
12
use PhpAmqpLib\Exception\AMQPNoDataException;
13
use PhpAmqpLib\Exception\AMQPRuntimeException;
14
use PhpAmqpLib\Exception\AMQPSocketException;
15
use PhpAmqpLib\Exception\AMQPTimeoutException;
16
use PhpAmqpLib\Helper\Assert;
17
use PhpAmqpLib\Package;
18
use PhpAmqpLib\Wire;
19
use PhpAmqpLib\Wire\AMQPReader;
20
use PhpAmqpLib\Wire\AMQPTable;
21
use PhpAmqpLib\Wire\AMQPWriter;
22
use PhpAmqpLib\Wire\IO\AbstractIO;
23
24
abstract class AbstractConnection extends AbstractChannel
25
{
26
    /**
27
     * @var array
28
     * @internal
29
     */
30
    public static $LIBRARY_PROPERTIES = array(
31
        'product' => array('S', Package::NAME),
32
        'platform' => array('S', 'PHP'),
33
        'version' => array('S', Package::VERSION),
34
        'information' => array('S', ''),
35
        'copyright' => array('S', ''),
36
        'capabilities' => array(
37
            'F',
38
            array(
39
                'authentication_failure_close' => array('t', true),
40
                'publisher_confirms' => array('t', true),
41
                'consumer_cancel_notify' => array('t', true),
42
                'exchange_exchange_bindings' => array('t', true),
43
                'basic.nack' => array('t', true),
44
                'connection.blocked' => array('t', true)
45
            )
46
        )
47
    );
48
49
    /**
50
     * @var AMQPChannel[]|AbstractChannel[]
51
     * @internal
52
     */
53
    public $channels = array();
54
55
    /** @var int */
56
    protected $version_major;
57
58
    /** @var int */
59
    protected $version_minor;
60
61
    /** @var array */
62
    protected $server_properties;
63
64
    /** @var array */
65
    protected $mechanisms;
66
67
    /** @var array */
68
    protected $locales;
69
70
    /** @var bool */
71
    protected $wait_tune_ok;
72
73
    /** @var string */
74
    protected $known_hosts;
75
76
    /** @var null|Wire\AMQPIOReader */
77
    protected $input;
78
79
    /** @var string */
80
    protected $vhost;
81
82
    /** @var bool */
83
    protected $insist;
84
85
    /** @var string */
86
    protected $login_method;
87
88
    /**
89
     * @var null|string
90
     */
91
    protected $login_response;
92
93
    /** @var string */
94
    protected $locale;
95
96
    /** @var int */
97
    protected $heartbeat;
98
99
    /** @var float */
100
    protected $last_frame;
101
102
    /** @var int */
103
    protected $channel_max = 65535;
104
105
    /** @var int */
106
    protected $frame_max = 131072;
107
108
    /** @var array Constructor parameters for clone */
109
    protected $construct_params;
110
111
    /** @var bool Close the connection in destructor */
112
    protected $close_on_destruct = true;
113
114
    /** @var bool Maintain connection status */
115
    protected $is_connected = false;
116
117
    /** @var AbstractIO */
118
    protected $io;
119
120
    /** @var callable Handles connection blocking from the server */
121
    private $connection_block_handler;
122
123
    /** @var callable Handles connection unblocking from the server */
124
    private $connection_unblock_handler;
125
126
    /** @var int Connection timeout value*/
127
    protected $connection_timeout;
128
129
    /** @var AMQPConnectionConfig|null */
130
    protected $config;
131
132
    /**
133
     * Circular buffer to speed up prepare_content().
134
     * Max size limited by $prepare_content_cache_max_size.
135
     *
136
     * @var array
137
     * @see prepare_content()
138
     */
139
    private $prepare_content_cache = array();
140
141
    /** @var int Maximal size of $prepare_content_cache */
142
    private $prepare_content_cache_max_size = 100;
143
144
    /**
145
     * Maximum time to wait for channel operations, in seconds
146
     * @var float $channel_rpc_timeout
147
     */
148
    private $channel_rpc_timeout;
149
150
    /**
151
     * If connection is blocked due to the broker running low on resources.
152
     * @var bool
153
     */
154
    protected $blocked = false;
155
156
    /**
157
     * If a frame is currently being written
158
     * @var bool
159
     */
160
    protected $writing = false;
161
162
    /**
163
     * @param string $user
164
     * @param string $password
165
     * @param string $vhost
166
     * @param bool $insist
167
     * @param string $login_method
168
     * @param null $login_response @deprecated
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $login_response is correct as it would always require null to be passed?
Loading history...
169
     * @param string $locale
170
     * @param AbstractIO $io
171
     * @param int $heartbeat
172
     * @param int|float $connection_timeout
173
     * @param int|float $channel_rpc_timeout
174
     * @param \PhpAmqpLib\Connection\AMQPConnectionConfig | null $config
175
     * @throws \Exception
176
     */
177
    public function __construct(
178
        $user,
179 53
        $password,
180
        $vhost = '/',
181
        $insist = false,
182
        $login_method = 'AMQPLAIN',
183
        $login_response = null,
184
        $locale = 'en_US',
185
        AbstractIO $io = null,
186
        $heartbeat = 0,
187
        $connection_timeout = 0,
188
        $channel_rpc_timeout = 0.0,
189
        ?AMQPConnectionConfig $config = null
190
    ) {
191
        if (is_null($io)) {
192
            throw new \InvalidArgumentException('Argument $io cannot be null');
193 53
        }
194 1
195
        if ($config) {
196
            $this->config = clone $config;
197 52
        }
198 14
199
        // save the params for the use of __clone
200
        $this->construct_params = func_get_args();
201
202 52
        $this->vhost = $vhost;
203
        $this->insist = $insist;
204 52
        $this->login_method = $login_method;
205 52
        $this->locale = $locale;
206 52
        $this->io = $io;
207 52
        $this->heartbeat = max(0, (int)$heartbeat);
208 52
        $this->connection_timeout = $connection_timeout;
209 52
        $this->channel_rpc_timeout = $channel_rpc_timeout;
210 52
211 52
        if ($user && $password) {
212 52
            if ($login_method === 'PLAIN') {
213
                $this->login_response = sprintf("\0%s\0%s", $user, $password);
214 52
            } elseif ($login_method === 'AMQPLAIN') {
215 52
                $login_response = new AMQPWriter();
216 4
                $login_response->write_table(array(
217 48
                    'LOGIN' => array('S', $user),
218 48
                    'PASSWORD' => array('S', $password)
219 48
                ));
220 48
221 48
                // Skip the length
222
                $responseValue = $login_response->getvalue();
223
                $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
224
            } else {
225 48
                throw new \InvalidArgumentException('Unknown login method: ' . $login_method);
226 48
            }
227
        } elseif ($login_method === 'EXTERNAL') {
228 52
            $this->login_response = $login_response;
229
        } else {
230
            $this->login_response = null;
231 1
        }
232
233
        // Lazy Connection waits on connecting
234
        if ($this->connectOnConstruct()) {
235 52
            $this->connect();
236 45
        }
237
    }
238
239
    /**
240
     * Connects to the AMQP server
241
     * @throws \Exception
242
     */
243 48
    protected function connect()
244
    {
245 48
        $this->blocked = false;
246
        try {
247
            // Loop until we connect
248 48
            while (!$this->isConnected()) {
249
                // Assume we will connect, until we dont
250 48
                $this->setIsConnected(true);
251
252
                // Connect the socket
253 48
                $this->io->connect();
254
255 47
                $this->channels = array();
256
                // The connection object itself is treated as channel 0
257 47
                parent::__construct($this, 0);
258
259 47
                $this->input = new Wire\AMQPIOReader($this->io);
260
261 47
                $this->write($this->constants->getHeader());
262
                // assume frame was sent successfully, used in $this->wait_channel()
263 47
                $this->last_frame = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $last_frame is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
264 47
                $this->wait(array($this->waitHelper->get_wait('connection.start')), false, $this->connection_timeout);
265 47
                $this->x_start_ok(
266 47
                    $this->getLibraryProperties(),
267 47
                    $this->login_method,
268 47
                    $this->login_response,
269 47
                    $this->locale
270
                );
271
272 47
                $this->wait_tune_ok = true;
273 47
                while ($this->wait_tune_ok) {
274 47
                    $this->wait(array(
275 47
                        $this->waitHelper->get_wait('connection.secure'),
276 47
                        $this->waitHelper->get_wait('connection.tune')
277 47
                    ), false, $this->connection_timeout);
278
                }
279
280 47
                $host = $this->x_open($this->vhost, '', $this->insist);
281 47
                if (!$host) {
282
                    //Reconnected
283 47
                    $this->io->reenableHeartbeat();
284 47
                    return null; // we weren't redirected
285
                }
286
287
                $this->setIsConnected(false);
288
                $this->closeChannels();
289
290
                // we were redirected, close the socket, loop and try again
291
                $this->close_socket();
292
            }
293 1
        } catch (\Exception $e) {
294
            // Something went wrong, set the connection status
295 1
            $this->setIsConnected(false);
296 1
            $this->closeChannels();
297 1
            $this->close_input();
298 1
            $this->close_socket();
299 1
            throw $e; // Rethrow exception
300
        }
301
    }
302
303
    /**
304
     * Reconnects using the original connection settings.
305
     * This will not recreate any channels that were established previously
306
     * @throws \Exception
307 6
     */
308
    public function reconnect()
309
    {
310 6
        // Try to close the AMQP connection
311
        $this->safeClose();
312 6
        // Reconnect the socket/stream then AMQP
313
        $this->io->close();
314 6
        // getIO can initiate the connection setting via LazyConnection, set it here to be sure
315 6
        $this->setIsConnected(false);
316
        $this->connect();
317
    }
318
319
    /**
320
     * Cloning will use the old properties to make a new connection to the same server
321
     */
322
    public function __clone()
323
    {
324
        if ($this->config) {
325
            $this->config = clone $this->config;
326
        }
327
        call_user_func_array(array($this, '__construct'), $this->construct_params);
328
    }
329 4
330
    public function __destruct()
331 4
    {
332 4
        if ($this->close_on_destruct) {
333
            $this->safeClose();
334
        }
335
    }
336
337
    /**
338
     * Attempts to close the connection safely
339 10
     */
340
    protected function safeClose()
341
    {
342 10
        try {
343 10
            if (null !== $this->input) {
344
                $this->close();
345
            }
346
        } catch (\Exception $e) {
347
            // Nothing here
348
        }
349
    }
350
351
    /**
352
     * @param int|null $sec
353
     * @param int $usec
354
     * @return int
355
     * @throws AMQPIOException
356
     * @throws AMQPRuntimeException
357
     * @throws AMQPConnectionClosedException
358
     * @throws AMQPRuntimeException
359
     */
360
    public function select(?int $sec, int $usec = 0): int
361
    {
362
        try {
363
            return $this->io->select($sec, $usec);
364
        } catch (AMQPConnectionClosedException $e) {
365
            $this->do_close();
366
            throw $e;
367
        } catch (AMQPRuntimeException $e) {
368
            $this->setIsConnected(false);
369
            throw $e;
370
        }
371
    }
372
373
    /**
374
     * Allows to not close the connection
375
     * it's useful after the fork when you don't want to close parent process connection
376
     *
377
     * @param bool $close
378
     */
379
    public function set_close_on_destruct($close = true)
380
    {
381
        $this->close_on_destruct = (bool) $close;
382
    }
383 41
384
    protected function close_input()
385 41
    {
386
        $this->debug && $this->debug->debug_msg('closing input');
387 41
388 40
        if (null !== $this->input) {
389 40
            $this->input->close();
390
            $this->input = null;
391
        }
392
    }
393 41
394
    protected function close_socket()
395 41
    {
396 41
        $this->debug && $this->debug->debug_msg('closing socket');
397
        $this->io->close();
398
    }
399
400
    /**
401
     * @param string $data
402 47
     * @throws AMQPIOException
403
     */
404 47
    public function write($data)
405
    {
406
        $this->debug->debug_hexdump($data);
407 47
408 47
        try {
409
            $this->writing = true;
410
            $this->io->write($data);
411
        } catch (AMQPConnectionClosedException $e) {
412
            $this->do_close();
413
            throw $e;
414
        } catch (AMQPRuntimeException $e) {
415 47
            $this->setIsConnected(false);
416 47
            throw $e;
417
        } finally {
418
            $this->writing = false;
419
        }
420 40
    }
421
422 40
    protected function do_close()
423 40
    {
424 40
        $this->frame_queue = new \SplQueue();
425 40
        $this->method_queue = [];
426 40
        $this->setIsConnected(false);
427
        $this->close_input();
428
        $this->close_socket();
429
    }
430
431
    /**
432
     * @return int
433 36
     * @throws AMQPRuntimeException
434
     */
435 36
    public function get_free_channel_id()
436 36
    {
437 36
        for ($i = 1; $i <= $this->channel_max; $i++) {
438
            if (!isset($this->channels[$i])) {
439
                return $i;
440
            }
441
        }
442
443
        throw new AMQPRuntimeException('No free channel ids');
444
    }
445
446
    /**
447
     * @param int $channel
448
     * @param int $class_id
449
     * @param int $weight
450
     * @param int $body_size
451
     * @param string $packed_properties
452
     * @param string $body
453 14
     * @param AMQPWriter $pkt
454
     * @throws AMQPIOException
455 14
     */
456 14
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
457
    {
458
        $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
459
        $this->write($pkt->getvalue());
460
    }
461
462
    /**
463
     * Returns a new AMQPWriter or mutates the provided $pkt
464
     *
465
     * @param int $channel
466
     * @param int $class_id
467
     * @param int $weight
468
     * @param int $body_size
469
     * @param string $packed_properties
470
     * @param string $body
471 14
     * @param AMQPWriter|null $pkt
472
     * @return AMQPWriter
473 14
     */
474
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
475
    {
476 14
        $pkt = $pkt ?: new AMQPWriter();
477
478
        // Content already prepared ?
479
        $key_cache = sprintf(
480
            '%s|%s|%s|%s',
481
            $channel,
482
            $packed_properties,
483
            $class_id,
484 14
            $weight
485 14
        );
486 14
487 14
        if (!isset($this->prepare_content_cache[$key_cache])) {
488 14
            $w = new AMQPWriter();
489 14
            $w->write_octet(2);
490 14
            $w->write_short($channel);
491 14
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
492 14
            $w->write_short($class_id);
493
            $w->write_short($weight);
494
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
495
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
496
                reset($this->prepare_content_cache);
497
                $old_key = key($this->prepare_content_cache);
498 14
                unset($this->prepare_content_cache[$old_key]);
499
            }
500 14
        }
501 14
        $pkt->write($this->prepare_content_cache[$key_cache]);
502
503 14
        $pkt->write_longlong($body_size);
504
        $pkt->write($packed_properties);
505
506
        $pkt->write_octet(0xCE);
507
508
509 14
        // memory efficiency: walk the string instead of biting
510 14
        // it. good for very large packets (close in size to
511 14
        // memory_limit setting)
512 13
        $position = 0;
513 13
        $bodyLength = mb_strlen($body, 'ASCII');
514
        while ($position < $bodyLength) {
515 13
            $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
516 13
            $position += $this->frame_max - 8;
517 13
518
            $pkt->write_octet(3);
519 13
            $pkt->write_short($channel);
520
            $pkt->write_long(mb_strlen($payload, 'ASCII'));
521 13
522
            $pkt->write($payload);
523
524 14
            $pkt->write_octet(0xCE);
525
        }
526
527
        return $pkt;
528
    }
529
530
    /**
531
     * @param int $channel
532
     * @param array $method_sig
533 47
     * @param AMQPWriter|string $args
534
     * @param null $pkt
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $pkt is correct as it would always require null to be passed?
Loading history...
535 47
     * @throws AMQPIOException
536 47
     */
537 47
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
538
    {
539
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
540
        $this->write($pkt->getvalue());
541
        $this->debug->debug_method_signature1($method_sig);
542
    }
543
544
    /**
545
     * Returns a new AMQPWriter or mutates the provided $pkt
546
     *
547
     * @param int $channel
548
     * @param array $method_sig
549 47
     * @param AMQPWriter|string $args
550
     * @param AMQPWriter|null $pkt
551 47
     * @return AMQPWriter
552 47
     */
553
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
554
    {
555 47
        if ($args instanceof AMQPWriter) {
556
            $args = $args->getvalue();
557 47
        }
558 47
559 47
        $pkt = $pkt ?: new AMQPWriter();
560
561
        $pkt->write_octet(1);
562 47
        $pkt->write_short($channel);
563 47
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
564 47
        // in payload
565
566 47
        $pkt->write_short($method_sig[0]); // class_id
567
        $pkt->write_short($method_sig[1]); // method_id
568 47
        $pkt->write($args);
569
570 47
        $pkt->write_octet(0xCE);
571
572
        $this->debug->debug_method_signature1($method_sig);
573
574
        return $pkt;
575
    }
576
577
    /**
578
     * Waits for a frame from the server
579
     *
580
     * @param int|float|null $timeout
581
     * @return Frame
582 47
     * @throws \Exception
583
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
584 47
     * @throws AMQPRuntimeException
585
     */
586
    protected function wait_frame($timeout = 0): Frame
587
    {
588
        if (null === $this->input) {
589 47
            $this->setIsConnected(false);
590 47
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
591
        }
592
593
        $currentTimeout = $this->input->getTimeout();
594 47
        $this->input->setTimeout($timeout);
595 47
596
        try {
597
            $header = $this->input->readFrameHeader();
598 47
            $frame_type = $header['type'];
599 47
            if (!$this->constants->isFrameType($frame_type)) {
600
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
601
            }
602 47
            $size = $header['size'];
603 47
604
            // payload + ch
605
            $result = unpack('a' . $size . 'payload/Cch', $this->input->read(AMQPReader::OCTET + $size));
606 47
            $ch = $result['ch'];
607
            $frame = new Frame($frame_type, $header['channel'], $size, $result['payload']);
608 47
        } catch (AMQPTimeoutException $e) {
609 47
            if ($this->input) {
610 13
                $this->input->setTimeout($currentTimeout);
611 5
            }
612 5
            throw $e;
613
        } catch (AMQPNoDataException $e) {
614 5
            if ($this->input) {
615 8
                $this->input->setTimeout($currentTimeout);
616 8
            }
617 8
            throw $e;
618
        } catch (AMQPConnectionClosedException $exception) {
619 8
            $this->do_close();
620
            throw $exception;
621
        } finally {
622
            if ($this->input) {
623
                $this->input->setTimeout($currentTimeout);
624
            }
625 47
        }
626
627 47
        $this->input->setTimeout($currentTimeout);
0 ignored issues
show
Bug introduced by
The method setTimeout() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

627
        $this->input->/** @scrutinizer ignore-call */ 
628
                      setTimeout($currentTimeout);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
628
629
        if ($ch !== Frame::END) {
630
            throw new AMQPInvalidFrameException(sprintf(
631
                'Framing error, unexpected byte: %x',
632
                $ch
633
            ));
634 47
        }
635
636
        return $frame;
637
    }
638
639
    /**
640
     * Waits for a frame from the server destined for a particular channel.
641
     *
642
     * @param int $channel_id
643
     * @param int|float|null $timeout
644 47
     * @return Frame
645
     * @throws \Exception
646
     */
647 47
    protected function wait_channel(int $channel_id, $timeout = 0): Frame
648 47
    {
649 47
        // Keeping the original timeout unchanged.
650
        $_timeout = $timeout;
651 47
        while (true) {
652 13
            $start = microtime(true);
653
            try {
654 5
                $frame = $this->wait_frame($_timeout);
655 5
            } catch (AMQPTimeoutException $e) {
656
                if (
657
                    $this->heartbeat && $this->last_frame
658
                    && microtime(true) - ($this->heartbeat * 2) > $this->last_frame
659
                ) {
660
                    $this->debug->debug_msg('missed server heartbeat (at threshold * 2)');
661
                    $this->setIsConnected(false);
662 5
                    throw new AMQPHeartbeatMissedException('Missed server heartbeat');
663
                }
664
665 47
                throw $e;
666
            }
667 47
668
            $this->last_frame = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $last_frame is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
669 6
            $frame_channel = $frame->getChannel();
670 6
671 4
            if ($frame_channel === 0 && $frame->isHeartbeat()) {
672 4
                // skip heartbeat frames and reduce the timeout by the time passed
673
                $this->debug->debug_msg('received server heartbeat');
674
                if ($_timeout > 0) {
675
                    $_timeout -= $this->last_frame - $start;
676
                    if ($_timeout <= 0) {
677 6
                        // If timeout has been reached, throw the exception without calling wait_frame
678
                        throw new AMQPTimeoutException('Timeout waiting on channel');
679
                    }
680 47
                }
681 47
                continue;
682
            }
683
684
            if ($frame_channel === $channel_id) {
685
                return $frame;
686
            }
687
688 1
            // Not the channel we were looking for.  Queue this frame
689 1
            //for later, when the other channel is looking for frames.
690
            // Make sure the channel still exists, it could have been
691
            // closed by a previous Exception.
692
            if (isset($this->channels[$frame_channel])) {
693
                $this->channels[$frame_channel]->frame_queue->enqueue($frame);
694
            }
695 1
696
            // If we just queued up a method for channel 0 (the Connection
697
            // itself) it's probably a close method in reaction to some
698
            // error, so deal with it right away.
699
            if ($frame_channel === 0 && $frame->isMethod()) {
700
                $this->wait();
701
            }
702
        }
703
    }
704
705
    /**
706
     * Fetches a channel object identified by the numeric channel_id, or
707
     * create that object if it doesn't already exist.
708
     *
709
     * @param int|null $channel_id
710
     * @return AMQPChannel
711
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
712 36
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
713
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
714 36
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
715
     */
716
    public function channel($channel_id = null)
717
    {
718 36
        if (!$this->is_connected) {
719 36
            $this->connect();
720 36
        }
721
        if (isset($this->channels[$channel_id])) {
722 36
            return $this->channels[$channel_id];
723
        }
724
725
        $channel_id = $channel_id ?: $this->get_free_channel_id();
726
        $ch = new AMQPChannel($this, $channel_id, true, $this->channel_rpc_timeout);
727
        $this->channels[$channel_id] = $ch;
728
729
        return $ch;
730
    }
731
732
    /**
733 40
     * Requests a connection close
734
     *
735 40
     * @param int $reply_code
736 40
     * @param string $reply_text
737 5
     * @param array $method_sig
738
     * @return mixed|null
739
     * @throws \Exception
740 40
     */
741
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
742 40
    {
743 40
        $this->io->disableHeartbeat();
744
        if (empty($this->protocolWriter) || !$this->isConnected()) {
745
            return null;
746 40
        }
747 40
748
        $result = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
749 40
        try {
750 40
            $this->closeChannels();
751 40
            list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
752
                $reply_code,
753 40
                $reply_text,
754
                $method_sig[0],
755
                $method_sig[1]
756
            );
757
            $this->send_method_frame(array($class_id, $method_id), $args);
758
            $result = $this->wait(
759
                array($this->waitHelper->get_wait('connection.close_ok')),
760 40
                false,
761
                $this->connection_timeout
762 40
            );
763
        } catch (\Exception $exception) {
764
            $this->do_close();
765
            throw $exception;
766
        }
767
768
        $this->setIsConnected(false);
769
770
        return $result;
771
    }
772
773
    /**
774
     * @param AMQPReader $reader
775
     * @throws AMQPConnectionClosedException
776
     */
777
    protected function connection_close(AMQPReader $reader)
778
    {
779
        $code = (int)$reader->read_short();
780
        $reason = $reader->read_shortstr();
781
        $class = $reader->read_short();
782
        $method = $reader->read_short();
783
        $reason .= sprintf('(%s, %s)', $class, $method);
784
785
        $this->x_close_ok();
786
787
        throw new AMQPConnectionClosedException($reason, $code);
788
    }
789
790
    /**
791
     * Confirms a connection close
792
     */
793
    protected function x_close_ok()
794
    {
795
        $this->send_method_frame(
796 40
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
797
        );
798 40
        $this->do_close();
799
    }
800
801
    /**
802
     * Confirm a connection close
803
     */
804
    protected function connection_close_ok()
805
    {
806
        $this->do_close();
807 47
    }
808
809 47
    /**
810 47
     * @param string $virtual_host
811 47
     * @param string $capabilities
812 47
     * @param bool $insist
813 47
     * @return mixed
814
     */
815
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
816 47
    {
817
        $args = new AMQPWriter();
818
        $args->write_shortstr($virtual_host);
819 47
        $args->write_shortstr($capabilities);
820
        $args->write_bits(array($insist));
821
        $this->send_method_frame(array(10, 40), $args);
822
823 47
        $wait = array(
824
            $this->waitHelper->get_wait('connection.open_ok')
825
        );
826
827
        if ($this->protocolVersion === Wire\Constants080::VERSION) {
828
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
829
        }
830
831 47
        return $this->wait($wait, false, $this->connection_timeout);
832
    }
833 47
834 47
    /**
835
     * Signals that the connection is ready
836
     *
837
     * @param AMQPReader $args
838
     */
839
    protected function connection_open_ok($args)
840
    {
841
        $this->known_hosts = $args->read_shortstr();
842
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
843
    }
844
845
    /**
846
     * Asks the client to use a different server
847
     *
848
     * @param AMQPReader $args
849
     * @return string
850
     */
851
    protected function connection_redirect($args)
852
    {
853
        $host = $args->read_shortstr();
854
        $this->known_hosts = $args->read_shortstr();
855
        $this->debug->debug_msg(sprintf(
856
            'Redirected to [%s], known_hosts [%s]',
857
            $host,
858
            $this->known_hosts
859
        ));
860
861
        return $host;
862
    }
863
864
    /**
865
     * Security mechanism challenge
866
     *
867
     * @param AMQPReader $args
868
     */
869
    protected function connection_secure($args)
870
    {
871
        $args->read_longstr();
872
    }
873
874
    /**
875
     * Security mechanism response
876
     *
877
     * @param string $response
878
     */
879
    protected function x_secure_ok($response)
880
    {
881
        $args = new AMQPWriter();
882
        $args->write_longstr($response);
883 47
        $this->send_method_frame(array(10, 21), $args);
884
    }
885 47
886 47
    /**
887 47
     * Starts connection negotiation
888 47
     *
889 47
     * @param AMQPReader $args
890
     */
891 47
    protected function connection_start($args)
892 47
    {
893 47
        $this->version_major = $args->read_octet();
894 47
        $this->version_minor = $args->read_octet();
895 47
        $this->server_properties = $args->read_table();
896 47
        $this->mechanisms = explode(' ', $args->read_longstr());
897
        $this->locales = explode(' ', $args->read_longstr());
898
899
        $this->debug->debug_connection_start(
900
            $this->version_major,
901
            $this->version_minor,
902
            $this->server_properties,
903
            $this->mechanisms,
904
            $this->locales
905
        );
906 47
    }
907
908 47
    /**
909 47
     * @param AMQPTable|array $clientProperties
910 47
     * @param string $mechanism
911 47
     * @param string $response
912 47
     * @param string $locale
913 47
     */
914
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
915
    {
916
        $args = new AMQPWriter();
917
        $args->write_table($clientProperties);
918
        $args->write_shortstr($mechanism);
919
        $args->write_longstr($response);
920
        $args->write_shortstr($locale);
921 47
        $this->send_method_frame(array(10, 11), $args);
922
    }
923 47
924 47
    /**
925 47
     * Proposes connection tuning parameters
926
     *
927
     * @param AMQPReader $args
928 47
     */
929 47
    protected function connection_tune($args)
930 47
    {
931
        $v = $args->read_short();
932
        if ($v) {
933
            $this->channel_max = $v;
934 47
        }
935
936
        $v = $args->read_long();
937
        if ($v) {
938 47
            $this->frame_max = (int)$v;
939
        }
940
941
        // @see https://www.rabbitmq.com/heartbeats.html
942
        // If either value is 0 (see below), the greater value of the two is used
943
        // Otherwise the smaller value of the two is used
944
        // A zero value indicates that a peer suggests disabling heartbeats entirely.
945
        // To disable heartbeats, both peers have to opt in and use the value of 0
946
        // For BC, this library opts for disabled heartbeat if client value is 0.
947
        $v = $args->read_short();
948 47
        if ($this->heartbeat > 0 && $v > 0) {
949
            $this->heartbeat = min($this->heartbeat, $v);
950 47
        }
951 47
952 47
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
953 47
        $this->io->afterTune($this->heartbeat);
954 47
    }
955 47
956
    /**
957
     * Negotiates connection tuning parameters
958
     *
959
     * @param int $channel_max
960
     * @param int $frame_max
961
     * @param int $heartbeat
962
     */
963
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
964
    {
965
        $args = new AMQPWriter();
966
        $args->write_short($channel_max);
967
        $args->write_long($frame_max);
968
        $args->write_short($heartbeat);
969
        $this->send_method_frame(array(10, 31), $args);
970
        $this->wait_tune_ok = false;
971
    }
972
973
    /**
974
     * @return AbstractIO
975
     * @deprecated
976
     */
977
    public function getIO()
978
    {
979
        return $this->io;
980
    }
981
982
    /**
983 2
     * Check connection heartbeat if enabled.
984
     * @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
985 2
     * @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
986
     * @throws AMQPSocketException If connection was already closed.
987
     * @throws AMQPTimeoutException If heartbeat write takes too much time.
988
     * @throws AMQPIOException If other connection problems occurred.
989
     */
990
    public function checkHeartBeat()
991
    {
992
        $this->io->check_heartbeat();
993
    }
994
995
    /**
996
     * @return float|int
997
     */
998
    public function getLastActivity()
999
    {
1000
        return $this->io->getLastActivity();
1001
    }
1002 1
1003
    /**
1004 1
     * @return float
1005
     * @since 3.2.0
1006 1
     */
1007
    public function getReadTimeout(): float
1008
    {
1009
        return $this->io->getReadTimeout();
1010
    }
1011
1012
    /**
1013
     * Handles connection blocked notifications
1014
     *
1015
     * @param AMQPReader $args
1016
     */
1017
    protected function connection_blocked(AMQPReader $args)
1018
    {
1019
        $this->blocked = true;
1020
        // Call the block handler and pass in the reason
1021
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
1022
    }
1023
1024
    /**
1025
     * Handles connection unblocked notifications
1026
     */
1027
    protected function connection_unblocked()
1028
    {
1029
        $this->blocked = false;
1030
        // No args to an unblock event
1031
        $this->dispatch_to_handler($this->connection_unblock_handler);
1032
    }
1033
1034
    /**
1035
     * Sets a handler which is called whenever a connection.block is sent from the server
1036
     *
1037
     * @param callable $callback
1038
     * @throws \InvalidArgumentException if $callback is not callable
1039
     */
1040
    public function set_connection_block_handler($callback)
1041
    {
1042
        Assert::isCallable($callback);
1043
        $this->connection_block_handler = $callback;
1044
    }
1045
1046
    /**
1047
     * Sets a handler which is called whenever a connection.block is sent from the server
1048 48
     *
1049
     * @param callable $callback
1050 48
     * @throws \InvalidArgumentException if $callback is not callable
1051
     */
1052
    public function set_connection_unblock_handler($callback)
1053
    {
1054
        Assert::isCallable($callback);
1055
        $this->connection_unblock_handler = $callback;
1056
    }
1057
1058 15
    /**
1059
     * Gets the connection status
1060 15
     *
1061
     * @return bool
1062
     */
1063
    public function isConnected()
1064
    {
1065
        return $this->is_connected;
1066
    }
1067 2
1068
    /**
1069 2
     * Get the connection blocked state.
1070
     * @return bool
1071
     * @since 2.12.0
1072
     */
1073
    public function isBlocked()
1074
    {
1075
        return $this->blocked;
1076
    }
1077 48
1078
    /**
1079 48
     * Get the io writing state.
1080
     * @return bool
1081
     */
1082
    public function isWriting()
1083
    {
1084
        return $this->writing;
1085 41
    }
1086
1087 41
    /**
1088
     * Set the connection status
1089 40
     *
1090 40
     * @param bool $is_connected
1091
     */
1092
    protected function setIsConnected($is_connected)
1093 7
    {
1094
        $this->is_connected = (bool) $is_connected;
1095
    }
1096
1097
    /**
1098
     * Closes all available channels
1099
     */
1100
    protected function closeChannels()
1101
    {
1102
        foreach ($this->channels as $key => $channel) {
1103
            // channels[0] is this connection object, so don't close it yet
1104
            if ($key === 0) {
1105 44
                continue;
1106
            }
1107 44
            try {
1108
                $channel->close();
1109
            } catch (\Exception $e) {
1110
                /* Ignore closing errors */
1111
            }
1112
        }
1113
    }
1114
1115
    /**
1116
     * Should the connection be attempted during construction?
1117
     *
1118
     * @return bool
1119
     */
1120
    public function connectOnConstruct(): bool
1121 4
    {
1122
        if ($this->config) {
1123 4
            return !$this->config->isLazy();
1124
        }
1125
1126
        return true;
1127
    }
1128
1129
    /**
1130
     * @return array
1131 49
     */
1132
    public function getServerProperties()
1133 49
    {
1134
        return $this->server_properties;
1135
    }
1136
1137
    /**
1138
     * @return int
1139
     */
1140
    public function getHeartbeat()
1141
    {
1142
        return $this->heartbeat;
1143 2
    }
1144
1145 2
    /**
1146
     * Get the library properties for populating the client protocol information
1147
     *
1148
     * @return array
1149
     */
1150
    public function getLibraryProperties()
1151 2
    {
1152 2
        $config = self::$LIBRARY_PROPERTIES;
1153 2
        if ($this->config !== null) {
1154 2
            $connectionName = $this->config->getConnectionName();
1155 2
            if ($connectionName !== '') {
1156 2
                $config['connection_name'] = ['S', $connectionName];
1157 2
            }
1158
        }
1159 2
        return $config;
1160 2
    }
1161
1162
    /**
1163
     * @param array $hosts
1164
     * @param array $options
1165
     *
1166
     * @return mixed
1167
     * @throws \Exception
1168
     * @deprecated Use AMQPConnectionFactory.
1169
     */
1170
    public static function create_connection($hosts, $options = array())
1171
    {
1172
        if (!is_array($hosts) || count($hosts) < 1) {
0 ignored issues
show
introduced by
The condition is_array($hosts) is always true.
Loading history...
1173
            throw new \InvalidArgumentException(
1174
                'An array of hosts are required when attempting to create a connection'
1175
            );
1176
        }
1177
1178
        foreach ($hosts as $hostdef) {
1179
            self::validate_host($hostdef);
1180
            $host = $hostdef['host'];
1181
            $port = $hostdef['port'];
1182
            $user = $hostdef['user'];
1183
            $password = $hostdef['password'];
1184
            $vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : '/';
1185
            try {
1186
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);
0 ignored issues
show
Bug introduced by
The method try_create_connection() does not exist on PhpAmqpLib\Connection\AbstractConnection. Did you maybe mean connect()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

1186
                /** @scrutinizer ignore-call */ 
1187
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
1187
                return $conn;
1188
            } catch (\Exception $e) {
1189
                $latest_exception = $e;
1190
            }
1191
        }
1192
        throw $latest_exception;
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $latest_exception seems to be defined by a foreach iteration on line 1178. Are you sure the iterator is never empty, otherwise this variable is not defined?
Loading history...
1193
    }
1194
1195
    public static function validate_host($host)
1196
    {
1197
        if (!isset($host['host'])) {
1198
            throw new \InvalidArgumentException("'host' key is required.");
1199
        }
1200
        if (!isset($host['port'])) {
1201
            throw new \InvalidArgumentException("'port' key is required.");
1202
        }
1203
        if (!isset($host['user'])) {
1204
            throw new \InvalidArgumentException("'user' key is required.");
1205
        }
1206
        if (!isset($host['password'])) {
1207
            throw new \InvalidArgumentException("'password' key is required.");
1208
        }
1209
    }
1210
}
1211