Passed
Push — master ( 853986...256188 )
by Ramūnas
41:18 queued 16:15
created

AbstractConnection::closeChannelsIfDisconnected()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 4
nc 3
nop 0
dl 0
loc 9
ccs 2
cts 2
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace PhpAmqpLib\Connection;
4
5
use PhpAmqpLib\Channel\AbstractChannel;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Channel\Frame;
8
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
9
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
10
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
11
use PhpAmqpLib\Exception\AMQPIOException;
12
use PhpAmqpLib\Exception\AMQPNoDataException;
13
use PhpAmqpLib\Exception\AMQPRuntimeException;
14
use PhpAmqpLib\Exception\AMQPSocketException;
15
use PhpAmqpLib\Exception\AMQPTimeoutException;
16
use PhpAmqpLib\Helper\Assert;
17
use PhpAmqpLib\Package;
18
use PhpAmqpLib\Wire;
19
use PhpAmqpLib\Wire\AMQPReader;
20
use PhpAmqpLib\Wire\AMQPTable;
21
use PhpAmqpLib\Wire\AMQPWriter;
22
use PhpAmqpLib\Wire\IO\AbstractIO;
23
24
abstract class AbstractConnection extends AbstractChannel
25
{
26
    /**
27
     * @var array
28
     * @internal
29
     */
30
    public static $LIBRARY_PROPERTIES = array(
31
        'product' => array('S', Package::NAME),
32
        'platform' => array('S', 'PHP'),
33
        'version' => array('S', Package::VERSION),
34
        'information' => array('S', ''),
35
        'copyright' => array('S', ''),
36
        'capabilities' => array(
37
            'F',
38
            array(
39
                'authentication_failure_close' => array('t', true),
40
                'publisher_confirms' => array('t', true),
41
                'consumer_cancel_notify' => array('t', true),
42
                'exchange_exchange_bindings' => array('t', true),
43
                'basic.nack' => array('t', true),
44
                'connection.blocked' => array('t', true)
45
            )
46
        )
47
    );
48
49
    /**
50
     * @var AMQPChannel[]|AbstractChannel[]
51
     * @internal
52
     */
53
    public $channels = array();
54
55
    /** @var int */
56
    protected $version_major;
57
58
    /** @var int */
59
    protected $version_minor;
60
61
    /** @var array */
62
    protected $server_properties;
63
64
    /** @var array */
65
    protected $mechanisms;
66
67
    /** @var array */
68
    protected $locales;
69
70
    /** @var bool */
71
    protected $wait_tune_ok;
72
73
    /** @var string */
74
    protected $known_hosts;
75
76
    /** @var null|Wire\AMQPIOReader */
77
    protected $input;
78
79
    /** @var string */
80
    protected $vhost;
81
82
    /** @var bool */
83
    protected $insist;
84
85
    /** @var string */
86
    protected $login_method;
87
88
    /**
89
     * @var null|string
90
     */
91
    protected $login_response;
92
93
    /** @var string */
94
    protected $locale;
95
96
    /** @var int */
97
    protected $heartbeat;
98
99
    /** @var float */
100
    protected $last_frame;
101
102
    /** @var int */
103
    protected $channel_max = 65535;
104
105
    /** @var int */
106
    protected $frame_max = 131072;
107
108
    /** @var array Constructor parameters for clone */
109
    protected $construct_params;
110
111
    /** @var bool Close the connection in destructor */
112
    protected $close_on_destruct = true;
113
114
    /** @var bool Maintain connection status */
115
    protected $is_connected = false;
116
117
    /** @var AbstractIO */
118
    protected $io;
119
120
    /** @var callable Handles connection blocking from the server */
121
    private $connection_block_handler;
122
123
    /** @var callable Handles connection unblocking from the server */
124
    private $connection_unblock_handler;
125
126
    /** @var int Connection timeout value*/
127
    protected $connection_timeout;
128
129
    /** @var AMQPConnectionConfig|null */
130
    protected $config;
131
132
    /**
133
     * Circular buffer to speed up prepare_content().
134
     * Max size limited by $prepare_content_cache_max_size.
135
     *
136
     * @var array
137
     * @see prepare_content()
138
     */
139
    private $prepare_content_cache = array();
140
141
    /** @var int Maximal size of $prepare_content_cache */
142
    private $prepare_content_cache_max_size = 100;
143
144
    /**
145
     * Maximum time to wait for channel operations, in seconds
146
     * @var float $channel_rpc_timeout
147
     */
148
    private $channel_rpc_timeout;
149
150
    /**
151
     * If connection is blocked due to the broker running low on resources.
152
     * @var bool
153
     */
154
    protected $blocked = false;
155
156
    /**
157
     * If a frame is currently being written
158
     * @var bool
159
     */
160
    protected $writing = false;
161
162
    /**
163
     * @param string $user
164
     * @param string $password
165
     * @param string $vhost
166
     * @param bool $insist
167
     * @param string $login_method
168
     * @param null $login_response @deprecated
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $login_response is correct as it would always require null to be passed?
Loading history...
169
     * @param string $locale
170
     * @param AbstractIO $io
171
     * @param int $heartbeat
172
     * @param int|float $connection_timeout
173
     * @param int|float $channel_rpc_timeout
174
     * @param \PhpAmqpLib\Connection\AMQPConnectionConfig | null $config
175
     * @throws \Exception
176
     */
177
    public function __construct(
178
        $user,
179 53
        $password,
180
        $vhost = '/',
181
        $insist = false,
182
        $login_method = 'AMQPLAIN',
183
        $login_response = null,
184
        $locale = 'en_US',
185
        ?AbstractIO $io = null,
186
        $heartbeat = 0,
187
        $connection_timeout = 0,
188
        $channel_rpc_timeout = 0.0,
189
        ?AMQPConnectionConfig $config = null
190
    ) {
191
        if (is_null($io)) {
192
            throw new \InvalidArgumentException('Argument $io cannot be null');
193 53
        }
194 1
195
        if ($config) {
196
            $this->config = clone $config;
197 52
        }
198 14
199
        // save the params for the use of __clone
200
        $this->construct_params = func_get_args();
201
202 52
        $this->vhost = $vhost;
203
        $this->insist = $insist;
204 52
        $this->login_method = $login_method;
205 52
        $this->locale = $locale;
206 52
        $this->io = $io;
207 52
        $this->heartbeat = max(0, (int)$heartbeat);
208 52
        $this->connection_timeout = $connection_timeout;
209 52
        $this->channel_rpc_timeout = $channel_rpc_timeout;
210 52
211 52
        if ($user && $password) {
212 52
            if ($login_method === 'PLAIN') {
213
                $this->login_response = sprintf("\0%s\0%s", $user, $password);
214 52
            } elseif ($login_method === 'AMQPLAIN') {
215 52
                $login_response = new AMQPWriter();
216 4
                $login_response->write_table(array(
217 48
                    'LOGIN' => array('S', $user),
218 48
                    'PASSWORD' => array('S', $password)
219 48
                ));
220 48
221 48
                // Skip the length
222
                $responseValue = $login_response->getvalue();
223
                $this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
224
            } else {
225 48
                throw new \InvalidArgumentException('Unknown login method: ' . $login_method);
226 48
            }
227
        } elseif ($login_method === 'EXTERNAL') {
228 52
            $this->login_response = $login_response;
229
        } else {
230
            $this->login_response = null;
231 1
        }
232
233
        // Lazy Connection waits on connecting
234
        if ($this->connectOnConstruct()) {
235 52
            $this->connect();
236 45
        }
237
    }
238
239
    /**
240
     * Connects to the AMQP server
241
     * @throws \Exception
242
     */
243 48
    protected function connect()
244
    {
245 48
        $this->blocked = false;
246
        try {
247
            // Loop until we connect
248 48
            while (!$this->isConnected()) {
249
                // Assume we will connect, until we dont
250 48
                $this->setIsConnected(true);
251
252
                // Connect the socket
253 48
                $this->io->connect();
254
255 47
                $this->channels = array();
256
                // The connection object itself is treated as channel 0
257 47
                parent::__construct($this, 0);
258
259 47
                $this->input = new Wire\AMQPIOReader($this->io);
260
261 47
                $this->write($this->constants->getHeader());
262
                // assume frame was sent successfully, used in $this->wait_channel()
263 47
                $this->last_frame = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $last_frame is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
264 47
                $this->wait(array($this->waitHelper->get_wait('connection.start')), false, $this->connection_timeout);
265 47
                $this->x_start_ok(
266 47
                    $this->getLibraryProperties(),
267 47
                    $this->login_method,
268 47
                    $this->login_response,
269 47
                    $this->locale
270
                );
271
272 47
                $this->wait_tune_ok = true;
273 47
                while ($this->wait_tune_ok) {
274 47
                    $this->wait(array(
275 47
                        $this->waitHelper->get_wait('connection.secure'),
276 47
                        $this->waitHelper->get_wait('connection.tune')
277 47
                    ), false, $this->connection_timeout);
278
                }
279
280 47
                $host = $this->x_open($this->vhost, '', $this->insist);
281 47
                if (!$host) {
282
                    //Reconnected
283 47
                    $this->io->reenableHeartbeat();
284 47
                    return null; // we weren't redirected
285
                }
286
287
                $this->setIsConnected(false);
288
                $this->closeChannels();
289
290
                // we were redirected, close the socket, loop and try again
291
                $this->close_socket();
292
            }
293 1
        } catch (\Exception $e) {
294
            // Something went wrong, set the connection status
295 1
            $this->setIsConnected(false);
296 1
            $this->closeChannels();
297 1
            $this->close_input();
298 1
            $this->close_socket();
299 1
            throw $e; // Rethrow exception
300
        }
301
    }
302
303
    /**
304
     * Reconnects using the original connection settings.
305
     * This will not recreate any channels that were established previously
306
     * @throws \Exception
307 6
     */
308
    public function reconnect()
309
    {
310 6
        // Try to close the AMQP connection
311
        $this->safeClose();
312 6
        // Reconnect the socket/stream then AMQP
313
        $this->io->close();
314 6
        // getIO can initiate the connection setting via LazyConnection, set it here to be sure
315 6
        $this->setIsConnected(false);
316
        $this->connect();
317
    }
318
319
    /**
320
     * Cloning will use the old properties to make a new connection to the same server
321
     */
322
    public function __clone()
323
    {
324
        if ($this->config) {
325
            $this->config = clone $this->config;
326
        }
327
        call_user_func_array(array($this, '__construct'), $this->construct_params);
328
    }
329 4
330
    public function __destruct()
331 4
    {
332 4
        if ($this->close_on_destruct) {
333
            $this->safeClose();
334
        }
335
    }
336
337
    /**
338
     * Attempts to close the connection safely
339 10
     */
340
    protected function safeClose()
341
    {
342 10
        try {
343 10
            if (null !== $this->input) {
344
                $this->close();
345
            }
346
        } catch (\Exception $e) {
347
            // Nothing here
348
        }
349
    }
350
351
    /**
352
     * @param int|null $sec
353
     * @param int $usec
354
     * @return int
355
     * @throws AMQPIOException
356
     * @throws AMQPRuntimeException
357
     * @throws AMQPConnectionClosedException
358
     * @throws AMQPRuntimeException
359
     */
360
    public function select(?int $sec, int $usec = 0): int
361
    {
362
        try {
363
            return $this->io->select($sec, $usec);
364
        } catch (AMQPConnectionClosedException $e) {
365
            $this->do_close();
366
            throw $e;
367
        } catch (AMQPRuntimeException $e) {
368
            $this->setIsConnected(false);
369
            throw $e;
370
        }
371
    }
372
373
    /**
374
     * Allows to not close the connection
375
     * it's useful after the fork when you don't want to close parent process connection
376
     *
377
     * @param bool $close
378
     */
379
    public function set_close_on_destruct($close = true)
380
    {
381
        $this->close_on_destruct = (bool) $close;
382
    }
383 41
384
    protected function close_input()
385 41
    {
386
        $this->debug && $this->debug->debug_msg('closing input');
387 41
388 40
        if (null !== $this->input) {
389 40
            $this->input->close();
390
            $this->input = null;
391
        }
392
    }
393 41
394
    protected function close_socket()
395 41
    {
396 41
        $this->debug && $this->debug->debug_msg('closing socket');
397
        $this->io->close();
398
    }
399
400
    /**
401
     * @param string $data
402 47
     * @throws AMQPIOException
403
     */
404 47
    public function write($data)
405
    {
406
        $this->debug->debug_hexdump($data);
407 47
408 47
        try {
409
            $this->writing = true;
410
            $this->io->write($data);
411
        } catch (AMQPConnectionClosedException $e) {
412
            $this->do_close();
413
            throw $e;
414
        } catch (AMQPRuntimeException $e) {
415 47
            $this->setIsConnected(false);
416 47
            throw $e;
417
        } finally {
418
            $this->writing = false;
419
        }
420 40
    }
421
422 40
    protected function do_close()
423 40
    {
424 40
        $this->frame_queue = new \SplQueue();
425 40
        $this->method_queue = [];
426 40
        $this->setIsConnected(false);
427
        $this->closeChannelsIfDisconnected();
428
        $this->close_input();
429
        $this->close_socket();
430
    }
431
432
    /**
433 36
     * @return int
434
     * @throws AMQPRuntimeException
435 36
     */
436 36
    public function get_free_channel_id()
437 36
    {
438
        for ($i = 1; $i <= $this->channel_max; $i++) {
439
            if (!isset($this->channels[$i])) {
440
                return $i;
441
            }
442
        }
443
444
        throw new AMQPRuntimeException('No free channel ids');
445
    }
446
447
    /**
448
     * @param int $channel
449
     * @param int $class_id
450
     * @param int $weight
451
     * @param int $body_size
452
     * @param string $packed_properties
453 14
     * @param string $body
454
     * @param AMQPWriter $pkt
455 14
     * @throws AMQPIOException
456 14
     */
457
    public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
458
    {
459
        $this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
460
        $this->write($pkt->getvalue());
461
    }
462
463
    /**
464
     * Returns a new AMQPWriter or mutates the provided $pkt
465
     *
466
     * @param int $channel
467
     * @param int $class_id
468
     * @param int $weight
469
     * @param int $body_size
470
     * @param string $packed_properties
471 14
     * @param string $body
472
     * @param AMQPWriter|null $pkt
473 14
     * @return AMQPWriter
474
     */
475
    public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
476 14
    {
477
        $pkt = $pkt ?: new AMQPWriter();
478
479
        // Content already prepared ?
480
        $key_cache = sprintf(
481
            '%s|%s|%s|%s',
482
            $channel,
483
            $packed_properties,
484 14
            $class_id,
485 14
            $weight
486 14
        );
487 14
488 14
        if (!isset($this->prepare_content_cache[$key_cache])) {
489 14
            $w = new AMQPWriter();
490 14
            $w->write_octet(2);
491 14
            $w->write_short($channel);
492 14
            $w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
493
            $w->write_short($class_id);
494
            $w->write_short($weight);
495
            $this->prepare_content_cache[$key_cache] = $w->getvalue();
496
            if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
497
                reset($this->prepare_content_cache);
498 14
                $old_key = key($this->prepare_content_cache);
499
                unset($this->prepare_content_cache[$old_key]);
500 14
            }
501 14
        }
502
        $pkt->write($this->prepare_content_cache[$key_cache]);
503 14
504
        $pkt->write_longlong($body_size);
505
        $pkt->write($packed_properties);
506
507
        $pkt->write_octet(0xCE);
508
509 14
510 14
        // memory efficiency: walk the string instead of biting
511 14
        // it. good for very large packets (close in size to
512 13
        // memory_limit setting)
513 13
        $position = 0;
514
        $bodyLength = mb_strlen($body, 'ASCII');
515 13
        while ($position < $bodyLength) {
516 13
            $payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
517 13
            $position += $this->frame_max - 8;
518
519 13
            $pkt->write_octet(3);
520
            $pkt->write_short($channel);
521 13
            $pkt->write_long(mb_strlen($payload, 'ASCII'));
522
523
            $pkt->write($payload);
524 14
525
            $pkt->write_octet(0xCE);
526
        }
527
528
        return $pkt;
529
    }
530
531
    /**
532
     * @param int $channel
533 47
     * @param array $method_sig
534
     * @param AMQPWriter|string $args
535 47
     * @param null $pkt
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $pkt is correct as it would always require null to be passed?
Loading history...
536 47
     * @throws AMQPIOException
537 47
     */
538
    protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
539
    {
540
        $pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
541
        $this->write($pkt->getvalue());
542
        $this->debug->debug_method_signature1($method_sig);
543
    }
544
545
    /**
546
     * Returns a new AMQPWriter or mutates the provided $pkt
547
     *
548
     * @param int $channel
549 47
     * @param array $method_sig
550
     * @param AMQPWriter|string $args
551 47
     * @param AMQPWriter|null $pkt
552 47
     * @return AMQPWriter
553
     */
554
    protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
555 47
    {
556
        if ($args instanceof AMQPWriter) {
557 47
            $args = $args->getvalue();
558 47
        }
559 47
560
        $pkt = $pkt ?: new AMQPWriter();
561
562 47
        $pkt->write_octet(1);
563 47
        $pkt->write_short($channel);
564 47
        $pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
565
        // in payload
566 47
567
        $pkt->write_short($method_sig[0]); // class_id
568 47
        $pkt->write_short($method_sig[1]); // method_id
569
        $pkt->write($args);
570 47
571
        $pkt->write_octet(0xCE);
572
573
        $this->debug->debug_method_signature1($method_sig);
574
575
        return $pkt;
576
    }
577
578
    /**
579
     * Waits for a frame from the server
580
     *
581
     * @param int|float|null $timeout
582 47
     * @return Frame
583
     * @throws \Exception
584 47
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
585
     * @throws AMQPRuntimeException
586
     */
587
    protected function wait_frame($timeout = 0): Frame
588
    {
589 47
        if (null === $this->input) {
590 47
            $this->setIsConnected(false);
591
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
592
        }
593
594 47
        $currentTimeout = $this->input->getTimeout();
595 47
        $this->input->setTimeout($timeout);
596
597
        try {
598 47
            $header = $this->input->readFrameHeader();
599 47
            $frame_type = $header['type'];
600
            if (!$this->constants->isFrameType($frame_type)) {
601
                throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
602 47
            }
603 47
            $size = $header['size'];
604
605
            // payload + ch
606 47
            $result = unpack('a' . $size . 'payload/Cch', $this->input->read(AMQPReader::OCTET + $size));
607
            $ch = $result['ch'];
608 47
            $frame = new Frame($frame_type, $header['channel'], $size, $result['payload']);
609 47
        } catch (AMQPTimeoutException $e) {
610 13
            if ($this->input) {
611 5
                $this->input->setTimeout($currentTimeout);
612 5
            }
613
            throw $e;
614 5
        } catch (AMQPNoDataException $e) {
615 8
            if ($this->input) {
616 8
                $this->input->setTimeout($currentTimeout);
617 8
            }
618
            throw $e;
619 8
        } catch (AMQPConnectionClosedException $exception) {
620
            $this->do_close();
621
            throw $exception;
622
        } finally {
623
            if ($this->input) {
624
                $this->input->setTimeout($currentTimeout);
625 47
            }
626
        }
627 47
628
        $this->input->setTimeout($currentTimeout);
0 ignored issues
show
Bug introduced by
The method setTimeout() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

628
        $this->input->/** @scrutinizer ignore-call */ 
629
                      setTimeout($currentTimeout);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
629
630
        if ($ch !== Frame::END) {
631
            throw new AMQPInvalidFrameException(sprintf(
632
                'Framing error, unexpected byte: %x',
633
                $ch
634 47
            ));
635
        }
636
637
        return $frame;
638
    }
639
640
    /**
641
     * Waits for a frame from the server destined for a particular channel.
642
     *
643
     * @param int $channel_id
644 47
     * @param int|float|null $timeout
645
     * @return Frame
646
     * @throws \Exception
647 47
     */
648 47
    protected function wait_channel(int $channel_id, $timeout = 0): Frame
649 47
    {
650
        // Keeping the original timeout unchanged.
651 47
        $_timeout = $timeout;
652 13
        while (true) {
653
            $start = microtime(true);
654 5
            try {
655 5
                $frame = $this->wait_frame($_timeout);
656
            } catch (AMQPTimeoutException $e) {
657
                if (
658
                    $this->heartbeat && $this->last_frame
659
                    && microtime(true) - ($this->heartbeat * 2) > $this->last_frame
660
                ) {
661
                    $this->debug->debug_msg('missed server heartbeat (at threshold * 2)');
662 5
                    $this->setIsConnected(false);
663
                    throw new AMQPHeartbeatMissedException('Missed server heartbeat');
664
                }
665 47
666
                throw $e;
667 47
            }
668
669 6
            $this->last_frame = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $last_frame is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
670 6
            $frame_channel = $frame->getChannel();
671 4
672 4
            if ($frame_channel === 0 && $frame->isHeartbeat()) {
673
                // skip heartbeat frames and reduce the timeout by the time passed
674
                $this->debug->debug_msg('received server heartbeat');
675
                if ($_timeout > 0) {
676
                    $_timeout -= $this->last_frame - $start;
677 6
                    if ($_timeout <= 0) {
678
                        // If timeout has been reached, throw the exception without calling wait_frame
679
                        throw new AMQPTimeoutException('Timeout waiting on channel');
680 47
                    }
681 47
                }
682
                continue;
683
            }
684
685
            if ($frame_channel === $channel_id) {
686
                return $frame;
687
            }
688 1
689 1
            // Not the channel we were looking for.  Queue this frame
690
            //for later, when the other channel is looking for frames.
691
            // Make sure the channel still exists, it could have been
692
            // closed by a previous Exception.
693
            if (isset($this->channels[$frame_channel])) {
694
                $this->channels[$frame_channel]->frame_queue->enqueue($frame);
695 1
            }
696
697
            // If we just queued up a method for channel 0 (the Connection
698
            // itself) it's probably a close method in reaction to some
699
            // error, so deal with it right away.
700
            if ($frame_channel === 0 && $frame->isMethod()) {
701
                $this->wait();
702
            }
703
        }
704
    }
705
706
    /**
707
     * Fetches a channel object identified by the numeric channel_id, or
708
     * create that object if it doesn't already exist.
709
     *
710
     * @param int|null $channel_id
711
     * @return AMQPChannel
712 36
     * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
713
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
714 36
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
715
     * @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
716
     */
717
    public function channel($channel_id = null)
718 36
    {
719 36
        if (!$this->is_connected) {
720 36
            $this->connect();
721
        }
722 36
        if (isset($this->channels[$channel_id])) {
723
            return $this->channels[$channel_id];
724
        }
725
726
        $channel_id = $channel_id ?: $this->get_free_channel_id();
727
        $ch = new AMQPChannel($this, $channel_id, true, $this->channel_rpc_timeout);
728
        $this->channels[$channel_id] = $ch;
729
730
        return $ch;
731
    }
732
733 40
    /**
734
     * Requests a connection close
735 40
     *
736 40
     * @param int $reply_code
737 5
     * @param string $reply_text
738
     * @param array $method_sig
739
     * @return mixed|null
740 40
     * @throws \Exception
741
     */
742 40
    public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
743 40
    {
744
        $this->io->disableHeartbeat();
745
        if (empty($this->protocolWriter) || !$this->isConnected()) {
746 40
            return null;
747 40
        }
748
749 40
        $result = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
750 40
        try {
751 40
            $this->closeChannels();
752
            list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
753 40
                $reply_code,
754
                $reply_text,
755
                $method_sig[0],
756
                $method_sig[1]
757
            );
758
            $this->send_method_frame(array($class_id, $method_id), $args);
759
            $result = $this->wait(
760 40
                array($this->waitHelper->get_wait('connection.close_ok')),
761
                false,
762 40
                $this->connection_timeout
763
            );
764
        } catch (\Exception $exception) {
765
            $this->do_close();
766
            throw $exception;
767
        }
768
769
        $this->setIsConnected(false);
770
771
        return $result;
772
    }
773
774
    /**
775
     * @param AMQPReader $reader
776
     * @throws AMQPConnectionClosedException
777
     */
778
    protected function connection_close(AMQPReader $reader)
779
    {
780
        $code = (int)$reader->read_short();
781
        $reason = $reader->read_shortstr();
782
        $class = $reader->read_short();
783
        $method = $reader->read_short();
784
        $reason .= sprintf('(%s, %s)', $class, $method);
785
786
        $this->x_close_ok();
787
788
        throw new AMQPConnectionClosedException($reason, $code);
789
    }
790
791
    /**
792
     * Confirms a connection close
793
     */
794
    protected function x_close_ok()
795
    {
796 40
        $this->send_method_frame(
797
            explode(',', $this->waitHelper->get_wait('connection.close_ok'))
798 40
        );
799
        $this->do_close();
800
    }
801
802
    /**
803
     * Confirm a connection close
804
     */
805
    protected function connection_close_ok()
806
    {
807 47
        $this->do_close();
808
    }
809 47
810 47
    /**
811 47
     * @param string $virtual_host
812 47
     * @param string $capabilities
813 47
     * @param bool $insist
814
     * @return mixed
815
     */
816 47
    protected function x_open($virtual_host, $capabilities = '', $insist = false)
817
    {
818
        $args = new AMQPWriter();
819 47
        $args->write_shortstr($virtual_host);
820
        $args->write_shortstr($capabilities);
821
        $args->write_bits(array($insist));
822
        $this->send_method_frame(array(10, 40), $args);
823 47
824
        $wait = array(
825
            $this->waitHelper->get_wait('connection.open_ok')
826
        );
827
828
        if ($this->protocolVersion === Wire\Constants080::VERSION) {
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Channel\Abstr...annel::$protocolVersion has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

828
        if (/** @scrutinizer ignore-deprecated */ $this->protocolVersion === Wire\Constants080::VERSION) {
Loading history...
829
            $wait[] = $this->waitHelper->get_wait('connection.redirect');
830
        }
831 47
832
        return $this->wait($wait, false, $this->connection_timeout);
833 47
    }
834 47
835
    /**
836
     * Signals that the connection is ready
837
     *
838
     * @param AMQPReader $args
839
     */
840
    protected function connection_open_ok($args)
841
    {
842
        $this->known_hosts = $args->read_shortstr();
843
        $this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
844
    }
845
846
    /**
847
     * Asks the client to use a different server
848
     *
849
     * @param AMQPReader $args
850
     * @return string
851
     */
852
    protected function connection_redirect($args)
853
    {
854
        $host = $args->read_shortstr();
855
        $this->known_hosts = $args->read_shortstr();
856
        $this->debug->debug_msg(sprintf(
857
            'Redirected to [%s], known_hosts [%s]',
858
            $host,
859
            $this->known_hosts
860
        ));
861
862
        return $host;
863
    }
864
865
    /**
866
     * Security mechanism challenge
867
     *
868
     * @param AMQPReader $args
869
     */
870
    protected function connection_secure($args)
871
    {
872
        $args->read_longstr();
873
    }
874
875
    /**
876
     * Security mechanism response
877
     *
878
     * @param string $response
879
     */
880
    protected function x_secure_ok($response)
881
    {
882
        $args = new AMQPWriter();
883 47
        $args->write_longstr($response);
884
        $this->send_method_frame(array(10, 21), $args);
885 47
    }
886 47
887 47
    /**
888 47
     * Starts connection negotiation
889 47
     *
890
     * @param AMQPReader $args
891 47
     */
892 47
    protected function connection_start($args)
893 47
    {
894 47
        $this->version_major = $args->read_octet();
895 47
        $this->version_minor = $args->read_octet();
896 47
        $this->server_properties = $args->read_table();
897
        $this->mechanisms = explode(' ', $args->read_longstr());
898
        $this->locales = explode(' ', $args->read_longstr());
899
900
        $this->debug->debug_connection_start(
901
            $this->version_major,
902
            $this->version_minor,
903
            $this->server_properties,
904
            $this->mechanisms,
905
            $this->locales
906 47
        );
907
    }
908 47
909 47
    /**
910 47
     * @param AMQPTable|array $clientProperties
911 47
     * @param string $mechanism
912 47
     * @param string $response
913 47
     * @param string $locale
914
     */
915
    protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
916
    {
917
        $args = new AMQPWriter();
918
        $args->write_table($clientProperties);
919
        $args->write_shortstr($mechanism);
920
        $args->write_longstr($response);
921 47
        $args->write_shortstr($locale);
922
        $this->send_method_frame(array(10, 11), $args);
923 47
    }
924 47
925 47
    /**
926
     * Proposes connection tuning parameters
927
     *
928 47
     * @param AMQPReader $args
929 47
     */
930 47
    protected function connection_tune($args)
931
    {
932
        $v = $args->read_short();
933
        if ($v) {
934 47
            $this->channel_max = $v;
935
        }
936
937
        $v = $args->read_long();
938 47
        if ($v) {
939
            $this->frame_max = (int)$v;
940
        }
941
942
        // @see https://www.rabbitmq.com/heartbeats.html
943
        // If either value is 0 (see below), the greater value of the two is used
944
        // Otherwise the smaller value of the two is used
945
        // A zero value indicates that a peer suggests disabling heartbeats entirely.
946
        // To disable heartbeats, both peers have to opt in and use the value of 0
947
        // For BC, this library opts for disabled heartbeat if client value is 0.
948 47
        $v = $args->read_short();
949
        if ($this->heartbeat > 0 && $v > 0) {
950 47
            $this->heartbeat = min($this->heartbeat, $v);
951 47
        }
952 47
953 47
        $this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
954 47
        $this->io->afterTune($this->heartbeat);
955 47
    }
956
957
    /**
958
     * Negotiates connection tuning parameters
959
     *
960
     * @param int $channel_max
961
     * @param int $frame_max
962
     * @param int $heartbeat
963
     */
964
    protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
965
    {
966
        $args = new AMQPWriter();
967
        $args->write_short($channel_max);
968
        $args->write_long($frame_max);
969
        $args->write_short($heartbeat);
970
        $this->send_method_frame(array(10, 31), $args);
971
        $this->wait_tune_ok = false;
972
    }
973
974
    /**
975
     * @return AbstractIO
976
     * @deprecated
977
     */
978
    public function getIO()
979
    {
980
        return $this->io;
981
    }
982
983 2
    /**
984
     * Check connection heartbeat if enabled.
985 2
     * @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
986
     * @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
987
     * @throws AMQPSocketException If connection was already closed.
988
     * @throws AMQPTimeoutException If heartbeat write takes too much time.
989
     * @throws AMQPIOException If other connection problems occurred.
990
     */
991
    public function checkHeartBeat()
992
    {
993
        $this->io->check_heartbeat();
994
    }
995
996
    /**
997
     * @return float|int
998
     */
999
    public function getLastActivity()
1000
    {
1001
        return $this->io->getLastActivity();
1002 1
    }
1003
1004 1
    /**
1005
     * @return float
1006 1
     * @since 3.2.0
1007
     */
1008
    public function getReadTimeout(): float
1009
    {
1010
        return $this->io->getReadTimeout();
1011
    }
1012
1013
    /**
1014
     * Handles connection blocked notifications
1015
     *
1016
     * @param AMQPReader $args
1017
     */
1018
    protected function connection_blocked(AMQPReader $args)
1019
    {
1020
        $this->blocked = true;
1021
        // Call the block handler and pass in the reason
1022
        $this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
1023
    }
1024
1025
    /**
1026
     * Handles connection unblocked notifications
1027
     */
1028
    protected function connection_unblocked()
1029
    {
1030
        $this->blocked = false;
1031
        // No args to an unblock event
1032
        $this->dispatch_to_handler($this->connection_unblock_handler);
1033
    }
1034
1035
    /**
1036
     * Sets a handler which is called whenever a connection.block is sent from the server
1037
     *
1038
     * @param callable $callback
1039
     * @throws \InvalidArgumentException if $callback is not callable
1040
     */
1041
    public function set_connection_block_handler($callback)
1042
    {
1043
        Assert::isCallable($callback);
1044
        $this->connection_block_handler = $callback;
1045
    }
1046
1047
    /**
1048 48
     * Sets a handler which is called whenever a connection.block is sent from the server
1049
     *
1050 48
     * @param callable $callback
1051
     * @throws \InvalidArgumentException if $callback is not callable
1052
     */
1053
    public function set_connection_unblock_handler($callback)
1054
    {
1055
        Assert::isCallable($callback);
1056
        $this->connection_unblock_handler = $callback;
1057
    }
1058 15
1059
    /**
1060 15
     * Gets the connection status
1061
     *
1062
     * @return bool
1063
     */
1064
    public function isConnected()
1065
    {
1066
        return $this->is_connected;
1067 2
    }
1068
1069 2
    /**
1070
     * Get the connection blocked state.
1071
     * @return bool
1072
     * @since 2.12.0
1073
     */
1074
    public function isBlocked()
1075
    {
1076
        return $this->blocked;
1077 48
    }
1078
1079 48
    /**
1080
     * Get the io writing state.
1081
     * @return bool
1082
     */
1083
    public function isWriting()
1084
    {
1085 41
        return $this->writing;
1086
    }
1087 41
1088
    /**
1089 40
     * Set the connection status
1090 40
     *
1091
     * @param bool $is_connected
1092
     */
1093 7
    protected function setIsConnected($is_connected)
1094
    {
1095
        $this->is_connected = (bool) $is_connected;
1096
    }
1097
1098
    /**
1099
     * Closes all available channels
1100
     */
1101
    protected function closeChannels()
1102
    {
1103
        foreach ($this->channels as $key => $channel) {
1104
            // channels[0] is this connection object, so don't close it yet
1105 44
            if ($key === 0) {
1106
                continue;
1107 44
            }
1108
            try {
1109
                $channel->close();
1110
            } catch (\Exception $e) {
1111
                /* Ignore closing errors */
1112
            }
1113
        }
1114
    }
1115
1116
    /**
1117
     * Closes all available channels if disconnected
1118
     */
1119
    protected function closeChannelsIfDisconnected()
1120
    {
1121 4
        foreach ($this->channels as $key => $channel) {
1122
            // channels[0] is this connection object, so don't close it yet
1123 4
            if ($key === 0) {
1124
                continue;
1125
            }
1126
1127
            $channel->closeIfDisconnected();
1128
        }
1129
    }
1130
1131 49
    /**
1132
     * Should the connection be attempted during construction?
1133 49
     *
1134
     * @return bool
1135
     */
1136
    public function connectOnConstruct(): bool
1137
    {
1138
        if ($this->config) {
1139
            return !$this->config->isLazy();
1140
        }
1141
1142
        return true;
1143 2
    }
1144
1145 2
    /**
1146
     * @return array
1147
     */
1148
    public function getServerProperties()
1149
    {
1150
        return $this->server_properties;
1151 2
    }
1152 2
1153 2
    /**
1154 2
     * @return int
1155 2
     */
1156 2
    public function getHeartbeat()
1157 2
    {
1158
        return $this->heartbeat;
1159 2
    }
1160 2
1161
    /**
1162
     * Get the library properties for populating the client protocol information
1163
     *
1164
     * @return array
1165
     */
1166
    public function getLibraryProperties()
1167
    {
1168
        $config = self::$LIBRARY_PROPERTIES;
1169
        if ($this->config !== null) {
1170
            $connectionName = $this->config->getConnectionName();
1171
            if ($connectionName !== '') {
1172
                $config['connection_name'] = ['S', $connectionName];
1173
            }
1174
        }
1175
        return $config;
1176
    }
1177
1178
    /**
1179
     * @param array $hosts
1180
     * @param array $options
1181
     *
1182
     * @return mixed
1183
     * @throws \Exception
1184
     * @deprecated Use AMQPConnectionFactory.
1185
     */
1186
    public static function create_connection($hosts, $options = array())
1187
    {
1188
        if (!is_array($hosts) || count($hosts) < 1) {
0 ignored issues
show
introduced by
The condition is_array($hosts) is always true.
Loading history...
1189
            throw new \InvalidArgumentException(
1190
                'An array of hosts are required when attempting to create a connection'
1191
            );
1192
        }
1193
1194
        foreach ($hosts as $hostdef) {
1195
            self::validate_host($hostdef);
1196
            $host = $hostdef['host'];
1197
            $port = $hostdef['port'];
1198
            $user = $hostdef['user'];
1199
            $password = $hostdef['password'];
1200
            $vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : '/';
1201
            try {
1202
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);
0 ignored issues
show
Bug introduced by
The method try_create_connection() does not exist on PhpAmqpLib\Connection\AbstractConnection. Did you maybe mean connect()? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

1202
                /** @scrutinizer ignore-call */ 
1203
                $conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
1203
                return $conn;
1204
            } catch (\Exception $e) {
1205
                $latest_exception = $e;
1206
            }
1207
        }
1208
        throw $latest_exception;
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $latest_exception seems to be defined by a foreach iteration on line 1194. Are you sure the iterator is never empty, otherwise this variable is not defined?
Loading history...
1209
    }
1210
1211
    public static function validate_host($host)
1212
    {
1213
        if (!isset($host['host'])) {
1214
            throw new \InvalidArgumentException("'host' key is required.");
1215
        }
1216
        if (!isset($host['port'])) {
1217
            throw new \InvalidArgumentException("'port' key is required.");
1218
        }
1219
        if (!isset($host['user'])) {
1220
            throw new \InvalidArgumentException("'user' key is required.");
1221
        }
1222
        if (!isset($host['password'])) {
1223
            throw new \InvalidArgumentException("'password' key is required.");
1224
        }
1225
    }
1226
}
1227